diff --git a/30kmoviedata.nt.gz b/30kmoviedata.nt.gz index 0120770..444d3d2 100644 Binary files a/30kmoviedata.nt.gz and b/30kmoviedata.nt.gz differ diff --git a/appengine/appengine.go b/appengine/appengine.go index c594af8..4631765 100644 --- a/appengine/appengine.go +++ b/appengine/appengine.go @@ -15,14 +15,12 @@ package cayleyappengine import ( - "os" - "github.com/barakmich/glog" "github.com/google/cayley/config" + "github.com/google/cayley/db" "github.com/google/cayley/graph" "github.com/google/cayley/http" - "github.com/google/cayley/nquads" _ "github.com/google/cayley/graph/memstore" ) @@ -32,37 +30,6 @@ func init() { cfg := config.ParseConfigFromFile("cayley_appengine.cfg") ts, _ := graph.NewTripleStore("memstore", "", nil) glog.Errorln(cfg) - LoadTriplesFromFileInto(ts, cfg.DatabasePath, cfg.LoadSize) + db.Load(ts, cfg, cfg.DatabasePath) http.SetupRoutes(ts, cfg) } - -func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) { - f, err := os.Open(tripleFile) - if err != nil { - glog.Fatalln("Couldn't open file", tripleFile) - } - - defer func() { - if err := f.Close(); err != nil { - glog.Fatalln(err) - } - }() - - nquads.ReadNQuadsFromReader(c, f) -} - -func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) { - tChan := make(chan *graph.Triple) - go ReadTriplesFromFile(tChan, filename) - tripleblock := make([]*graph.Triple, loadSize) - i := 0 - for t := range tChan { - tripleblock[i] = t - i++ - if i == loadSize { - ts.AddTripleSet(tripleblock) - i = 0 - } - } - ts.AddTripleSet(tripleblock[0:i]) -} diff --git a/cayley.go b/cayley.go index d5dea02..24af7d3 100644 --- a/cayley.go +++ b/cayley.go @@ -58,37 +58,62 @@ func main() { Usage() os.Exit(1) } + cmd := os.Args[1] newargs := make([]string, 0) newargs = append(newargs, os.Args[0]) newargs = append(newargs, os.Args[2:]...) os.Args = newargs flag.Parse() - var ts graph.TripleStore + cfg := config.ParseConfigFromFlagsAndFile(*configFile) + if os.Getenv("GOMAXPROCS") == "" { runtime.GOMAXPROCS(runtime.NumCPU()) glog.Infoln("Setting GOMAXPROCS to", runtime.NumCPU()) } else { glog.Infoln("GOMAXPROCS currently", os.Getenv("GOMAXPROCS"), " -- not adjusting") } + + var ( + ts graph.TripleStore + err error + ) switch cmd { case "init": - db.Init(cfg, *tripleFile) + err = db.Init(cfg, *tripleFile) case "load": - ts = db.Open(cfg) - db.Load(ts, cfg, *tripleFile) + ts, err = db.Open(cfg) + if err != nil { + break + } + err = db.Load(ts, cfg, *tripleFile) + if err != nil { + break + } ts.Close() case "repl": - ts = db.Open(cfg) - db.Repl(ts, *queryLanguage, cfg) + ts, err = db.Open(cfg) + if err != nil { + break + } + err = db.Repl(ts, *queryLanguage, cfg) + if err != nil { + break + } ts.Close() case "http": - ts = db.Open(cfg) + ts, err = db.Open(cfg) + if err != nil { + break + } http.Serve(ts, cfg) ts.Close() default: fmt.Println("No command", cmd) flag.Usage() } + if err != nil { + glog.Fatalln(err) + } } diff --git a/db/init.go b/db/init.go index a8b3897..a791a8f 100644 --- a/db/init.go +++ b/db/init.go @@ -19,16 +19,21 @@ import ( "github.com/google/cayley/graph" ) -func Init(cfg *config.Config, triplePath string) bool { +func Init(cfg *config.Config, triplePath string) error { err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) if err != nil { - return false + return err } - if triplePath != "" { - ts := Open(cfg) - Load(ts, cfg, triplePath) + ts, err := Open(cfg) + if err != nil { + return err + } + err = Load(ts, cfg, triplePath) + if err != nil { + return err + } ts.Close() } - return true + return err } diff --git a/db/load.go b/db/load.go index d81ee35..457229e 100644 --- a/db/load.go +++ b/db/load.go @@ -15,58 +15,54 @@ package db import ( + "fmt" + "io" "os" - "github.com/barakmich/glog" - "github.com/google/cayley/config" "github.com/google/cayley/graph" "github.com/google/cayley/nquads" ) -func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) { - tChan := make(chan *graph.Triple) - go ReadTriplesFromFile(tChan, triplePath) +func Load(ts graph.TripleStore, cfg *config.Config, path string) error { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("could not open file %q: %v", path, err) + } + defer f.Close() + + dec := nquads.NewDecoder(f) bulker, canBulk := ts.(graph.BulkLoader) if canBulk { - err := bulker.BulkLoad(tChan) + err = bulker.BulkLoad(dec) if err == nil { - return + return nil } - if err != graph.ErrCannotBulkLoad { - glog.Errorln("Error attempting to bulk load: ", err) + if err == graph.ErrCannotBulkLoad { + err = nil } } - - LoadTriplesInto(tChan, ts, cfg.LoadSize) -} - -func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) { - f, err := os.Open(tripleFile) if err != nil { - glog.Fatalln("Couldn't open file", tripleFile) + return err } - defer func() { - if err := f.Close(); err != nil { - glog.Fatalln(err) + block := make([]*graph.Triple, 0, cfg.LoadSize) + for { + t, err := dec.Unmarshal() + if err != nil { + if err == io.EOF { + break + } + return err } - }() - - nquads.ReadNQuadsFromReader(c, f) -} - -func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) { - tripleblock := make([]*graph.Triple, loadSize) - i := 0 - for t := range tChan { - tripleblock[i] = t - i++ - if i == loadSize { - ts.AddTripleSet(tripleblock) - i = 0 + block = append(block, t) + if len(block) == cap(block) { + ts.AddTripleSet(block) + block = block[:0] } } - ts.AddTripleSet(tripleblock[0:i]) + ts.AddTripleSet(block) + + return nil } diff --git a/db/open.go b/db/open.go index 6b78e52..962f3be 100644 --- a/db/open.go +++ b/db/open.go @@ -21,17 +21,20 @@ import ( "github.com/google/cayley/graph" ) -func Open(cfg *config.Config) graph.TripleStore { +func Open(cfg *config.Config) (graph.TripleStore, error) { glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath) ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) if err != nil { - glog.Fatalln(err.Error()) + return nil, err } // Memstore is not persistent, so it MUST be loaded. if cfg.DatabaseType == "memstore" { - Load(ts, cfg, cfg.DatabasePath) + err = Load(ts, cfg, cfg.DatabasePath) + if err != nil { + return nil, err + } } - return ts + return ts, nil } diff --git a/db/repl.go b/db/repl.go index 563ff75..9b918ed 100644 --- a/db/repl.go +++ b/db/repl.go @@ -16,10 +16,11 @@ package db import ( "bufio" + "bytes" + "errors" "fmt" "io" "os" - "strings" "time" "github.com/google/cayley/config" @@ -60,7 +61,7 @@ func Run(query string, ses graph.Session) { } } -func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) { +func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error { var ses graph.Session switch queryLanguage { case "sexp": @@ -72,72 +73,75 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) { default: ses = gremlin.NewSession(ts, cfg.GremlinTimeout, true) } - inputBf := bufio.NewReader(os.Stdin) - line := "" + buf := bufio.NewReader(os.Stdin) + var line []byte for { - if line == "" { + if len(line) == 0 { fmt.Print("cayley> ") } else { fmt.Print("... ") } - l, pre, err := inputBf.ReadLine() + l, prefix, err := buf.ReadLine() if err == io.EOF { - if line != "" { - line = "" + if len(line) != 0 { + line = line[:0] } else { - break + return nil } } if err != nil { - line = "" + line = line[:0] } - if pre { - panic("Line too long") + if prefix { + return errors.New("line too long") } - line += string(l) - if line == "" { + line = append(line, l...) + if len(line) == 0 { continue } - if strings.HasPrefix(line, ":debug") { + if bytes.HasPrefix(line, []byte(":debug")) { ses.ToggleDebug() fmt.Println("Debug Toggled") - line = "" + line = line[:0] continue } - if strings.HasPrefix(line, ":a") { + if bytes.HasPrefix(line, []byte(":a")) { var tripleStmt = line[3:] - triple := nquads.Parse(tripleStmt) + triple, err := nquads.Parse(string(tripleStmt)) if triple == nil { - fmt.Println("Not a valid triple.") - line = "" + if err != nil { + fmt.Printf("not a valid triple: %v\n", err) + } + line = line[:0] continue } ts.AddTriple(triple) - line = "" + line = line[:0] continue } - if strings.HasPrefix(line, ":d") { + if bytes.HasPrefix(line, []byte(":d")) { var tripleStmt = line[3:] - triple := nquads.Parse(tripleStmt) + triple, err := nquads.Parse(string(tripleStmt)) if triple == nil { - fmt.Println("Not a valid triple.") - line = "" + if err != nil { + fmt.Printf("not a valid triple: %v\n", err) + } + line = line[:0] continue } ts.RemoveTriple(triple) - line = "" + line = line[:0] continue } - result, err := ses.InputParses(line) + result, err := ses.InputParses(string(line)) switch result { case graph.Parsed: - Run(line, ses) - line = "" + Run(string(line), ses) + line = line[:0] case graph.ParseFail: fmt.Println("Error: ", err) - line = "" + line = line[:0] case graph.ParseMore: - default: } } } diff --git a/graph/triplestore.go b/graph/triplestore.go index 9690bdb..a640b1d 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -122,10 +122,14 @@ func (d Options) StringKey(key string) (string, bool) { var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load") type BulkLoader interface { - // BulkLoad loads Triples from a channel in bulk to the TripleStore. It - // returns ErrCannotBulkLoad if bulk loading is not possible (i.e. if you - // cannot load in bulk to a non-empty database, and the db is non-empty) - BulkLoad(chan *Triple) error + // BulkLoad loads Triples from a TripleUnmarshaler in bulk to the TripleStore. + // It returns ErrCannotBulkLoad if bulk loading is not possible. For example if + // you cannot load in bulk to a non-empty database, and the db is non-empty. + BulkLoad(TripleUnmarshaler) error +} + +type TripleUnmarshaler interface { + Unmarshal() (*Triple, error) } type NewStoreFunc func(string, Options) (TripleStore, error) diff --git a/http/write.go b/http/write.go index 959875c..cb1851e 100644 --- a/http/write.go +++ b/http/write.go @@ -17,6 +17,7 @@ package http import ( "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strconv" @@ -77,22 +78,32 @@ func (api *Api) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params blockSize = int64(api.config.LoadSize) } - tChan := make(chan *graph.Triple) - go nquads.ReadNQuadsFromReader(tChan, formFile) - tripleblock := make([]*graph.Triple, blockSize) - nTriples := 0 - i := int64(0) - for t := range tChan { - tripleblock[i] = t - i++ - nTriples++ - if i == blockSize { - api.ts.AddTripleSet(tripleblock) - i = 0 + dec := nquads.NewDecoder(formFile) + + var ( + n int + + block = make([]*graph.Triple, 0, blockSize) + ) + for { + t, err := dec.Unmarshal() + if err != nil { + if err == io.EOF { + break + } + panic("what can do this here?") // FIXME(kortschak) + } + block = append(block, t) + n++ + if len(block) == cap(block) { + api.ts.AddTripleSet(block) + block = block[:0] } } - api.ts.AddTripleSet(tripleblock[0:i]) - fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", nTriples) + api.ts.AddTripleSet(block) + + fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", n) + return 200 } diff --git a/nquads/nquads.go b/nquads/nquads.go index 64b0d53..9127e1e 100644 --- a/nquads/nquads.go +++ b/nquads/nquads.go @@ -16,109 +16,112 @@ package nquads import ( "bufio" + "errors" + "fmt" "io" "strings" - "github.com/barakmich/glog" - "github.com/google/cayley/graph" ) -func isWhitespace(s uint8) bool { - return (s == '\t' || s == '\r' || s == ' ') -} -func Parse(str string) *graph.Triple { +var ( + ErrAbsentSubject = errors.New("absent subject") + ErrAbsentPredicate = errors.New("absent predicate") + ErrAbsentObject = errors.New("absent object") + ErrUnterminated = errors.New("unterminated quad") +) + +func Parse(str string) (*graph.Triple, error) { // Skip leading whitespace. - str = skipWhitespace(str) + str = trimSpace(str) // Check for a comment if str != "" && str[0] == '#' { - return nil + return nil, nil } sub, remainder := getTripleComponent(str) - if sub == nil { - return nil + if sub == "" { + return nil, ErrAbsentSubject } - str = skipWhitespace(remainder) + str = trimSpace(remainder) pred, remainder := getTripleComponent(str) - if pred == nil { - return nil + if pred == "" { + return nil, ErrAbsentPredicate } - str = skipWhitespace(remainder) + str = trimSpace(remainder) obj, remainder := getTripleComponent(str) - if obj == nil { - return nil + if obj == "" { + return nil, ErrAbsentObject } - str = skipWhitespace(remainder) - prov_ptr, remainder := getTripleComponent(str) - var prov string - if prov_ptr == nil { - prov = "" - } else { - prov = *prov_ptr - } - str = skipWhitespace(remainder) + str = trimSpace(remainder) + prov, remainder := getTripleComponent(str) + str = trimSpace(remainder) if str != "" && str[0] == '.' { - return &graph.Triple{*sub, *pred, *obj, prov} + return &graph.Triple{sub, pred, obj, prov}, nil } - return nil + return nil, ErrUnterminated } -func skipWhitespace(str string) string { +func isSpace(s uint8) bool { + return s == ' ' || s == '\t' || s == '\r' +} + +func trimSpace(str string) string { i := 0 - for i < len(str) && isWhitespace(str[i]) { + for i < len(str) && isSpace(str[i]) { i += 1 } return str[i:] } -func getTripleComponent(str string) (*string, string) { +func getTripleComponent(str string) (head, tail string) { if len(str) == 0 { - return nil, str + return "", str } if str[0] == '<' { return getUriPart(str[1:]) } else if str[0] == '"' { return getQuotedPart(str[1:]) } else if str[0] == '.' { - return nil, str + return "", str } else { // Technically not part of the spec. But we do it anyway for convenience. return getUnquotedPart(str) } } -func getUriPart(str string) (*string, string) { +func getUriPart(str string) (head, tail string) { i := 0 for i < len(str) && str[i] != '>' { i += 1 } if i == len(str) { - return nil, str + return "", str } - part := str[0:i] - return &part, str[i+1:] + head = str[0:i] + return head, str[i+1:] } -func getQuotedPart(str string) (*string, string) { - i := 0 - start := 0 - out := "" +func getQuotedPart(str string) (head, tail string) { + var ( + i int + start int + ) for i < len(str) && str[i] != '"' { if str[i] == '\\' { - out += str[start:i] + head += str[start:i] switch str[i+1] { case '\\': - out += "\\" + head += "\\" case 'r': - out += "\r" + head += "\r" case 'n': - out += "\n" + head += "\n" case 't': - out += "\t" + head += "\t" case '"': - out += "\"" + head += "\"" default: - return nil, str + return "", str } i += 2 start = i @@ -127,70 +130,74 @@ func getQuotedPart(str string) (*string, string) { i += 1 } if i == len(str) { - return nil, str + return "", str } - out += str[start:i] + head += str[start:i] i += 1 - var remainder string - if strings.HasPrefix(str[i:], "^^<") { + switch { + case strings.HasPrefix(str[i:], "^^<"): // Ignore type, for now - _, remainder = getUriPart(str[i+3:]) - } else if strings.HasPrefix(str[i:], "@") { - _, remainder = getUnquotedPart(str[i+1:]) - } else { - remainder = str[i:] + _, tail = getUriPart(str[i+3:]) + case str[i] == '@': + _, tail = getUnquotedPart(str[i+1:]) + default: + tail = str[i:] } - return &out, remainder + return head, tail } -func getUnquotedPart(str string) (*string, string) { - i := 0 - initStr := str - out := "" - start := 0 - for i < len(str) && !isWhitespace(str[i]) { +func getUnquotedPart(str string) (head, tail string) { + var ( + i int + initStr = str + start int + ) + for i < len(str) && !isSpace(str[i]) { if str[i] == '"' { part, remainder := getQuotedPart(str[i+1:]) - if part == nil { + if part == "" { return part, initStr } - out += str[start:i] + head += str[start:i] str = remainder i = 0 start = 0 - out += *part + head += part } i += 1 } - out += str[start:i] - return &out, str[i:] + head += str[start:i] + return head, str[i:] } -func ReadNQuadsFromReader(c chan *graph.Triple, reader io.Reader) { - bf := bufio.NewReader(reader) +type Decoder struct { + r *bufio.Reader + line []byte +} - nTriples := 0 - line := "" +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r: bufio.NewReader(r)} +} + +func (dec *Decoder) Unmarshal() (*graph.Triple, error) { + dec.line = dec.line[:0] for { - l, pre, err := bf.ReadLine() - if err == io.EOF { + l, pre, err := dec.r.ReadLine() + if err != nil { + return nil, err + } + dec.line = append(dec.line, l...) + if !pre { break } - if err != nil { - glog.Fatalln("Something bad happened while reading file " + err.Error()) - } - line += string(l) - if pre { - continue - } - triple := Parse(line) - line = "" - if triple != nil { - nTriples++ - c <- triple - } } - glog.Infoln("Read", nTriples, "triples") - close(c) + triple, err := Parse(string(dec.line)) + if err != nil { + return nil, fmt.Errorf("failed to parse %q: %v", dec.line, err) + } + if triple == nil { + return dec.Unmarshal() + } + return triple, nil } diff --git a/nquads/nquads_test.go b/nquads/nquads_test.go index 3fdfc22..8b2fa6b 100644 --- a/nquads/nquads_test.go +++ b/nquads/nquads_test.go @@ -25,17 +25,26 @@ var testNTriples = []struct { message string input string expect *graph.Triple + err error }{ // NTriple tests. { message: "not parse invalid triples", input: "invalid", expect: nil, + err: ErrAbsentPredicate, + }, + { + message: "invalid internal quote", + input: `":103032" "/film/performance/character" "Walter "Teacher" Cole" .`, + expect: nil, + err: ErrUnterminated, }, { message: "not parse comments", input: "# nominally valid triple .", expect: nil, + err: nil, }, { message: "parse simple triples", @@ -110,7 +119,10 @@ var testNTriples = []struct { func TestParse(t *testing.T) { for _, test := range testNTriples { - got := Parse(test.input) + got, err := Parse(test.input) + if err != test.err { + t.Errorf("Unexpected error when %s: got:%v expect:%v", test.message, err, test.err) + } if !reflect.DeepEqual(got, test.expect) { t.Errorf("Failed to %s, %q, got:%q expect:%q", test.message, test.input, got, test.expect) } @@ -121,6 +133,6 @@ var result *graph.Triple func BenchmarkParser(b *testing.B) { for n := 0; n < b.N; n++ { - result = Parse(" \"object of some real\\tlength\"@en . # comment") + result, _ = Parse(" \"object of some real\\tlength\"@en . # comment") } }