diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ddd8c93..35c2120 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -6,10 +6,6 @@ ], "Deps": [ { - "ImportPath": "github.com/pborman/uuid", - "Rev": "ca53cad383cad2479bbba7f7a1a05797ec1386e4" - }, - { "ImportPath": "github.com/badgerodon/peg", "Rev": "9e5f7f4d07ca576562618c23e8abadda278b684f" }, @@ -31,6 +27,15 @@ "Rev": "b59a38004596b696aca7aa2adccfa68760864d86" }, { + "ImportPath": "github.com/lib/pq", + "Comment": "go1.0-cutoff-58-g0dad96c", + "Rev": "0dad96c0b94f8dee039aa40467f767467392a0af" + }, + { + "ImportPath": "github.com/pborman/uuid", + "Rev": "ca53cad383cad2479bbba7f7a1a05797ec1386e4" + }, + { "ImportPath": "github.com/peterh/liner", "Rev": "1bb0d1c1a25ed393d8feb09bab039b2b1b1fbced" }, diff --git a/cmd/cayley/cayley.go b/cmd/cayley/cayley.go index 302d9e7..7f76559 100644 --- a/cmd/cayley/cayley.go +++ b/cmd/cayley/cayley.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "runtime" + "runtime/pprof" "time" "github.com/barakmich/glog" @@ -36,6 +37,7 @@ import ( _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" + _ "github.com/google/cayley/graph/sql" // Load writer registry _ "github.com/google/cayley/writer" @@ -147,6 +149,15 @@ func main() { os.Args = append(os.Args[:1], os.Args[2:]...) flag.Parse() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + var buildString string if Version != "" { buildString = fmt.Sprint("Cayley ", Version, " built ", BuildDate) diff --git a/graph/iterator.go b/graph/iterator.go index 0f3b76e..c2d46d9 100644 --- a/graph/iterator.go +++ b/graph/iterator.go @@ -67,8 +67,10 @@ func (t *Tagger) Fixed() map[string]Value { } func (t *Tagger) CopyFrom(src Iterator) { - st := src.Tagger() + t.CopyFromTagger(src.Tagger()) +} +func (t *Tagger) CopyFromTagger(st *Tagger) { t.tags = append(t.tags, st.tags...) 
if t.fixedTags == nil { @@ -331,16 +333,16 @@ func DumpStats(it Iterator) StatsContainer { func ContainsLogIn(it Iterator, val Value) { if glog.V(4) { - glog.V(4).Infof("%s %d CHECK CONTAINS %d", strings.ToUpper(it.Type().String()), it.UID(), val) + glog.V(4).Infof("%s %d CHECK CONTAINS %v", strings.ToUpper(it.Type().String()), it.UID(), val) } } func ContainsLogOut(it Iterator, val Value, good bool) bool { if glog.V(4) { if good { - glog.V(4).Infof("%s %d CHECK CONTAINS %d GOOD", strings.ToUpper(it.Type().String()), it.UID(), val) + glog.V(4).Infof("%s %d CHECK CONTAINS %v GOOD", strings.ToUpper(it.Type().String()), it.UID(), val) } else { - glog.V(4).Infof("%s %d CHECK CONTAINS %d BAD", strings.ToUpper(it.Type().String()), it.UID(), val) + glog.V(4).Infof("%s %d CHECK CONTAINS %v BAD", strings.ToUpper(it.Type().String()), it.UID(), val) } } return good @@ -355,7 +357,7 @@ func NextLogIn(it Iterator) { func NextLogOut(it Iterator, val Value, ok bool) bool { if glog.V(4) { if ok { - glog.V(4).Infof("%s %d NEXT IS %d", strings.ToUpper(it.Type().String()), it.UID(), val) + glog.V(4).Infof("%s %d NEXT IS %v", strings.ToUpper(it.Type().String()), it.UID(), val) } else { glog.V(4).Infof("%s %d NEXT DONE", strings.ToUpper(it.Type().String()), it.UID()) } diff --git a/graph/iterator/and_iterator_optimize.go b/graph/iterator/and_iterator_optimize.go index db841dd..10aa803 100644 --- a/graph/iterator/and_iterator_optimize.go +++ b/graph/iterator/and_iterator_optimize.go @@ -103,6 +103,7 @@ func (it *And) Optimize() (graph.Iterator, bool) { newReplacement, hasOne := it.qs.OptimizeIterator(newAnd) if hasOne { newAnd.Close() + glog.V(3).Infoln(it.UID(), "became", newReplacement.UID(), "from quadstore") return newReplacement, true } } diff --git a/graph/iterator/hasa_iterator.go b/graph/iterator/hasa_iterator.go index 9547c54..32be43b 100644 --- a/graph/iterator/hasa_iterator.go +++ b/graph/iterator/hasa_iterator.go @@ -105,6 +105,14 @@ func (it *HasA) Optimize() (graph.Iterator, bool) { return it.primaryIt, true } } + // Ask the graph.QuadStore if we can be replaced. Often times, this is a great + // optimization opportunity (there's a fixed iterator underneath us, for + // example). + newReplacement, hasOne := it.qs.OptimizeIterator(it) + if hasOne { + it.Close() + return newReplacement, true + } return it, false } diff --git a/graph/iterator/not_iterator.go b/graph/iterator/not_iterator.go index 4d393e8..6813a5d 100644 --- a/graph/iterator/not_iterator.go +++ b/graph/iterator/not_iterator.go @@ -140,6 +140,7 @@ func (it *Not) Optimize() (graph.Iterator, bool) { if optimized { it.primaryIt = optimizedPrimaryIt } + it.primaryIt = NewMaterialize(it.primaryIt) return it, false } diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index f12937a..9fec5fa 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -269,6 +269,9 @@ func (qs *QuadStore) ValueOf(name string) graph.Value { } func (qs *QuadStore) NameOf(id graph.Value) string { + if id == nil { + return "" + } return qs.revIDMap[id.(int64)] } diff --git a/graph/sql/all_iterator.go b/graph/sql/all_iterator.go new file mode 100644 index 0000000..c91c65d --- /dev/null +++ b/graph/sql/all_iterator.go @@ -0,0 +1,215 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"database/sql"
+
+	"github.com/barakmich/glog"
+
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+	"github.com/google/cayley/quad"
+)
+
+type AllIterator struct {
+	uid    uint64
+	tags   graph.Tagger
+	qs     *QuadStore
+	dir    quad.Direction
+	val    graph.Value
+	table  string
+	cursor *sql.Rows
+	result graph.Value
+	err    error
+}
+
+func (it *AllIterator) makeCursor() {
+	var cursor *sql.Rows
+	var err error
+	if it.cursor != nil {
+		it.cursor.Close()
+	}
+	if it.table == "quads" {
+		cursor, err = it.qs.db.Query(`SELECT subject, predicate, object, label FROM quads;`)
+		if err != nil {
+			glog.Errorf("Couldn't get cursor from SQL database: %v", err)
+			cursor = nil
+		}
+	} else {
+		glog.V(4).Infoln("sql: getting node query")
+		cursor, err = it.qs.db.Query(`SELECT node FROM
+		(
+			SELECT subject FROM quads
+			UNION
+			SELECT predicate FROM quads
+			UNION
+			SELECT object FROM quads
+			UNION
+			SELECT label FROM quads
+		) AS DistinctNodes (node) WHERE node IS NOT NULL;`)
+		if err != nil {
+			glog.Errorf("Couldn't get cursor from SQL database: %v", err)
+			cursor = nil
+		}
+		glog.V(4).Infoln("sql: got node query")
+	}
+	it.cursor = cursor
+}
+
+func NewAllIterator(qs *QuadStore, table string) *AllIterator {
+	it := &AllIterator{
+		uid:   iterator.NextUID(),
+		qs:    qs,
+		table: table,
+	}
+	return it
+}
+
+func (it *AllIterator) UID() uint64 {
+	return it.uid
+}
+
+func (it *AllIterator) Reset() {
+	it.err = nil
+	it.Close()
+}
+
+func (it *AllIterator) Err() error {
+	return it.err
+}
+
+func (it *AllIterator) Close() error {
+	if it.cursor != nil {
+		err := it.cursor.Close()
+		if err != nil {
+			return err
+		}
+		it.cursor = nil
+	}
+	return nil
+}
+
+func (it *AllIterator) Tagger() *graph.Tagger {
+	return &it.tags
+}
+
+func (it *AllIterator) TagResults(dst map[string]graph.Value) {
+	for _, tag := range it.tags.Tags() {
+		dst[tag] = it.Result()
+	}
+
+	for tag, value := range it.tags.Fixed() {
+		dst[tag] = value
+	}
+}
+
+func (it *AllIterator) Clone() graph.Iterator {
+	var m *AllIterator
+	m = NewAllIterator(it.qs, it.table)
+	m.tags.CopyFrom(it)
+	return m
+}
+
+func (it *AllIterator) SubIterators() []graph.Iterator {
+	return nil
+}
+
+func (it *AllIterator) Next() bool {
+	graph.NextLogIn(it)
+	if it.cursor == nil {
+		it.makeCursor()
+		if it.cursor == nil {
+			return false
+		}
+	}
+	if !it.cursor.Next() {
+		glog.V(4).Infoln("sql: No next")
+		err := it.cursor.Err()
+		if err != nil {
+			glog.Errorf("Cursor error in SQL: %v", err)
+			it.err = err
+		}
+		it.cursor.Close()
+		return false
+	}
+	if it.table == "nodes" {
+		var node string
+		err := it.cursor.Scan(&node)
+		if err != nil {
+			glog.Errorf("Error nexting node iterator: %v", err)
+			it.err = err
+			return false
+		}
+		it.result = node
+		return true
+	}
+	var q quad.Quad
+	err := it.cursor.Scan(&q.Subject, &q.Predicate, &q.Object, &q.Label)
+	if err != nil {
+		glog.Errorf("Error scanning sql iterator: %v", err)
+		it.err = err
+		return false
+	}
+	it.result = q
+	return graph.NextLogOut(it, it.result, true)
+}
+
+func (it *AllIterator) Contains(v graph.Value) bool {
graph.ContainsLogIn(it, v) + it.result = v + return graph.ContainsLogOut(it, v, true) +} + +func (it *AllIterator) Size() (int64, bool) { + return it.qs.Size(), true +} + +func (it *AllIterator) Result() graph.Value { + if it.result == nil { + glog.Fatalln("result was nil", it) + } + return it.result +} + +func (it *AllIterator) NextPath() bool { + return false +} + +func (it *AllIterator) Type() graph.Type { + return graph.All +} + +func (it *AllIterator) Sorted() bool { return false } +func (it *AllIterator) Optimize() (graph.Iterator, bool) { return it, false } + +func (it *AllIterator) Describe() graph.Description { + size, _ := it.Size() + return graph.Description{ + UID: it.UID(), + Name: "sql/all", + Type: it.Type(), + Size: size, + } +} + +func (it *AllIterator) Stats() graph.IteratorStats { + size, _ := it.Size() + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 9999, + Size: size, + } +} diff --git a/graph/sql/lru.go b/graph/sql/lru.go new file mode 100644 index 0000000..e3aca7f --- /dev/null +++ b/graph/sql/lru.go @@ -0,0 +1,63 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "container/list" +) + +// cache implements an LRU cache. +type cache struct { + cache map[string]*list.Element + priority *list.List + maxSize int +} + +type kv struct { + key string + value int64 +} + +func newCache(size int) *cache { + var lru cache + lru.maxSize = size + lru.priority = list.New() + lru.cache = make(map[string]*list.Element) + return &lru +} + +func (lru *cache) Put(key string, value int64) { + if _, ok := lru.Get(key); ok { + return + } + if len(lru.cache) == lru.maxSize { + lru.removeOldest() + } + lru.priority.PushFront(kv{key: key, value: value}) + lru.cache[key] = lru.priority.Front() +} + +func (lru *cache) Get(key string) (int64, bool) { + if element, ok := lru.cache[key]; ok { + lru.priority.MoveToFront(element) + return element.Value.(kv).value, true + } + return 0, false +} + +func (lru *cache) removeOldest() { + last := lru.priority.Remove(lru.priority.Back()) + delete(lru.cache, last.(kv).key) +} diff --git a/graph/sql/optimizers.go b/graph/sql/optimizers.go new file mode 100644 index 0000000..538ddd7 --- /dev/null +++ b/graph/sql/optimizers.go @@ -0,0 +1,261 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package sql
+
+import (
+	"errors"
+
+	"github.com/barakmich/glog"
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+	"github.com/google/cayley/quad"
+)
+
+func intersect(a sqlIterator, b sqlIterator, qs *QuadStore) (*SQLIterator, error) {
+	if anew, ok := a.(*SQLNodeIterator); ok {
+		if bnew, ok := b.(*SQLNodeIterator); ok {
+			return intersectNode(anew, bnew, qs)
+		}
+		if bnew, ok := b.(*SQLNodeIntersection); ok {
+			return appendNodeIntersection(bnew, anew, qs)
+		}
+	} else if anew, ok := a.(*SQLNodeIntersection); ok {
+		if bnew, ok := b.(*SQLNodeIterator); ok {
+			return appendNodeIntersection(anew, bnew, qs)
+		}
+		if bnew, ok := b.(*SQLNodeIntersection); ok {
+			return combineNodeIntersection(anew, bnew, qs)
+		}
+	} else if anew, ok := a.(*SQLLinkIterator); ok {
+		if bnew, ok := b.(*SQLLinkIterator); ok {
+			return intersectLink(anew, bnew, qs)
+		}
+
+	} else {
+		return nil, errors.New("Unknown iterator types")
+	}
+	return nil, errors.New("Cannot combine SQL iterators of two different types")
+}
+
+func intersectNode(a *SQLNodeIterator, b *SQLNodeIterator, qs *QuadStore) (*SQLIterator, error) {
+	m := &SQLNodeIntersection{
+		tableName: newTableName(),
+		nodeIts:   []sqlIterator{a, b},
+	}
+	m.Tagger().CopyFromTagger(a.Tagger())
+	m.Tagger().CopyFromTagger(b.Tagger())
+	it := NewSQLIterator(qs, m)
+	return it, nil
+}
+
+func appendNodeIntersection(a *SQLNodeIntersection, b *SQLNodeIterator, qs *QuadStore) (*SQLIterator, error) {
+	m := &SQLNodeIntersection{
+		tableName: newTableName(),
+		nodeIts:   append(a.nodeIts, b),
+	}
+	m.Tagger().CopyFromTagger(a.Tagger())
+	m.Tagger().CopyFromTagger(b.Tagger())
+	it := NewSQLIterator(qs, m)
+	return it, nil
+}
+
+func combineNodeIntersection(a *SQLNodeIntersection, b *SQLNodeIntersection, qs *QuadStore) (*SQLIterator, error) {
+	m := &SQLNodeIntersection{
+		tableName: newTableName(),
+		nodeIts:   append(a.nodeIts, b.nodeIts...),
+	}
+	m.Tagger().CopyFromTagger(a.Tagger())
+	m.Tagger().CopyFromTagger(b.Tagger())
+	it := NewSQLIterator(qs, m)
+	return it, nil
+}
+
+func intersectLink(a *SQLLinkIterator, b *SQLLinkIterator, qs *QuadStore) (*SQLIterator, error) {
+	m := &SQLLinkIterator{
+		tableName:   newTableName(),
+		nodeIts:     append(a.nodeIts, b.nodeIts...),
+		constraints: append(a.constraints, b.constraints...),
+		tagdirs:     append(a.tagdirs, b.tagdirs...),
+	}
+	m.Tagger().CopyFromTagger(a.Tagger())
+	m.Tagger().CopyFromTagger(b.Tagger())
+	it := NewSQLIterator(qs, m)
+	return it, nil
+}
+
+func hasa(aIn sqlIterator, d quad.Direction, qs *QuadStore) (*SQLIterator, error) {
+	a, ok := aIn.(*SQLLinkIterator)
+	if !ok {
+		return nil, errors.New("Can't take the HASA of a non-link SQL iterator")
+	}
+
+	out := &SQLNodeIterator{
+		tableName: newTableName(),
+		linkIt: sqlItDir{
+			it:  a,
+			dir: d,
+		},
+	}
+	it := NewSQLIterator(qs, out)
+	return it, nil
+}
+
+func linksto(aIn sqlIterator, d quad.Direction, qs *QuadStore) (*SQLIterator, error) {
+	var a sqlIterator
+	a, ok := aIn.(*SQLNodeIterator)
+	if !ok {
+		a, ok = aIn.(*SQLNodeIntersection)
+		if !ok {
+			return nil, errors.New("Can't take the LINKSTO of a non-node SQL iterator")
+		}
+	}
+
+	out := &SQLLinkIterator{
+		tableName: newTableName(),
+		nodeIts: []sqlItDir{
+			sqlItDir{
+				it:  a,
+				dir: d,
+			},
+		},
+	}
+	it := NewSQLIterator(qs, out)
+	return it, nil
+}
+
+func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
+	switch it.Type() {
+	case graph.LinksTo:
+		return qs.optimizeLinksTo(it.(*iterator.LinksTo))
+	case graph.HasA:
+		return
qs.optimizeHasA(it.(*iterator.HasA)) + case graph.And: + return qs.optimizeAnd(it.(*iterator.And)) + } + return it, false +} + +func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + switch primary.Type() { + case graph.Fixed: + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := qs.QuadIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + it.Close() + return newIt, true + } + case sqlType: + p := primary.(*SQLIterator) + newit, err := linksto(p.sql, it.Direction(), qs) + if err != nil { + glog.Errorln(err) + return it, false + } + newit.Tagger().CopyFrom(it) + return newit, true + case graph.All: + linkit := &SQLLinkIterator{ + tableName: newTableName(), + size: qs.Size(), + } + for _, t := range primary.Tagger().Tags() { + linkit.tagdirs = append(linkit.tagdirs, tagDir{ + dir: it.Direction(), + tag: t, + }) + } + for k, v := range primary.Tagger().Fixed() { + linkit.tagger.AddFixed(k, v) + } + linkit.tagger.CopyFrom(it) + newit := NewSQLIterator(qs, linkit) + return newit, true + } + return it, false +} + +func (qs *QuadStore) optimizeAnd(it *iterator.And) (graph.Iterator, bool) { + subs := it.SubIterators() + var unusedIts []graph.Iterator + var newit *SQLIterator + newit = nil + changed := false + var err error + + for _, it := range subs { + if it.Type() == sqlType { + if newit == nil { + newit = it.(*SQLIterator) + } else { + changed = true + newit, err = intersect(newit.sql, it.(*SQLIterator).sql, qs) + if err != nil { + glog.Error(err) + return it, false + } + } + } else { + unusedIts = append(unusedIts, it) + } + } + + if !changed { + return it, false + } + if len(unusedIts) == 0 { + newit.Tagger().CopyFrom(it) + return newit, true + } + newAnd := iterator.NewAnd(qs) + newAnd.Tagger().CopyFrom(it) + newAnd.AddSubIterator(newit) + for _, i := range unusedIts { + newAnd.AddSubIterator(i) + } + return newAnd.Optimize() +} + +func (qs *QuadStore) optimizeHasA(it *iterator.HasA) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == sqlType { + p := primary.(*SQLIterator) + newit, err := hasa(p.sql, it.Direction(), qs) + if err != nil { + glog.Errorln(err) + return it, false + } + newit.Tagger().CopyFrom(it) + return newit, true + } + return it, false +} diff --git a/graph/sql/optimizers_test.go b/graph/sql/optimizers_test.go new file mode 100644 index 0000000..8903bb7 --- /dev/null +++ b/graph/sql/optimizers_test.go @@ -0,0 +1,128 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sql + +import ( + "testing" + + "github.com/google/cayley/graph" + "github.com/google/cayley/quad" +) + +func TestBuildIntersect(t *testing.T) { + a := NewSQLLinkIterator(nil, quad.Subject, "Foo") + b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to") + it, err := intersect(a.sql, b.sql, nil) + if err != nil { + t.Error(err) + } + s, v := it.sql.buildSQL(true, nil) + t.Log(s, v) +} + +func TestBuildHasa(t *testing.T) { + a := NewSQLLinkIterator(nil, quad.Subject, "Foo") + a.Tagger().Add("foo") + b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to") + it1, err := intersect(a.sql, b.sql, nil) + if err != nil { + t.Error(err) + } + it2, err := hasa(it1.sql, quad.Object, nil) + if err != nil { + t.Error(err) + } + s, v := it2.sql.buildSQL(true, nil) + t.Log(s, v) +} + +func TestBuildLinksTo(t *testing.T) { + a := NewSQLLinkIterator(nil, quad.Subject, "Foo") + b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to") + it1, err := intersect(a.sql, b.sql, nil) + if err != nil { + t.Error(err) + } + it2, err := hasa(it1.sql, quad.Object, nil) + it2.Tagger().Add("foo") + if err != nil { + t.Error(err) + } + it3, err := linksto(it2.sql, quad.Subject, nil) + if err != nil { + t.Error(err) + } + s, v := it3.sql.buildSQL(true, nil) + t.Log(s, v) +} + +func TestInterestingQuery(t *testing.T) { + if *postgres_path == "" { + t.SkipNow() + } + db, err := newQuadStore(*postgres_path, nil) + if err != nil { + t.Fatal(err) + } + qs := db.(*QuadStore) + a := NewSQLLinkIterator(qs, quad.Object, "Humphrey Bogart") + b := NewSQLLinkIterator(qs, quad.Predicate, "name") + it1, err := intersect(a.sql, b.sql, qs) + if err != nil { + t.Error(err) + } + it2, err := hasa(it1.sql, quad.Subject, qs) + if err != nil { + t.Error(err) + } + it2.Tagger().Add("hb") + it3, err := linksto(it2.sql, quad.Object, qs) + if err != nil { + t.Error(err) + } + b = NewSQLLinkIterator(db.(*QuadStore), quad.Predicate, "/film/performance/actor") + it4, err := intersect(it3.sql, b.sql, qs) + if err != nil { + t.Error(err) + } + it5, err := hasa(it4.sql, quad.Subject, qs) + if err != nil { + t.Error(err) + } + it6, err := linksto(it5.sql, quad.Object, qs) + if err != nil { + t.Error(err) + } + b = NewSQLLinkIterator(db.(*QuadStore), quad.Predicate, "/film/film/starring") + it7, err := intersect(it6.sql, b.sql, qs) + if err != nil { + t.Error(err) + } + it8, err := hasa(it7.sql, quad.Subject, qs) + if err != nil { + t.Error(err) + } + s, v := it8.sql.buildSQL(true, nil) + it8.Tagger().Add("id") + t.Log(s, v) + for graph.Next(it8) { + t.Log(it8.Result()) + out := make(map[string]graph.Value) + it8.TagResults(out) + for k, v := range out { + t.Log("%s: %v\n", k, v.(string)) + } + } +} diff --git a/graph/sql/quadstore.go b/graph/sql/quadstore.go new file mode 100644 index 0000000..b8ace88 --- /dev/null +++ b/graph/sql/quadstore.go @@ -0,0 +1,341 @@ +package sql + +import ( + "crypto/sha1" + "database/sql" + "encoding/hex" + "errors" + "fmt" + "hash" + "sync" + + "github.com/lib/pq" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +const QuadStoreType = "sql" + +func init() { + graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createSQLTables, nil) +} + +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + +type QuadStore struct { + db *sql.DB + sqlFlavor string + size int64 + lru *cache + noSizes bool +} + +func connectSQLTables(addr 
string, _ graph.Options) (*sql.DB, error) { + // TODO(barakmich): Parse options for more friendly addr, other SQLs. + conn, err := sql.Open("postgres", addr) + if err != nil { + glog.Errorf("Couldn't open database at %s: %#v", addr, err) + return nil, err + } + // "Open may just validate its arguments without creating a connection to the database." + // "To verify that the data source name is valid, call Ping." + // Source: http://golang.org/pkg/database/sql/#Open + if err := conn.Ping(); err != nil { + glog.Errorf("Couldn't open database at %s: %#v", addr, err) + return nil, err + } + return conn, nil +} + +func createSQLTables(addr string, options graph.Options) error { + conn, err := connectSQLTables(addr, options) + if err != nil { + return err + } + tx, err := conn.Begin() + if err != nil { + glog.Errorf("Couldn't begin creation transaction: %s", err) + return err + } + + quadTable, err := tx.Exec(` + CREATE TABLE quads ( + subject TEXT NOT NULL, + predicate TEXT NOT NULL, + object TEXT NOT NULL, + label TEXT, + horizon BIGSERIAL PRIMARY KEY, + id BIGINT, + ts timestamp, + subject_hash TEXT NOT NULL, + predicate_hash TEXT NOT NULL, + object_hash TEXT NOT NULL, + label_hash TEXT, + UNIQUE(subject_hash, predicate_hash, object_hash, label_hash) + );`) + if err != nil { + glog.Errorf("Cannot create quad table: %v", quadTable) + return err + } + factor, factorOk, err := options.IntKey("db_fill_factor") + if !factorOk { + factor = 50 + } + var index sql.Result + + index, err = tx.Exec(fmt.Sprintf(` + CREATE INDEX spo_index ON quads (subject_hash) WITH (FILLFACTOR = %d); + CREATE INDEX pos_index ON quads (predicate_hash) WITH (FILLFACTOR = %d); + CREATE INDEX osp_index ON quads (object_hash) WITH (FILLFACTOR = %d); + `, factor, factor, factor)) + if err != nil { + glog.Errorf("Cannot create indices: %v", index) + return err + } + tx.Commit() + return nil +} + +func newQuadStore(addr string, options graph.Options) (graph.QuadStore, error) { + var qs QuadStore + conn, err := connectSQLTables(addr, options) + if err != nil { + return nil, err + } + localOpt, localOptOk, err := options.BoolKey("local_optimize") + if err != nil { + return nil, err + } + qs.db = conn + qs.sqlFlavor = "postgres" + qs.size = -1 + qs.lru = newCache(1024) + + // Skip size checking by default. 
+	qs.noSizes = true
+	if localOptOk {
+		if localOpt {
+			qs.noSizes = false
+		}
+	}
+	return &qs, nil
+}
+
+func hashOf(s string) string {
+	h := hashPool.Get().(hash.Hash)
+	h.Reset()
+	defer hashPool.Put(h)
+	key := make([]byte, 0, hashSize)
+	h.Write([]byte(s))
+	key = h.Sum(key)
+	return hex.EncodeToString(key)
+}
+
+func (qs *QuadStore) copyFrom(tx *sql.Tx, in []graph.Delta) error {
+	stmt, err := tx.Prepare(pq.CopyIn("quads", "subject", "predicate", "object", "label", "id", "ts", "subject_hash", "predicate_hash", "object_hash", "label_hash"))
+	if err != nil {
+		return err
+	}
+	for _, d := range in {
+		_, err := stmt.Exec(
+			d.Quad.Subject,
+			d.Quad.Predicate,
+			d.Quad.Object,
+			d.Quad.Label,
+			d.ID.Int(),
+			d.Timestamp,
+			hashOf(d.Quad.Subject),
+			hashOf(d.Quad.Predicate),
+			hashOf(d.Quad.Object),
+			hashOf(d.Quad.Label),
+		)
+		if err != nil {
+			glog.Errorf("couldn't execute COPY statement: %v", err)
+			return err
+		}
+	}
+	_, err = stmt.Exec()
+	if err != nil {
+		return err
+	}
+	return stmt.Close()
+}
+
+func (qs *QuadStore) runTxPostgres(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
+	allAdds := true
+	for _, d := range in {
+		if d.Action != graph.Add {
+			allAdds = false
+		}
+	}
+	if allAdds {
+		return qs.copyFrom(tx, in)
+	}
+
+	insert, err := tx.Prepare(`INSERT INTO quads(subject, predicate, object, label, id, ts, subject_hash, predicate_hash, object_hash, label_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`)
+	if err != nil {
+		glog.Errorf("Cannot prepare insert statement: %v", err)
+		return err
+	}
+	defer insert.Close()
+	for _, d := range in {
+		switch d.Action {
+		case graph.Add:
+			_, err := insert.Exec(
+				d.Quad.Subject,
+				d.Quad.Predicate,
+				d.Quad.Object,
+				d.Quad.Label,
+				d.ID.Int(),
+				d.Timestamp,
+				hashOf(d.Quad.Subject),
+				hashOf(d.Quad.Predicate),
+				hashOf(d.Quad.Object),
+				hashOf(d.Quad.Label),
+			)
+			if err != nil {
+				glog.Errorf("couldn't execute INSERT statement: %v", err)
+				return err
+			}
+		case graph.Delete:
+			result, err := tx.Exec(`DELETE FROM quads WHERE subject=$1 and predicate=$2 and object=$3 and label=$4;`,
+				d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label)
+			if err != nil {
+				glog.Errorf("couldn't exec DELETE statement: %v", err)
+			}
+			affected, err := result.RowsAffected()
+			if err != nil {
+				glog.Errorf("couldn't get DELETE RowsAffected: %v", err)
+			}
+			if affected != 1 && !opts.IgnoreMissing {
+				return errors.New("deleting non-existent triple; rolling back")
+			}
+		default:
+			panic("unknown action")
+		}
+	}
+	return nil
+}
+
+func (qs *QuadStore) ApplyDeltas(in []graph.Delta, opts graph.IgnoreOpts) error {
+	// TODO(barakmich): Support more ignoreOpts? "ON CONFLICT IGNORE"
+	tx, err := qs.db.Begin()
+	if err != nil {
+		glog.Errorf("couldn't begin write transaction: %v", err)
+		return err
+	}
+	switch qs.sqlFlavor {
+	case "postgres":
+		err = qs.runTxPostgres(tx, in, opts)
+		if err != nil {
+			tx.Rollback()
+			return err
+		}
+	default:
+		panic("no support for flavor: " + qs.sqlFlavor)
+	}
+	return tx.Commit()
+}
+
+func (qs *QuadStore) Quad(val graph.Value) quad.Quad {
+	return val.(quad.Quad)
+}
+
+func (qs *QuadStore) QuadIterator(d quad.Direction, val graph.Value) graph.Iterator {
+	return NewSQLLinkIterator(qs, d, val.(string))
+}
+
+func (qs *QuadStore) NodesAllIterator() graph.Iterator {
+	return NewAllIterator(qs, "nodes")
+}
+
+func (qs *QuadStore) QuadsAllIterator() graph.Iterator {
+	return NewAllIterator(qs, "quads")
+}
+
+func (qs *QuadStore) ValueOf(s string) graph.Value {
+	return s
+}
+
+func (qs *QuadStore) NameOf(v graph.Value) string {
+	if v == nil {
+		glog.V(2).Info("NameOf was nil")
+		return ""
+	}
+	return v.(string)
+}
+
+func (qs *QuadStore) Size() int64 {
+	// TODO(barakmich): Sync size with writes.
+	if qs.size != -1 {
+		return qs.size
+	}
+	c := qs.db.QueryRow("SELECT COUNT(*) FROM quads;")
+	err := c.Scan(&qs.size)
+	if err != nil {
+		glog.Errorf("Couldn't execute COUNT: %v", err)
+		return 0
+	}
+	return qs.size
+}
+
+func (qs *QuadStore) Horizon() graph.PrimaryKey {
+	var horizon int64
+	err := qs.db.QueryRow("SELECT horizon FROM quads ORDER BY horizon DESC LIMIT 1;").Scan(&horizon)
+	if err != nil {
+		glog.Errorf("Couldn't execute horizon: %v", err)
+		return graph.NewSequentialKey(0)
+	}
+	return graph.NewSequentialKey(horizon)
+}
+
+func (qs *QuadStore) FixedIterator() graph.FixedIterator {
+	return iterator.NewFixed(iterator.Identity)
+}
+
+func (qs *QuadStore) Close() {
+	qs.db.Close()
+}
+
+func (qs *QuadStore) QuadDirection(in graph.Value, d quad.Direction) graph.Value {
+	q := in.(quad.Quad)
+	return q.Get(d)
+}
+
+func (qs *QuadStore) Type() string {
+	return QuadStoreType
+}
+
+func (qs *QuadStore) sizeForIterator(isAll bool, dir quad.Direction, val string) int64 {
+	var err error
+	if isAll {
+		return qs.Size()
+	}
+	if qs.noSizes {
+		if dir == quad.Predicate {
+			return (qs.Size() / 100) + 1
+		}
+		return (qs.Size() / 1000) + 1
+	}
+	if val, ok := qs.lru.Get(val + string(dir.Prefix())); ok {
+		return val
+	}
+	var size int64
+	glog.V(4).Infof("sql: getting size for select %s, %s", dir.String(), val)
+	err = qs.db.QueryRow(
+		fmt.Sprintf("SELECT count(*) FROM quads WHERE %s_hash = $1;", dir.String()), hashOf(val)).Scan(&size)
+	if err != nil {
+		glog.Errorf("Error getting size from SQL database: %v", err)
+		return 0
+	}
+	qs.lru.Put(val+string(dir.Prefix()), size)
+	return size
+}
diff --git a/graph/sql/sql_iterator.go b/graph/sql/sql_iterator.go
new file mode 100644
index 0000000..79df549
--- /dev/null
+++ b/graph/sql/sql_iterator.go
@@ -0,0 +1,348 @@
+// Copyright 2015 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package sql + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +var sqlType graph.Type + +func init() { + sqlType = graph.RegisterIterator("sql") +} + +type SQLIterator struct { + uid uint64 + qs *QuadStore + cursor *sql.Rows + err error + + sql sqlIterator + + result map[string]string + resultIndex int + resultList [][]string + resultNext [][]string + cols []string +} + +func (it *SQLIterator) Clone() graph.Iterator { + m := &SQLIterator{ + uid: iterator.NextUID(), + qs: it.qs, + sql: it.sql.sqlClone(), + } + return m +} + +func (it *SQLIterator) UID() uint64 { + return it.uid +} + +func (it *SQLIterator) Reset() { + it.err = nil + it.Close() +} + +func (it *SQLIterator) Err() error { + return it.err +} + +func (it *SQLIterator) Close() error { + if it.cursor != nil { + err := it.cursor.Close() + if err != nil { + return err + } + it.cursor = nil + } + return nil +} + +func (it *SQLIterator) Tagger() *graph.Tagger { + return it.sql.Tagger() +} + +func (it *SQLIterator) Result() graph.Value { + return it.sql.Result() +} + +func (it *SQLIterator) TagResults(dst map[string]graph.Value) { + for tag, value := range it.result { + if tag == "__execd" { + for _, tag := range it.Tagger().Tags() { + dst[tag] = value + } + continue + } + dst[tag] = value + } + + for tag, value := range it.Tagger().Fixed() { + dst[tag] = value + } +} + +func (it *SQLIterator) Type() graph.Type { + return sqlType +} + +func (it *SQLIterator) SubIterators() []graph.Iterator { + return nil +} + +func (it *SQLIterator) Sorted() bool { return false } +func (it *SQLIterator) Optimize() (graph.Iterator, bool) { return it, false } + +func (it *SQLIterator) Size() (int64, bool) { + return it.sql.Size(it.qs) +} + +func (it *SQLIterator) Describe() graph.Description { + size, _ := it.Size() + return graph.Description{ + UID: it.UID(), + Name: it.sql.Describe(), + Type: it.Type(), + Size: size, + } +} + +func (it *SQLIterator) Stats() graph.IteratorStats { + size, _ := it.Size() + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 5, + Size: size, + } +} + +func (it *SQLIterator) NextPath() bool { + it.resultIndex += 1 + if it.resultIndex >= len(it.resultList) { + return false + } + it.buildResult(it.resultIndex) + return true +} + +func (it *SQLIterator) Next() bool { + var err error + graph.NextLogIn(it) + if it.cursor == nil { + err = it.makeCursor(true, nil) + if err != nil { + glog.Errorf("Couldn't make query: %v", err) + it.err = err + return false + } + it.cols, err = it.cursor.Columns() + if err != nil { + glog.Errorf("Couldn't get columns") + it.err = err + it.cursor.Close() + return false + } + // iterate the first one + if !it.cursor.Next() { + glog.V(4).Infoln("sql: No next") + err := it.cursor.Err() + if err != nil { + glog.Errorf("Cursor error in SQL: %v", err) + it.err = err + } + it.cursor.Close() + return false + } + s, err := scan(it.cursor, len(it.cols)) + if err != nil { + it.err = err + it.cursor.Close() + return false + } + it.resultNext = append(it.resultNext, s) + } + if it.resultList != nil && it.resultNext == nil { + // We're on something and there's no next + return false + } + it.resultList = it.resultNext + it.resultNext = nil + it.resultIndex = 0 + for { + if !it.cursor.Next() { + glog.V(4).Infoln("sql: No next") + err := it.cursor.Err() + if err != nil { + glog.Errorf("Cursor error in SQL: %v", err) + it.err = err + } + it.cursor.Close() + 
break + } + s, err := scan(it.cursor, len(it.cols)) + if err != nil { + it.err = err + it.cursor.Close() + return false + } + + if it.sql.sameTopResult(it.resultList[0], s) { + it.resultList = append(it.resultList, s) + } else { + it.resultNext = append(it.resultNext, s) + break + } + } + + if len(it.resultList) == 0 { + return graph.NextLogOut(it, nil, false) + } + it.buildResult(0) + return graph.NextLogOut(it, it.Result(), true) +} + +func (it *SQLIterator) Contains(v graph.Value) bool { + var err error + if ok, res := it.sql.quickContains(v); ok { + return res + } + err = it.makeCursor(false, v) + if err != nil { + glog.Errorf("Couldn't make query: %v", err) + it.err = err + if it.cursor != nil { + it.cursor.Close() + } + return false + } + it.cols, err = it.cursor.Columns() + if err != nil { + glog.Errorf("Couldn't get columns") + it.err = err + it.cursor.Close() + return false + } + it.resultList = nil + for { + if !it.cursor.Next() { + glog.V(4).Infoln("sql: No next") + err := it.cursor.Err() + if err != nil { + glog.Errorf("Cursor error in SQL: %v", err) + it.err = err + } + it.cursor.Close() + break + } + s, err := scan(it.cursor, len(it.cols)) + if err != nil { + it.err = err + it.cursor.Close() + return false + } + it.resultList = append(it.resultList, s) + } + it.cursor.Close() + it.cursor = nil + if len(it.resultList) != 0 { + it.resultIndex = 0 + it.buildResult(0) + return true + } + return false +} + +func scan(cursor *sql.Rows, nCols int) ([]string, error) { + pointers := make([]interface{}, nCols) + container := make([]string, nCols) + for i, _ := range pointers { + pointers[i] = &container[i] + } + err := cursor.Scan(pointers...) + if err != nil { + glog.Errorf("Error scanning iterator: %v", err) + return nil, err + } + return container, nil +} + +func (it *SQLIterator) buildResult(i int) { + it.result = it.sql.buildResult(it.resultList[i], it.cols) +} + +func (it *SQLIterator) makeCursor(next bool, value graph.Value) error { + if it.cursor != nil { + it.cursor.Close() + } + var q string + var values []string + q, values = it.sql.buildSQL(next, value) + q = convertToPostgres(q, values) + ivalues := make([]interface{}, 0, len(values)) + for _, v := range values { + ivalues = append(ivalues, v) + } + cursor, err := it.qs.db.Query(q, ivalues...) + if err != nil { + glog.Errorf("Couldn't get cursor from SQL database: %v", err) + cursor = nil + return err + } + it.cursor = cursor + return nil +} + +func convertToPostgres(query string, values []string) string { + for i := 1; i <= len(values); i++ { + query = strings.Replace(query, "?", fmt.Sprintf("$%d", i), 1) + } + return query +} + +func NewSQLLinkIterator(qs *QuadStore, d quad.Direction, val string) *SQLIterator { + l := &SQLIterator{ + uid: iterator.NextUID(), + qs: qs, + sql: &SQLLinkIterator{ + constraints: []constraint{ + constraint{ + dir: d, + vals: []string{val}, + }, + }, + tableName: newTableName(), + size: 0, + }, + } + return l +} + +func NewSQLIterator(qs *QuadStore, sql sqlIterator) *SQLIterator { + l := &SQLIterator{ + uid: iterator.NextUID(), + qs: qs, + sql: sql, + } + return l +} diff --git a/graph/sql/sql_link_iterator.go b/graph/sql/sql_link_iterator.go new file mode 100644 index 0000000..f2750c3 --- /dev/null +++ b/graph/sql/sql_link_iterator.go @@ -0,0 +1,310 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "fmt" + "strings" + "sync/atomic" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/quad" +) + +var sqlTableID uint64 + +func init() { + atomic.StoreUint64(&sqlTableID, 0) +} + +func newTableName() string { + id := atomic.AddUint64(&sqlTableID, 1) + return fmt.Sprintf("t_%d", id) +} + +type constraint struct { + dir quad.Direction + vals []string +} + +type tagDir struct { + tag string + dir quad.Direction + table string + justLocal bool +} + +func (t tagDir) String() string { + if t.dir == quad.Any { + if t.justLocal { + return fmt.Sprintf("%s.__execd as \"%s\", %s.__execd_hash as %s_hash", t.table, t.tag, t.table, t.tag) + } + return fmt.Sprintf("%s.\"%s\" as \"%s\", %s.%s_hash as %s_hash", t.table, t.tag, t.tag, t.table, t.tag, t.tag) + } + return fmt.Sprintf("%s.%s as \"%s\", %s.%s_hash as %s_hash", t.table, t.dir, t.tag, t.table, t.dir, t.tag) +} + +type tableDef struct { + table string + name string + values []string +} + +type sqlItDir struct { + dir quad.Direction + it sqlIterator +} + +type sqlIterator interface { + sqlClone() sqlIterator + + buildSQL(next bool, val graph.Value) (string, []string) + getTables() []tableDef + getTags() []tagDir + buildWhere() (string, []string) + tableID() tagDir + + quickContains(graph.Value) (ok bool, result bool) + buildResult(result []string, cols []string) map[string]string + sameTopResult(target []string, test []string) bool + + Result() graph.Value + Size(*QuadStore) (int64, bool) + Describe() string + Type() sqlQueryType + Tagger() *graph.Tagger +} + +type SQLLinkIterator struct { + tagger graph.Tagger + + nodeIts []sqlItDir + constraints []constraint + tableName string + size int64 + tagdirs []tagDir + + resultQuad quad.Quad +} + +func (l *SQLLinkIterator) sqlClone() sqlIterator { + m := &SQLLinkIterator{ + tableName: l.tableName, + size: l.size, + constraints: make([]constraint, len(l.constraints)), + tagdirs: make([]tagDir, len(l.tagdirs)), + } + for _, i := range l.nodeIts { + m.nodeIts = append(m.nodeIts, sqlItDir{ + dir: i.dir, + it: i.it.sqlClone(), + }) + } + copy(m.constraints, l.constraints) + copy(m.tagdirs, l.tagdirs) + m.tagger.CopyFromTagger(l.Tagger()) + return m +} + +func (l *SQLLinkIterator) Tagger() *graph.Tagger { + return &l.tagger +} + +func (l *SQLLinkIterator) Result() graph.Value { + return l.resultQuad +} + +func (l *SQLLinkIterator) Size(qs *QuadStore) (int64, bool) { + if l.size != 0 { + return l.size, true + } + if len(l.constraints) > 0 { + l.size = qs.sizeForIterator(false, l.constraints[0].dir, l.constraints[0].vals[0]) + } else if len(l.nodeIts) > 1 { + subsize, _ := l.nodeIts[0].it.(*SQLNodeIterator).Size(qs) + return subsize * 20, false + } else { + return qs.Size(), false + } + return l.size, true +} + +func (l *SQLLinkIterator) Describe() string { + s, _ := l.buildSQL(true, nil) + return fmt.Sprintf("SQL_LINK_QUERY: %s", s) +} + +func (l *SQLLinkIterator) Type() sqlQueryType { + return link +} + +func (l *SQLLinkIterator) quickContains(v graph.Value) (bool, bool) { + for _, c := range l.constraints { + none := true 
+ desired := v.(quad.Quad).Get(c.dir) + for _, s := range c.vals { + if s == desired { + none = false + break + } + } + if none { + return true, false + } + } + if len(l.nodeIts) == 0 { + return true, true + } + return false, false +} + +func (l *SQLLinkIterator) buildResult(result []string, cols []string) map[string]string { + var q quad.Quad + q.Subject = result[0] + q.Predicate = result[1] + q.Object = result[2] + q.Label = result[3] + l.resultQuad = q + m := make(map[string]string) + for i, c := range cols[4:] { + m[c] = result[i+4] + } + return m +} + +func (l *SQLLinkIterator) getTables() []tableDef { + out := []tableDef{tableDef{table: "quads", name: l.tableName}} + for _, i := range l.nodeIts { + out = append(out, i.it.getTables()...) + } + return out +} + +func (l *SQLLinkIterator) getTags() []tagDir { + var out []tagDir + for _, tag := range l.tagger.Tags() { + out = append(out, tagDir{ + dir: quad.Any, + table: l.tableName, + tag: tag, + }) + } + for _, tag := range l.tagdirs { + out = append(out, tagDir{ + dir: tag.dir, + table: l.tableName, + tag: tag.tag, + }) + + } + for _, i := range l.nodeIts { + out = append(out, i.it.getTags()...) + } + return out +} + +func (l *SQLLinkIterator) buildWhere() (string, []string) { + var q []string + var vals []string + for _, c := range l.constraints { + q = append(q, fmt.Sprintf("%s.%s_hash = ?", l.tableName, c.dir)) + vals = append(vals, hashOf(c.vals[0])) + } + for _, i := range l.nodeIts { + t := i.it.tableID() + dir := t.dir.String() + if t.dir == quad.Any { + dir = t.tag + } + q = append(q, fmt.Sprintf("%s.%s_hash = %s.%s_hash", l.tableName, i.dir, t.table, dir)) + } + for _, i := range l.nodeIts { + s, v := i.it.buildWhere() + q = append(q, s) + vals = append(vals, v...) + } + query := strings.Join(q, " AND ") + return query, vals +} + +func (l *SQLLinkIterator) tableID() tagDir { + return tagDir{ + dir: quad.Any, + table: l.tableName, + } +} + +func (l *SQLLinkIterator) buildSQL(next bool, val graph.Value) (string, []string) { + query := "SELECT " + t := []string{ + fmt.Sprintf("%s.subject", l.tableName), + fmt.Sprintf("%s.predicate", l.tableName), + fmt.Sprintf("%s.object", l.tableName), + fmt.Sprintf("%s.label", l.tableName), + } + for _, v := range l.getTags() { + t = append(t, v.String()) + } + query += strings.Join(t, ", ") + query += " FROM " + t = []string{} + var values []string + for _, k := range l.getTables() { + values = append(values, k.values...) + t = append(t, fmt.Sprintf("%s as %s", k.table, k.name)) + } + query += strings.Join(t, ", ") + constraint, wherevalues := l.buildWhere() + if constraint != "" { + query += " WHERE " + } + + values = append(values, wherevalues...) 
+ if !next { + v := val.(quad.Quad) + if constraint != "" { + constraint += " AND " + } else { + constraint += " WHERE " + } + t = []string{ + fmt.Sprintf("%s.subject_hash = ?", l.tableName), + fmt.Sprintf("%s.predicate_hash = ?", l.tableName), + fmt.Sprintf("%s.object_hash = ?", l.tableName), + fmt.Sprintf("%s.label_hash = ?", l.tableName), + } + constraint += strings.Join(t, " AND ") + values = append(values, hashOf(v.Subject)) + values = append(values, hashOf(v.Predicate)) + values = append(values, hashOf(v.Object)) + values = append(values, hashOf(v.Label)) + } + query += constraint + query += ";" + + if glog.V(4) { + dstr := query + for i := 1; i <= len(values); i++ { + dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1) + } + glog.V(4).Infoln(dstr) + } + return query, values +} + +func (l *SQLLinkIterator) sameTopResult(target []string, test []string) bool { + return target[0] == test[0] && target[1] == test[1] && target[2] == test[2] && target[3] == test[3] +} diff --git a/graph/sql/sql_link_iterator_test.go b/graph/sql/sql_link_iterator_test.go new file mode 100644 index 0000000..2eda766 --- /dev/null +++ b/graph/sql/sql_link_iterator_test.go @@ -0,0 +1,92 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sql + +import ( + "flag" + "fmt" + "testing" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +var postgres_path = flag.String("postgres_path", "", "Path to running DB") + +func TestSQLLink(t *testing.T) { + it := NewSQLLinkIterator(nil, quad.Object, "cool") + s, v := it.sql.buildSQL(true, nil) + t.Log(s, v) +} + +func TestSQLLinkIteration(t *testing.T) { + if *postgres_path == "" { + t.SkipNow() + } + db, err := newQuadStore(*postgres_path, nil) + qs := db.(*QuadStore) + if err != nil { + t.Fatal(err) + } + it := NewSQLLinkIterator(qs, quad.Object, "Humphrey Bogart") + for graph.Next(it) { + fmt.Println(it.Result()) + } + it = NewSQLLinkIterator(qs, quad.Subject, "/en/casablanca_1942") + s, v := it.sql.buildSQL(true, nil) + t.Log(s, v) + c := 0 + for graph.Next(it) { + fmt.Println(it.Result()) + c += 1 + } + if c != 18 { + t.Errorf("Not enough results, got %d expected 18", c) + } +} + +func TestSQLNodeIteration(t *testing.T) { + if *postgres_path == "" { + t.SkipNow() + } + db, err := newQuadStore(*postgres_path, nil) + if err != nil { + t.Fatal(err) + } + link := NewSQLLinkIterator(db.(*QuadStore), quad.Object, "/en/humphrey_bogart") + it := &SQLIterator{ + uid: iterator.NextUID(), + qs: db.(*QuadStore), + sql: &SQLNodeIterator{ + tableName: newTableName(), + linkIt: sqlItDir{ + it: link.sql, + dir: quad.Subject, + }, + }, + } + s, v := it.sql.buildSQL(true, nil) + t.Log(s, v) + c := 0 + for graph.Next(it) { + t.Log(it.Result()) + c += 1 + } + if c != 56 { + t.Errorf("Not enough results, got %d expected 56", c) + } + +} diff --git a/graph/sql/sql_node_intersection.go b/graph/sql/sql_node_intersection.go new file mode 100644 index 0000000..a9e26e2 --- /dev/null +++ b/graph/sql/sql_node_intersection.go @@ -0,0 +1,211 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sql + +import ( + "fmt" + "strings" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/quad" +) + +type SQLNodeIntersection struct { + tableName string + + nodeIts []sqlIterator + nodetables []string + size int64 + tagger graph.Tagger + + result string +} + +func (n *SQLNodeIntersection) sqlClone() sqlIterator { + m := &SQLNodeIntersection{ + tableName: n.tableName, + size: n.size, + } + for _, i := range n.nodeIts { + m.nodeIts = append(m.nodeIts, i.sqlClone()) + } + m.tagger.CopyFromTagger(n.Tagger()) + return m +} + +func (n *SQLNodeIntersection) Tagger() *graph.Tagger { + return &n.tagger +} + +func (n *SQLNodeIntersection) Result() graph.Value { + return n.result +} + +func (n *SQLNodeIntersection) Type() sqlQueryType { + return nodeIntersect +} + +func (n *SQLNodeIntersection) Size(qs *QuadStore) (int64, bool) { + return qs.Size() / int64(len(n.nodeIts)+1), true +} + +func (n *SQLNodeIntersection) Describe() string { + s, _ := n.buildSQL(true, nil) + return fmt.Sprintf("SQL_NODE_INTERSECTION: %s", s) +} + +func (n *SQLNodeIntersection) buildResult(result []string, cols []string) map[string]string { + m := make(map[string]string) + for i, c := range cols { + if strings.HasSuffix(c, "_hash") { + continue + } + if c == "__execd" { + n.result = result[i] + } + m[c] = result[i] + } + return m +} + +func (n *SQLNodeIntersection) makeNodeTableNames() { + if n.nodetables != nil { + return + } + n.nodetables = make([]string, len(n.nodeIts)) + for i, _ := range n.nodetables { + n.nodetables[i] = newNodeTableName() + } +} + +func (n *SQLNodeIntersection) getTables() []tableDef { + if len(n.nodeIts) == 0 { + panic("Combined no subnode queries") + } + return n.buildSubqueries() +} + +func (n *SQLNodeIntersection) buildSubqueries() []tableDef { + var out []tableDef + n.makeNodeTableNames() + for i, it := range n.nodeIts { + var td tableDef + var table string + table, td.values = it.buildSQL(true, nil) + td.table = fmt.Sprintf("\n(%s)", table[:len(table)-1]) + td.name = n.nodetables[i] + out = append(out, td) + } + return out +} + +func (n *SQLNodeIntersection) tableID() tagDir { + n.makeNodeTableNames() + return tagDir{ + table: n.nodetables[0], + dir: quad.Any, + tag: "__execd", + } +} + +func (n *SQLNodeIntersection) getLocalTags() []tagDir { + myTag := n.tableID() + var out []tagDir + for _, tag := range n.tagger.Tags() { + out = append(out, tagDir{ + dir: myTag.dir, + table: myTag.table, + tag: tag, + justLocal: true, + }) + } + return out +} + +func (n *SQLNodeIntersection) getTags() []tagDir { + out := n.getLocalTags() + n.makeNodeTableNames() + for i, it := range n.nodeIts { + for _, v := range it.getTags() { + out = append(out, tagDir{ + tag: v.tag, + dir: quad.Any, + table: n.nodetables[i], + }) + } + } + return out +} + +func (n *SQLNodeIntersection) buildWhere() (string, []string) { + var q []string + var vals []string + for _, tb := range n.nodetables[1:] { + q = append(q, fmt.Sprintf("%s.__execd_hash = %s.__execd_hash", n.nodetables[0], tb)) + } + query := strings.Join(q, " AND ") + return query, vals +} + +func (n *SQLNodeIntersection) buildSQL(next bool, val graph.Value) (string, []string) { + topData := n.tableID() + tags := []tagDir{topData} + tags = append(tags, n.getTags()...) 
+ query := "SELECT " + var t []string + for _, v := range tags { + t = append(t, v.String()) + } + query += strings.Join(t, ", ") + query += " FROM " + t = []string{} + var values []string + for _, k := range n.getTables() { + values = append(values, k.values...) + t = append(t, fmt.Sprintf("%s as %s", k.table, k.name)) + } + query += strings.Join(t, ", ") + query += " WHERE " + + constraint, wherevalues := n.buildWhere() + values = append(values, wherevalues...) + + if !next { + v := val.(string) + if constraint != "" { + constraint += " AND " + } + constraint += fmt.Sprintf("%s.%s_hash = ?", topData.table, topData.dir) + values = append(values, hashOf(v)) + } + query += constraint + query += ";" + + if glog.V(4) { + dstr := query + for i := 1; i <= len(values); i++ { + dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1) + } + glog.V(4).Infoln(dstr) + } + return query, values +} + +func (n *SQLNodeIntersection) sameTopResult(target []string, test []string) bool { + return target[0] == test[0] +} + +func (n *SQLNodeIntersection) quickContains(_ graph.Value) (bool, bool) { return false, false } diff --git a/graph/sql/sql_node_iterator.go b/graph/sql/sql_node_iterator.go new file mode 100644 index 0000000..811121a --- /dev/null +++ b/graph/sql/sql_node_iterator.go @@ -0,0 +1,213 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sql + +import ( + "fmt" + "strings" + "sync/atomic" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/quad" +) + +var sqlNodeTableID uint64 + +type sqlQueryType int + +const ( + node sqlQueryType = iota + link + nodeIntersect +) + +func init() { + atomic.StoreUint64(&sqlNodeTableID, 0) +} + +func newNodeTableName() string { + id := atomic.AddUint64(&sqlNodeTableID, 1) + return fmt.Sprintf("n_%d", id) +} + +type SQLNodeIterator struct { + tableName string + + linkIt sqlItDir + size int64 + tagger graph.Tagger + + result string +} + +func (n *SQLNodeIterator) sqlClone() sqlIterator { + m := &SQLNodeIterator{ + tableName: n.tableName, + size: n.size, + linkIt: sqlItDir{ + dir: n.linkIt.dir, + it: n.linkIt.it.sqlClone(), + }, + } + m.tagger.CopyFromTagger(n.Tagger()) + return m +} + +func (n *SQLNodeIterator) Tagger() *graph.Tagger { + return &n.tagger +} + +func (n *SQLNodeIterator) Result() graph.Value { + return n.result +} + +func (n *SQLNodeIterator) Type() sqlQueryType { + return node +} + +func (n *SQLNodeIterator) Size(qs *QuadStore) (int64, bool) { + return qs.Size() / 2, true +} + +func (n *SQLNodeIterator) Describe() string { + s, _ := n.buildSQL(true, nil) + return fmt.Sprintf("SQL_NODE_QUERY: %s", s) +} + +func (n *SQLNodeIterator) buildResult(result []string, cols []string) map[string]string { + m := make(map[string]string) + for i, c := range cols { + if strings.HasSuffix(c, "_hash") { + continue + } + if c == "__execd" { + n.result = result[i] + } + m[c] = result[i] + } + return m +} + +func (n *SQLNodeIterator) getTables() []tableDef { + var out []tableDef + if n.linkIt.it != nil { + out = n.linkIt.it.getTables() + } + if len(out) == 0 { + out = append(out, tableDef{table: "quads", name: n.tableName}) + } + return out +} + +func (n *SQLNodeIterator) tableID() tagDir { + if n.linkIt.it != nil { + return tagDir{ + table: n.linkIt.it.tableID().table, + dir: n.linkIt.dir, + tag: "__execd", + } + } + return tagDir{ + table: n.tableName, + dir: quad.Any, + tag: "__execd", + } +} + +func (n *SQLNodeIterator) getLocalTags() []tagDir { + myTag := n.tableID() + var out []tagDir + for _, tag := range n.tagger.Tags() { + out = append(out, tagDir{ + dir: myTag.dir, + table: myTag.table, + tag: tag, + justLocal: true, + }) + } + return out +} + +func (n *SQLNodeIterator) getTags() []tagDir { + out := n.getLocalTags() + if n.linkIt.it != nil { + out = append(out, n.linkIt.it.getTags()...) + } + return out +} + +func (n *SQLNodeIterator) buildWhere() (string, []string) { + var q []string + var vals []string + if n.linkIt.it != nil { + s, v := n.linkIt.it.buildWhere() + q = append(q, s) + vals = append(vals, v...) + } + query := strings.Join(q, " AND ") + return query, vals +} + +func (n *SQLNodeIterator) buildSQL(next bool, val graph.Value) (string, []string) { + topData := n.tableID() + tags := []tagDir{topData} + tags = append(tags, n.getTags()...) + query := "SELECT " + + var t []string + for _, v := range tags { + t = append(t, v.String()) + } + query += strings.Join(t, ", ") + query += " FROM " + t = []string{} + var values []string + for _, k := range n.getTables() { + values = append(values, k.values...) + t = append(t, fmt.Sprintf("%s as %s", k.table, k.name)) + } + query += strings.Join(t, ", ") + query += " WHERE " + + constraint, wherevalues := n.buildWhere() + values = append(values, wherevalues...) 
+ + if !next { + v := val.(string) + if constraint != "" { + constraint += " AND " + } + constraint += fmt.Sprintf("%s.%s_hash = ?", topData.table, topData.dir) + values = append(values, hashOf(v)) + } + query += constraint + query += ";" + + if glog.V(4) { + dstr := query + for i := 1; i <= len(values); i++ { + dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1) + } + glog.V(4).Infoln(dstr) + } + return query, values +} + +func (n *SQLNodeIterator) sameTopResult(target []string, test []string) bool { + return target[0] == test[0] +} + +func (n *SQLNodeIterator) quickContains(_ graph.Value) (bool, bool) { return false, false } diff --git a/integration/integration_test.go b/integration/integration_test.go index 4b5b726..51ab30a 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -37,19 +37,23 @@ import ( _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" + _ "github.com/google/cayley/graph/sql" // Load writer registry _ "github.com/google/cayley/writer" ) var backend = flag.String("backend", "memstore", "Which backend to test. Loads test data to /tmp if not present.") +var backendPath = flag.String("backend_path", "", "Path to the chosen backend. Will have sane testing defaults if not specified") var benchmarkQueries = []struct { message string long bool query string tag string - expect []interface{} + // for testing + skip bool + expect []interface{} }{ // Easy one to get us started. How quick is the most straightforward retrieval? { @@ -422,6 +426,7 @@ var ( ) func prepare(t testing.TB) { + var remote bool cfg.DatabaseType = *backend switch *backend { case "memstore": @@ -436,14 +441,21 @@ func prepare(t testing.TB) { cfg.DatabaseOptions = map[string]interface{}{ "database_name": "cayley_test", // provide a default test database } + remote = true + case "sql": + cfg.DatabasePath = "postgres://localhost/cayley_test" + remote = true default: t.Fatalf("Untestable backend store %s", *backend) } + if *backendPath != "" { + cfg.DatabasePath = *backendPath + } var err error create.Do(func() { needsLoad := true - if graph.IsPersistent(cfg.DatabaseType) { + if graph.IsPersistent(cfg.DatabaseType) && !remote { if _, err := os.Stat(cfg.DatabasePath); os.IsNotExist(err) { err = db.Init(cfg) if err != nil { @@ -459,7 +471,7 @@ func prepare(t testing.TB) { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) } - if needsLoad { + if needsLoad && !remote { err = internal.Load(handle.QuadWriter, cfg, "../data/30kmoviedata.nq.gz", "cquad") if err != nil { t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) @@ -524,6 +536,11 @@ func checkQueries(t *testing.T) { if testing.Short() && test.long { continue } + if test.skip { + continue + } + tInit := time.Now() + t.Logf("Now testing %s ", test.message) ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true) _, err := ses.Parse(test.query) if err != nil { @@ -552,6 +569,7 @@ func checkQueries(t *testing.T) { t.Error("Query timed out: skipping validation.") continue } + t.Logf("(%v)\n", time.Since(tInit)) if len(got) != len(test.expect) { t.Errorf("Unexpected number of results, got:%d expect:%d on %s.", len(got), len(test.expect), test.message) diff --git a/internal/db/repl.go b/internal/db/repl.go index 20ec017..134a8ff 100644 --- a/internal/db/repl.go +++ b/internal/db/repl.go @@ -161,7 +161,10 @@ func Repl(h *graph.Handle, queryLanguage string, cfg *config.Config) error { fmt.Printf("Error: not a valid quad: %v\n", 
err) continue } - h.QuadWriter.RemoveQuad(quad) + err = h.QuadWriter.RemoveQuad(quad) + if err != nil { + fmt.Printf("error deleting: %v\n", err) + } continue case "exit": diff --git a/quad/quad.go b/quad/quad.go index e448cf2..62e22ca 100644 --- a/quad/quad.go +++ b/quad/quad.go @@ -66,6 +66,8 @@ const ( Label ) +var Directions = []Direction{Subject, Predicate, Object, Label} + func (d Direction) Prefix() byte { switch d { case Any: diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 583b8d0..0631d41 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -281,9 +281,9 @@ func (wk *worker) runIterator(it graph.Iterator) { if glog.V(2) { b, err := json.MarshalIndent(it.Describe(), "", " ") if err != nil { - glog.Infof("failed to format description: %v", err) + glog.V(2).Infof("failed to format description: %v", err) } else { - glog.Infof("%s", b) + glog.V(2).Infof("%s", b) } } for {