diff --git a/cayley.go b/cayley.go index 7e58749..0516da5 100644 --- a/cayley.go +++ b/cayley.go @@ -44,6 +44,9 @@ import ( _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" + + // Load writer registry + _ "github.com/google/cayley/writer" ) var ( @@ -105,8 +108,8 @@ func main() { } var ( - ts graph.TripleStore - err error + handle *graph.Handle + err error ) switch cmd { case "version": @@ -123,60 +126,60 @@ func main() { break } if *tripleFile != "" { - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } - err = load(ts, cfg, *tripleFile, *tripleType) + err = load(handle.QuadWriter, cfg, *tripleFile, *tripleType) if err != nil { break } - ts.Close() + handle.Close() } case "load": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } - err = load(ts, cfg, *tripleFile, *tripleType) + err = load(handle.QuadWriter, cfg, *tripleFile, *tripleType) if err != nil { break } - ts.Close() + handle.Close() case "repl": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", *tripleType) + err = load(handle.QuadWriter, cfg, "", *tripleType) if err != nil { break } } - err = db.Repl(ts, *queryLanguage, cfg) + err = db.Repl(handle, *queryLanguage, cfg) - ts.Close() + handle.Close() case "http": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", *tripleType) + err = load(handle.QuadWriter, cfg, "", *tripleType) if err != nil { break } } - http.Serve(ts, cfg) + http.Serve(handle, cfg) - ts.Close() + handle.Close() default: fmt.Println("No command", cmd) @@ -187,7 +190,7 @@ func main() { } } -func load(ts graph.TripleStore, cfg *config.Config, path, typ string) error { +func load(qw graph.QuadWriter, cfg *config.Config, path, typ string) error { var r io.Reader if path == "" { @@ -230,7 +233,7 @@ func load(ts graph.TripleStore, cfg *config.Config, path, typ string) error { return fmt.Errorf("unknown quad format %q", typ) } - return db.Load(ts, cfg, dec) + return db.Load(qw, cfg, dec) } const ( diff --git a/cayley_test.go b/cayley_test.go index d108f85..b0839e7 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -298,24 +298,25 @@ var m2_actors = movie2.Save("name","movie2").Follow(filmToActor) var ( once sync.Once cfg = &config.Config{ - DatabasePath: "30kmoviedata.nq.gz", - DatabaseType: "memstore", - Timeout: 300 * time.Second, + DatabasePath: "30kmoviedata.nq.gz", + DatabaseType: "memstore", + ReplicationType: "single", + Timeout: 300 * time.Second, } - ts graph.TripleStore + handle *graph.Handle ) func prepare(t testing.TB) { var err error once.Do(func() { - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", "cquad") + err = load(handle.QuadWriter, cfg, "", "cquad") if err != nil { t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) } @@ -329,7 +330,7 @@ func TestQueries(t *testing.T) { if testing.Short() && test.long { continue } - ses := gremlin.NewSession(ts, cfg.Timeout, true) + ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true) _, err := ses.InputParses(test.query) if err != nil { t.Fatalf("Failed to parse benchmark gremlin %s: %v", test.message, err) @@ -374,7 +375,7 @@ func runBench(n int, b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { c := make(chan interface{}, 5) - ses := gremlin.NewSession(ts, cfg.Timeout, true) + ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true) // Do the parsing we know works. ses.InputParses(benchmarkQueries[n].query) b.StartTimer() diff --git a/db/db.go b/db/db.go index 10f6898..e5370ee 100644 --- a/db/db.go +++ b/db/db.go @@ -68,19 +68,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter, return w, nil } -func Load(ts graph.TripleStore, cfg *config.Config, dec quad.Unmarshaler) error { - bulker, canBulk := ts.(graph.BulkLoader) - if canBulk { - switch err := bulker.BulkLoad(dec); err { - case nil: - return nil - case graph.ErrCannotBulkLoad: - // Try individual loading. - default: - return err - } - } - +func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { block := make([]quad.Quad, 0, cfg.LoadSize) for { t, err := dec.Unmarshal() @@ -92,11 +80,11 @@ func Load(ts graph.TripleStore, cfg *config.Config, dec quad.Unmarshaler) error } block = append(block, t) if len(block) == cap(block) { - ts.AddTripleSet(block) + qw.AddQuadSet(block) block = block[:0] } } - ts.AddTripleSet(block) + qw.AddQuadSet(block) return nil } diff --git a/db/repl.go b/db/repl.go index 11ecfea..ac36ea4 100644 --- a/db/repl.go +++ b/db/repl.go @@ -70,17 +70,17 @@ const ( history = ".cayley_history" ) -func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error { +func Repl(h *graph.Handle, queryLanguage string, cfg *config.Config) error { var ses query.Session switch queryLanguage { case "sexp": - ses = sexp.NewSession(ts) + ses = sexp.NewSession(h.QuadStore) case "mql": - ses = mql.NewSession(ts) + ses = mql.NewSession(h.QuadStore) case "gremlin": fallthrough default: - ses = gremlin.NewSession(ts, cfg.Timeout, true) + ses = gremlin.NewSession(h.QuadStore, cfg.Timeout, true) } term, err := terminal(history) @@ -124,25 +124,25 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error continue case strings.HasPrefix(line, ":a"): - triple, err := cquads.Parse(line[3:]) - if !triple.IsValid() { + quad, err := cquads.Parse(line[3:]) + if !quad.IsValid() { if err != nil { - fmt.Printf("not a valid triple: %v\n", err) + fmt.Printf("not a valid quad: %v\n", err) } continue } - ts.AddTriple(triple) + h.QuadWriter.AddQuad(quad) continue case strings.HasPrefix(line, ":d"): - triple, err := cquads.Parse(line[3:]) - if !triple.IsValid() { + quad, err := cquads.Parse(line[3:]) + if !quad.IsValid() { if err != nil { - fmt.Printf("not a valid triple: %v\n", err) + fmt.Printf("not a valid quad: %v\n", err) } continue } - ts.RemoveTriple(triple) + h.QuadWriter.RemoveQuad(quad) continue } } diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 08483c7..4202ffa 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -36,9 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) AddTriple(quad.Quad) {} - -func (qs *store) AddTripleSet([]quad.Quad) {} +func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } @@ -60,6 +58,8 @@ func (qs *store) NameOf(v graph.Value) string { func (qs *store) Size() int64 { return 0 } +func (qs *store) Horizon() int64 { return 0 } + func (qs *store) DebugPrint() {} func (qs *store) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go index 3c3a057..45a7404 100644 --- a/graph/leveldb/leveldb_test.go +++ b/graph/leveldb/leveldb_test.go @@ -137,7 +137,7 @@ func TestLoadDatabase(t *testing.T) { } w, _ := writer.NewSingleReplication(qs, nil) - qs.AddQuad(quad.Quad{"Something", "points_to", "Something Else", "context"}) + w.AddQuad(quad.Quad{"Something", "points_to", "Something Else", "context"}) for _, pq := range []string{"Something", "points_to", "Something Else", "context"} { if got := qs.NameOf(qs.ValueOf(pq)); got != pq { t.Errorf("Failed to roundtrip %q, got:%q expect:%q", pq, got, pq) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index bc34555..dddc19a 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -48,6 +48,11 @@ type Handle struct { QuadWriter QuadWriter } +func (h *Handle) Close() { + h.QuadStore.Close() + h.QuadWriter.Close() +} + var ErrQuadExists = errors.New("Quad exists") var ErrQuadNotExist = errors.New("Quad doesn't exist") @@ -61,6 +66,9 @@ type QuadWriter interface { // Removes a quad matching the given one from the database, // if it exists. Does nothing otherwise. RemoveQuad(quad.Quad) error + + // Cleans up replication and closes the writing aspect of the database. + Close() error } type NewQuadWriterFunc func(TripleStore, Options) (QuadWriter, error) diff --git a/http/http.go b/http/http.go index 70321dd..d5f9351 100644 --- a/http/http.go +++ b/http/http.go @@ -107,7 +107,7 @@ func (h *TemplateRequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques type Api struct { config *config.Config - ts graph.TripleStore + handle *graph.Handle } func (api *Api) ApiV1(r *httprouter.Router) { @@ -119,7 +119,7 @@ func (api *Api) ApiV1(r *httprouter.Router) { r.POST("/api/v1/delete", LogRequest(api.ServeV1Delete)) } -func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { +func SetupRoutes(handle *graph.Handle, cfg *config.Config) { r := httprouter.New() assets := findAssetsPath() if glog.V(2) { @@ -129,7 +129,7 @@ func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { templates.ParseGlob(fmt.Sprint(assets, "/templates/*.html")) root := &TemplateRequestHandler{templates: templates} docs := &DocRequestHandler{assets: assets} - api := &Api{config: cfg, ts: ts} + api := &Api{config: cfg, handle: handle} api.ApiV1(r) //m.Use(martini.Static("static", martini.StaticOptions{Prefix: "/static", SkipLogging: true})) @@ -141,8 +141,8 @@ func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { http.Handle("/", r) } -func Serve(ts graph.TripleStore, cfg *config.Config) { - SetupRoutes(ts, cfg) +func Serve(handle *graph.Handle, cfg *config.Config) { + SetupRoutes(handle, cfg) glog.Infof("Cayley now listening on %s:%s\n", cfg.ListenHost, cfg.ListenPort) fmt.Printf("Cayley now listening on %s:%s\n", cfg.ListenHost, cfg.ListenPort) err := http.ListenAndServe(fmt.Sprintf("%s:%s", cfg.ListenHost, cfg.ListenPort), nil) diff --git a/http/query.go b/http/query.go index f4f5a34..9d6460a 100644 --- a/http/query.go +++ b/http/query.go @@ -71,9 +71,9 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.Timeout, false) + ses = gremlin.NewSession(api.handle.QuadStore, api.config.Timeout, false) case "mql": - ses = mql.NewSession(api.ts) + ses = mql.NewSession(api.handle.QuadStore) default: return FormatJson400(w, "Need a query language.") } @@ -110,18 +110,15 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http ses = nil return FormatJsonError(w, 500, "Incomplete data?") } - http.Error(w, "", http.StatusNotFound) - ses = nil - return http.StatusNotFound } func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params httprouter.Params) int { var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.Timeout, false) + ses = gremlin.NewSession(api.handle.QuadStore, api.config.Timeout, false) case "mql": - ses = mql.NewSession(api.ts) + ses = mql.NewSession(api.handle.QuadStore) default: return FormatJson400(w, "Need a query language.") } @@ -146,6 +143,4 @@ func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params http default: return FormatJsonError(w, 500, "Incomplete data?") } - http.Error(w, "", http.StatusNotFound) - return http.StatusNotFound } diff --git a/http/write.go b/http/write.go index 54e5537..018b339 100644 --- a/http/write.go +++ b/http/write.go @@ -55,7 +55,7 @@ func (api *Api) ServeV1Write(w http.ResponseWriter, r *http.Request, _ httproute if terr != nil { return FormatJson400(w, terr) } - api.ts.AddTripleSet(tripleList) + api.handle.QuadWriter.AddQuadSet(tripleList) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", len(tripleList)) return 200 } @@ -97,11 +97,11 @@ func (api *Api) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params block = append(block, t) n++ if len(block) == cap(block) { - api.ts.AddTripleSet(block) + api.handle.QuadWriter.AddQuadSet(block) block = block[:0] } } - api.ts.AddTripleSet(block) + api.handle.QuadWriter.AddQuadSet(block) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", n) @@ -122,7 +122,7 @@ func (api *Api) ServeV1Delete(w http.ResponseWriter, r *http.Request, params htt } count := 0 for _, triple := range tripleList { - api.ts.RemoveTriple(triple) + api.handle.QuadWriter.RemoveQuad(triple) count++ } fmt.Fprintf(w, "{\"result\": \"Successfully deleted %d triples.\"}", count) diff --git a/query/gremlin/gremlin_test.go b/query/gremlin/gremlin_test.go index a638eb7..566305a 100644 --- a/query/gremlin/gremlin_test.go +++ b/query/gremlin/gremlin_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/cayley/quad" _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -54,8 +55,9 @@ var simpleGraph = []quad.Quad{ func makeTestSession(data []quad.Quad) *Session { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, t := range data { - ts.AddTriple(t) + w.AddQuad(t) } return NewSession(ts, -1, false) } diff --git a/query/mql/mql_test.go b/query/mql/mql_test.go index 619ae54..d7e8ef4 100644 --- a/query/mql/mql_test.go +++ b/query/mql/mql_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/cayley/graph" _ "github.com/google/cayley/graph/memstore" "github.com/google/cayley/quad" + _ "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -53,8 +54,9 @@ var simpleGraph = []quad.Quad{ func makeTestSession(data []quad.Quad) *Session { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, t := range data { - ts.AddTriple(t) + w.AddQuad(t) } return NewSession(ts) } diff --git a/query/sexp/parser_test.go b/query/sexp/parser_test.go index d4d16af..2026a2f 100644 --- a/query/sexp/parser_test.go +++ b/query/sexp/parser_test.go @@ -21,6 +21,7 @@ import ( "github.com/google/cayley/quad" _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/writer" ) func TestBadParse(t *testing.T) { @@ -55,13 +56,14 @@ var testQueries = []struct { func TestMemstoreBackedSexp(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) it := BuildIteratorTreeForQuery(ts, "()") if it.Type() != graph.Null { t.Errorf(`Incorrect type for empty query, got:%q expect: "null"`, it.Type()) } for _, test := range testQueries { if test.add.IsValid() { - ts.AddTriple(test.add) + w.AddQuad(test.add) } it := BuildIteratorTreeForQuery(ts, test.query) if it.Type() != test.typ { @@ -79,8 +81,9 @@ func TestMemstoreBackedSexp(t *testing.T) { func TestTreeConstraintParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) - ts.AddTriple(quad.Quad{"i", "like", "food", ""}) - ts.AddTriple(quad.Quad{"food", "is", "good", ""}) + w, _ := graph.NewQuadWriter("single", ts, nil) + w.AddQuad(quad.Quad{"i", "like", "food", ""}) + w.AddQuad(quad.Quad{"food", "is", "good", ""}) query := "(\"i\"\n" + "(:like\n" + "($a (:is :good))))" @@ -99,8 +102,9 @@ func TestTreeConstraintParse(t *testing.T) { func TestTreeConstraintTagParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) - ts.AddTriple(quad.Quad{"i", "like", "food", ""}) - ts.AddTriple(quad.Quad{"food", "is", "good", ""}) + w, _ := graph.NewQuadWriter("single", ts, nil) + w.AddQuad(quad.Quad{"i", "like", "food", ""}) + w.AddQuad(quad.Quad{"food", "is", "good", ""}) query := "(\"i\"\n" + "(:like\n" + "($a (:is :good))))" @@ -118,12 +122,13 @@ func TestTreeConstraintTagParse(t *testing.T) { func TestMultipleConstraintParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, tv := range []quad.Quad{ {"i", "like", "food", ""}, {"i", "like", "beer", ""}, {"you", "like", "beer", ""}, } { - ts.AddTriple(tv) + w.AddQuad(tv) } query := `( $a diff --git a/writer/single.go b/writer/single.go index 0dc4882..794f1a8 100644 --- a/writer/single.go +++ b/writer/single.go @@ -83,3 +83,8 @@ func (s *Single) RemoveQuad(q quad.Quad) error { } return s.ts.ApplyDeltas(deltas) } + +func (s *Single) Close() error { + // Nothing to clean up locally. + return nil +}