Merge pull request #75 from kortschak/parse

Use error returns and interface type for parsing
This commit is contained in:
Barak Michener 2014-07-22 20:19:52 -04:00
commit be26f0faf6
11 changed files with 269 additions and 235 deletions

Binary file not shown.

View file

@ -15,14 +15,12 @@
package cayleyappengine
import (
"os"
"github.com/barakmich/glog"
"github.com/google/cayley/config"
"github.com/google/cayley/db"
"github.com/google/cayley/graph"
"github.com/google/cayley/http"
"github.com/google/cayley/nquads"
_ "github.com/google/cayley/graph/memstore"
)
@ -32,37 +30,6 @@ func init() {
cfg := config.ParseConfigFromFile("cayley_appengine.cfg")
ts, _ := graph.NewTripleStore("memstore", "", nil)
glog.Errorln(cfg)
LoadTriplesFromFileInto(ts, cfg.DatabasePath, cfg.LoadSize)
db.Load(ts, cfg, cfg.DatabasePath)
http.SetupRoutes(ts, cfg)
}
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
f, err := os.Open(tripleFile)
if err != nil {
glog.Fatalln("Couldn't open file", tripleFile)
}
defer func() {
if err := f.Close(); err != nil {
glog.Fatalln(err)
}
}()
nquads.ReadNQuadsFromReader(c, f)
}
func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, filename)
tripleblock := make([]*graph.Triple, loadSize)
i := 0
for t := range tChan {
tripleblock[i] = t
i++
if i == loadSize {
ts.AddTripleSet(tripleblock)
i = 0
}
}
ts.AddTripleSet(tripleblock[0:i])
}

View file

@ -58,37 +58,62 @@ func main() {
Usage()
os.Exit(1)
}
cmd := os.Args[1]
newargs := make([]string, 0)
newargs = append(newargs, os.Args[0])
newargs = append(newargs, os.Args[2:]...)
os.Args = newargs
flag.Parse()
var ts graph.TripleStore
cfg := config.ParseConfigFromFlagsAndFile(*configFile)
if os.Getenv("GOMAXPROCS") == "" {
runtime.GOMAXPROCS(runtime.NumCPU())
glog.Infoln("Setting GOMAXPROCS to", runtime.NumCPU())
} else {
glog.Infoln("GOMAXPROCS currently", os.Getenv("GOMAXPROCS"), " -- not adjusting")
}
var (
ts graph.TripleStore
err error
)
switch cmd {
case "init":
db.Init(cfg, *tripleFile)
err = db.Init(cfg, *tripleFile)
case "load":
ts = db.Open(cfg)
db.Load(ts, cfg, *tripleFile)
ts, err = db.Open(cfg)
if err != nil {
break
}
err = db.Load(ts, cfg, *tripleFile)
if err != nil {
break
}
ts.Close()
case "repl":
ts = db.Open(cfg)
db.Repl(ts, *queryLanguage, cfg)
ts, err = db.Open(cfg)
if err != nil {
break
}
err = db.Repl(ts, *queryLanguage, cfg)
if err != nil {
break
}
ts.Close()
case "http":
ts = db.Open(cfg)
ts, err = db.Open(cfg)
if err != nil {
break
}
http.Serve(ts, cfg)
ts.Close()
default:
fmt.Println("No command", cmd)
flag.Usage()
}
if err != nil {
glog.Fatalln(err)
}
}

View file

@ -19,16 +19,21 @@ import (
"github.com/google/cayley/graph"
)
func Init(cfg *config.Config, triplePath string) bool {
func Init(cfg *config.Config, triplePath string) error {
err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
return false
return err
}
if triplePath != "" {
ts := Open(cfg)
Load(ts, cfg, triplePath)
ts, err := Open(cfg)
if err != nil {
return err
}
err = Load(ts, cfg, triplePath)
if err != nil {
return err
}
ts.Close()
}
return true
return err
}

View file

@ -18,52 +18,62 @@ import (
"bytes"
"compress/bzip2"
"compress/gzip"
"fmt"
"io"
"os"
"github.com/barakmich/glog"
"github.com/google/cayley/config"
"github.com/google/cayley/graph"
"github.com/google/cayley/nquads"
)
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, triplePath)
bulker, canBulk := ts.(graph.BulkLoader)
if canBulk {
err := bulker.BulkLoad(tChan)
if err == nil {
return
}
if err != graph.ErrCannotBulkLoad {
glog.Errorln("Error attempting to bulk load: ", err)
}
}
LoadTriplesInto(tChan, ts, cfg.LoadSize)
}
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
f, err := os.Open(tripleFile)
func Load(ts graph.TripleStore, cfg *config.Config, path string) error {
f, err := os.Open(path)
if err != nil {
glog.Fatalln("Couldn't open file", tripleFile)
return fmt.Errorf("could not open file %q: %v", path, err)
}
defer func() {
if err := f.Close(); err != nil {
glog.Fatalln(err)
}
}()
defer f.Close()
r, err := decompressor(f)
if err != nil {
glog.Fatalln(err)
}
nquads.ReadNQuadsFromReader(c, r)
dec := nquads.NewDecoder(r)
bulker, canBulk := ts.(graph.BulkLoader)
if canBulk {
err = bulker.BulkLoad(dec)
if err == nil {
return nil
}
if err == graph.ErrCannotBulkLoad {
err = nil
}
}
if err != nil {
return err
}
block := make([]*graph.Triple, 0, cfg.LoadSize)
for {
t, err := dec.Unmarshal()
if err != nil {
if err == io.EOF {
break
}
return err
}
block = append(block, t)
if len(block) == cap(block) {
ts.AddTripleSet(block)
block = block[:0]
}
}
ts.AddTripleSet(block)
return nil
}
const (
@ -91,17 +101,3 @@ func decompressor(r readAtReader) (io.Reader, error) {
return r, nil
}
}
func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) {
tripleblock := make([]*graph.Triple, loadSize)
i := 0
for t := range tChan {
tripleblock[i] = t
i++
if i == loadSize {
ts.AddTripleSet(tripleblock)
i = 0
}
}
ts.AddTripleSet(tripleblock[0:i])
}

View file

@ -21,17 +21,20 @@ import (
"github.com/google/cayley/graph"
)
func Open(cfg *config.Config) graph.TripleStore {
func Open(cfg *config.Config) (graph.TripleStore, error) {
glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath)
ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
glog.Fatalln(err.Error())
return nil, err
}
// Memstore is not persistent, so it MUST be loaded.
if cfg.DatabaseType == "memstore" {
Load(ts, cfg, cfg.DatabasePath)
err = Load(ts, cfg, cfg.DatabasePath)
if err != nil {
return nil, err
}
}
return ts
return ts, nil
}

View file

@ -16,10 +16,11 @@ package db
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/google/cayley/config"
@ -60,7 +61,7 @@ func Run(query string, ses graph.Session) {
}
}
func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) {
func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error {
var ses graph.Session
switch queryLanguage {
case "sexp":
@ -72,72 +73,75 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) {
default:
ses = gremlin.NewSession(ts, cfg.GremlinTimeout, true)
}
inputBf := bufio.NewReader(os.Stdin)
line := ""
buf := bufio.NewReader(os.Stdin)
var line []byte
for {
if line == "" {
if len(line) == 0 {
fmt.Print("cayley> ")
} else {
fmt.Print("... ")
}
l, pre, err := inputBf.ReadLine()
l, prefix, err := buf.ReadLine()
if err == io.EOF {
if line != "" {
line = ""
if len(line) != 0 {
line = line[:0]
} else {
break
return nil
}
}
if err != nil {
line = ""
line = line[:0]
}
if pre {
panic("Line too long")
if prefix {
return errors.New("line too long")
}
line += string(l)
if line == "" {
line = append(line, l...)
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, ":debug") {
if bytes.HasPrefix(line, []byte(":debug")) {
ses.ToggleDebug()
fmt.Println("Debug Toggled")
line = ""
line = line[:0]
continue
}
if strings.HasPrefix(line, ":a") {
if bytes.HasPrefix(line, []byte(":a")) {
var tripleStmt = line[3:]
triple := nquads.Parse(tripleStmt)
triple, err := nquads.Parse(string(tripleStmt))
if triple == nil {
fmt.Println("Not a valid triple.")
line = ""
if err != nil {
fmt.Printf("not a valid triple: %v\n", err)
}
line = line[:0]
continue
}
ts.AddTriple(triple)
line = ""
line = line[:0]
continue
}
if strings.HasPrefix(line, ":d") {
if bytes.HasPrefix(line, []byte(":d")) {
var tripleStmt = line[3:]
triple := nquads.Parse(tripleStmt)
triple, err := nquads.Parse(string(tripleStmt))
if triple == nil {
fmt.Println("Not a valid triple.")
line = ""
if err != nil {
fmt.Printf("not a valid triple: %v\n", err)
}
line = line[:0]
continue
}
ts.RemoveTriple(triple)
line = ""
line = line[:0]
continue
}
result, err := ses.InputParses(line)
result, err := ses.InputParses(string(line))
switch result {
case graph.Parsed:
Run(line, ses)
line = ""
Run(string(line), ses)
line = line[:0]
case graph.ParseFail:
fmt.Println("Error: ", err)
line = ""
line = line[:0]
case graph.ParseMore:
default:
}
}
}

View file

@ -122,10 +122,14 @@ func (d Options) StringKey(key string) (string, bool) {
var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
type BulkLoader interface {
// BulkLoad loads Triples from a channel in bulk to the TripleStore. It
// returns ErrCannotBulkLoad if bulk loading is not possible (i.e. if you
// cannot load in bulk to a non-empty database, and the db is non-empty)
BulkLoad(chan *Triple) error
// BulkLoad loads Triples from a TripleUnmarshaler in bulk to the TripleStore.
// It returns ErrCannotBulkLoad if bulk loading is not possible. For example if
// you cannot load in bulk to a non-empty database, and the db is non-empty.
BulkLoad(TripleUnmarshaler) error
}
type TripleUnmarshaler interface {
Unmarshal() (*Triple, error)
}
type NewStoreFunc func(string, Options) (TripleStore, error)

View file

@ -17,6 +17,7 @@ package http
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
@ -77,22 +78,32 @@ func (api *Api) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params
blockSize = int64(api.config.LoadSize)
}
tChan := make(chan *graph.Triple)
go nquads.ReadNQuadsFromReader(tChan, formFile)
tripleblock := make([]*graph.Triple, blockSize)
nTriples := 0
i := int64(0)
for t := range tChan {
tripleblock[i] = t
i++
nTriples++
if i == blockSize {
api.ts.AddTripleSet(tripleblock)
i = 0
dec := nquads.NewDecoder(formFile)
var (
n int
block = make([]*graph.Triple, 0, blockSize)
)
for {
t, err := dec.Unmarshal()
if err != nil {
if err == io.EOF {
break
}
panic("what can do this here?") // FIXME(kortschak)
}
block = append(block, t)
n++
if len(block) == cap(block) {
api.ts.AddTripleSet(block)
block = block[:0]
}
}
api.ts.AddTripleSet(tripleblock[0:i])
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", nTriples)
api.ts.AddTripleSet(block)
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", n)
return 200
}

View file

@ -16,109 +16,112 @@ package nquads
import (
"bufio"
"errors"
"fmt"
"io"
"strings"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
)
func isWhitespace(s uint8) bool {
return (s == '\t' || s == '\r' || s == ' ')
}
func Parse(str string) *graph.Triple {
var (
ErrAbsentSubject = errors.New("nqauds: absent subject")
ErrAbsentPredicate = errors.New("nqauds: absent predicate")
ErrAbsentObject = errors.New("nqauds: absent object")
ErrUnterminated = errors.New("nqauds: unterminated quad")
)
func Parse(str string) (*graph.Triple, error) {
// Skip leading whitespace.
str = skipWhitespace(str)
str = trimSpace(str)
// Check for a comment
if str != "" && str[0] == '#' {
return nil
return nil, nil
}
sub, remainder := getTripleComponent(str)
if sub == nil {
return nil
if sub == "" {
return nil, ErrAbsentSubject
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
pred, remainder := getTripleComponent(str)
if pred == nil {
return nil
if pred == "" {
return nil, ErrAbsentPredicate
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
obj, remainder := getTripleComponent(str)
if obj == nil {
return nil
if obj == "" {
return nil, ErrAbsentObject
}
str = skipWhitespace(remainder)
prov_ptr, remainder := getTripleComponent(str)
var prov string
if prov_ptr == nil {
prov = ""
} else {
prov = *prov_ptr
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
prov, remainder := getTripleComponent(str)
str = trimSpace(remainder)
if str != "" && str[0] == '.' {
return &graph.Triple{*sub, *pred, *obj, prov}
return &graph.Triple{sub, pred, obj, prov}, nil
}
return nil
return nil, ErrUnterminated
}
func skipWhitespace(str string) string {
func isSpace(s uint8) bool {
return s == ' ' || s == '\t' || s == '\r'
}
func trimSpace(str string) string {
i := 0
for i < len(str) && isWhitespace(str[i]) {
for i < len(str) && isSpace(str[i]) {
i += 1
}
return str[i:]
}
func getTripleComponent(str string) (*string, string) {
func getTripleComponent(str string) (head, tail string) {
if len(str) == 0 {
return nil, str
return "", str
}
if str[0] == '<' {
return getUriPart(str[1:])
} else if str[0] == '"' {
return getQuotedPart(str[1:])
} else if str[0] == '.' {
return nil, str
return "", str
} else {
// Technically not part of the spec. But we do it anyway for convenience.
return getUnquotedPart(str)
}
}
func getUriPart(str string) (*string, string) {
func getUriPart(str string) (head, tail string) {
i := 0
for i < len(str) && str[i] != '>' {
i += 1
}
if i == len(str) {
return nil, str
return "", str
}
part := str[0:i]
return &part, str[i+1:]
head = str[0:i]
return head, str[i+1:]
}
func getQuotedPart(str string) (*string, string) {
i := 0
start := 0
out := ""
func getQuotedPart(str string) (head, tail string) {
var (
i int
start int
)
for i < len(str) && str[i] != '"' {
if str[i] == '\\' {
out += str[start:i]
head += str[start:i]
switch str[i+1] {
case '\\':
out += "\\"
head += "\\"
case 'r':
out += "\r"
head += "\r"
case 'n':
out += "\n"
head += "\n"
case 't':
out += "\t"
head += "\t"
case '"':
out += "\""
head += "\""
default:
return nil, str
return "", str
}
i += 2
start = i
@ -127,70 +130,74 @@ func getQuotedPart(str string) (*string, string) {
i += 1
}
if i == len(str) {
return nil, str
return "", str
}
out += str[start:i]
head += str[start:i]
i += 1
var remainder string
if strings.HasPrefix(str[i:], "^^<") {
switch {
case strings.HasPrefix(str[i:], "^^<"):
// Ignore type, for now
_, remainder = getUriPart(str[i+3:])
} else if strings.HasPrefix(str[i:], "@") {
_, remainder = getUnquotedPart(str[i+1:])
} else {
remainder = str[i:]
_, tail = getUriPart(str[i+3:])
case str[i] == '@':
_, tail = getUnquotedPart(str[i+1:])
default:
tail = str[i:]
}
return &out, remainder
return head, tail
}
func getUnquotedPart(str string) (*string, string) {
i := 0
initStr := str
out := ""
start := 0
for i < len(str) && !isWhitespace(str[i]) {
func getUnquotedPart(str string) (head, tail string) {
var (
i int
initStr = str
start int
)
for i < len(str) && !isSpace(str[i]) {
if str[i] == '"' {
part, remainder := getQuotedPart(str[i+1:])
if part == nil {
if part == "" {
return part, initStr
}
out += str[start:i]
head += str[start:i]
str = remainder
i = 0
start = 0
out += *part
head += part
}
i += 1
}
out += str[start:i]
return &out, str[i:]
head += str[start:i]
return head, str[i:]
}
func ReadNQuadsFromReader(c chan *graph.Triple, reader io.Reader) {
bf := bufio.NewReader(reader)
type Decoder struct {
r *bufio.Reader
line []byte
}
nTriples := 0
line := ""
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: bufio.NewReader(r)}
}
func (dec *Decoder) Unmarshal() (*graph.Triple, error) {
dec.line = dec.line[:0]
for {
l, pre, err := bf.ReadLine()
if err == io.EOF {
l, pre, err := dec.r.ReadLine()
if err != nil {
return nil, err
}
dec.line = append(dec.line, l...)
if !pre {
break
}
if err != nil {
glog.Fatalln("Something bad happened while reading file " + err.Error())
}
line += string(l)
if pre {
continue
}
triple := Parse(line)
line = ""
if triple != nil {
nTriples++
c <- triple
}
}
glog.Infoln("Read", nTriples, "triples")
close(c)
triple, err := Parse(string(dec.line))
if err != nil {
return nil, fmt.Errorf("failed to parse %q: %v", dec.line, err)
}
if triple == nil {
return dec.Unmarshal()
}
return triple, nil
}

View file

@ -25,17 +25,26 @@ var testNTriples = []struct {
message string
input string
expect *graph.Triple
err error
}{
// NTriple tests.
{
message: "not parse invalid triples",
input: "invalid",
expect: nil,
err: ErrAbsentPredicate,
},
{
message: "invalid internal quote",
input: `":103032" "/film/performance/character" "Walter "Teacher" Cole" .`,
expect: nil,
err: ErrUnterminated,
},
{
message: "not parse comments",
input: "# nominally valid triple .",
expect: nil,
err: nil,
},
{
message: "parse simple triples",
@ -110,7 +119,10 @@ var testNTriples = []struct {
func TestParse(t *testing.T) {
for _, test := range testNTriples {
got := Parse(test.input)
got, err := Parse(test.input)
if err != test.err {
t.Errorf("Unexpected error when %s: got:%v expect:%v", test.message, err, test.err)
}
if !reflect.DeepEqual(got, test.expect) {
t.Errorf("Failed to %s, %q, got:%q expect:%q", test.message, test.input, got, test.expect)
}
@ -121,6 +133,6 @@ var result *graph.Triple
func BenchmarkParser(b *testing.B) {
for n := 0; n < b.N; n++ {
result = Parse("<http://example/s> <http://example/p> \"object of some real\\tlength\"@en . # comment")
result, _ = Parse("<http://example/s> <http://example/p> \"object of some real\\tlength\"@en . # comment")
}
}