From 5be1df3be3f31802ac2fa6310b3daeee281512e2 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 18 Apr 2015 19:55:39 -0400 Subject: [PATCH] initial mongo indexed linksto --- graph/iterator.go | 5 + graph/iterator/and_iterator.go | 6 +- graph/iterator/and_iterator_optimize.go | 12 +- graph/mongo/indexed_linksto.go | 264 +++++++++++++++++++++++++++++ graph/mongo/lru.go | 4 + graph/mongo/quadstore_iterator_optimize.go | 59 +++++++ query/gremlin/build_iterator.go | 22 +-- query/mql/build_iterator.go | 4 +- query/sexp/parser.go | 4 +- 9 files changed, 362 insertions(+), 18 deletions(-) create mode 100644 graph/mongo/indexed_linksto.go diff --git a/graph/iterator.go b/graph/iterator.go index 281ea64..65bbcc8 100644 --- a/graph/iterator.go +++ b/graph/iterator.go @@ -30,6 +30,11 @@ type Tagger struct { fixedTags map[string]Value } +type LinkageSet struct { + Dir quad.Direction + Values []Value +} + // Add a tag to the iterator. func (t *Tagger) Add(tag string) { t.tags = append(t.tags, tag) diff --git a/graph/iterator/and_iterator.go b/graph/iterator/and_iterator.go index 418a469..4e57672 100644 --- a/graph/iterator/and_iterator.go +++ b/graph/iterator/and_iterator.go @@ -31,13 +31,15 @@ type And struct { result graph.Value runstats graph.IteratorStats err error + qs graph.QuadStore } // Creates a new And iterator. -func NewAnd() *And { +func NewAnd(qs graph.QuadStore) *And { return &And{ uid: NextUID(), internalIterators: make([]graph.Iterator, 0, 20), + qs: qs, } } @@ -79,7 +81,7 @@ func (it *And) TagResults(dst map[string]graph.Value) { } func (it *And) Clone() graph.Iterator { - and := NewAnd() + and := NewAnd(it.qs) and.AddSubIterator(it.primaryIt.Clone()) and.tags.CopyFrom(it) for _, sub := range it.internalIterators { diff --git a/graph/iterator/and_iterator_optimize.go b/graph/iterator/and_iterator_optimize.go index 276774e..062df52 100644 --- a/graph/iterator/and_iterator_optimize.go +++ b/graph/iterator/and_iterator_optimize.go @@ -78,7 +78,7 @@ func (it *And) Optimize() (graph.Iterator, bool) { // The easiest thing to do at this point is merely to create a new And iterator // and replace ourselves with our (reordered, optimized) clone. - newAnd := NewAnd() + newAnd := NewAnd(it.qs) // Add the subiterators in order. for _, sub := range its { @@ -95,6 +95,16 @@ func (it *And) Optimize() (graph.Iterator, bool) { // the new And (they were unchanged upon calling Optimize() on them, at the // start). it.cleanUp() + + // Ask the graph.QuadStore if we can be replaced. Often times, this is a great + // optimization opportunity (there's a fixed iterator underneath us, for + // example). + newReplacement, hasOne := it.qs.OptimizeIterator(newAnd) + if hasOne { + newAnd.Close() + return newReplacement, true + } + return newAnd, true } diff --git a/graph/mongo/indexed_linksto.go b/graph/mongo/indexed_linksto.go new file mode 100644 index 0000000..249ad0b --- /dev/null +++ b/graph/mongo/indexed_linksto.go @@ -0,0 +1,264 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongo + +import ( + "github.com/barakmich/glog" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +type LinksTo struct { + uid uint64 + collection string + tags graph.Tagger + qs *QuadStore + primaryIt graph.Iterator + dir quad.Direction + nextIt *mgo.Iter + result graph.Value + runstats graph.IteratorStats + err error + lset []graph.LinkageSet +} + +// NewLinksTo constructs a new indexed LinksTo iterator for Mongo around a direction +// and a subiterator of nodes. +func NewLinksTo(qs *QuadStore, it graph.Iterator, collection string, d quad.Direction, lset []graph.LinkageSet) *LinksTo { + return &LinksTo{ + uid: iterator.NextUID(), + qs: qs, + primaryIt: it, + dir: d, + nextIt: nil, + lset: lset, + collection: collection, + } +} + +func (it *LinksTo) buildIteratorFor(d quad.Direction, val graph.Value) *mgo.Iter { + name := it.qs.NameOf(val) + constraint := bson.M{d.String(): name} + for _, set := range it.lset { + var s []string + for _, v := range set.Values { + s = append(s, it.qs.NameOf(v)) + } + constraint[set.Dir.String()] = bson.M{"$in": s} + } + glog.V(4).Infof("%#v", constraint) + return it.qs.db.C(it.collection).Find(constraint).Iter() +} + +func (it *LinksTo) UID() uint64 { + return it.uid +} + +func (it *LinksTo) Tagger() *graph.Tagger { + return &it.tags +} + +// Return the direction under consideration. +func (it *LinksTo) Direction() quad.Direction { return it.dir } + +// Tag these results, and our subiterator's results. +func (it *LinksTo) TagResults(dst map[string]graph.Value) { + for _, tag := range it.tags.Tags() { + dst[tag] = it.Result() + } + + for tag, value := range it.tags.Fixed() { + dst[tag] = value + } + + it.primaryIt.TagResults(dst) +} + +// DEPRECATED +func (it *LinksTo) ResultTree() *graph.ResultTree { + tree := graph.NewResultTree(it.Result()) + tree.AddSubtree(it.primaryIt.ResultTree()) + return tree +} + +// Optimize the LinksTo, by replacing it if it can be. +func (it *LinksTo) Optimize() (graph.Iterator, bool) { + return it, false +} + +func (it *LinksTo) Next() bool { + var result struct { + ID string `bson:"_id"` + Added []int64 `bson:"Added"` + Deleted []int64 `bson:"Deleted"` + } + graph.NextLogIn(it) + it.runstats.Next += 1 + if it.nextIt != nil && it.nextIt.Next(&result) { + it.runstats.ContainsNext += 1 + if it.collection == "quads" && len(result.Added) <= len(result.Deleted) { + return it.Next() + } + it.result = result.ID + return graph.NextLogOut(it, it.result, true) + } + + if it.nextIt != nil { + // If there's an error in the 'next' iterator, we save it and we're done. + it.err = it.nextIt.Err() + if it.err != nil { + return false + } + + } + // Subiterator is empty, get another one + if !graph.Next(it.primaryIt) { + // Possibly save error + it.err = it.primaryIt.Err() + + // We're out of nodes in our subiterator, so we're done as well. + return graph.NextLogOut(it, 0, false) + } + if it.nextIt != nil { + it.nextIt.Close() + } + it.nextIt = it.buildIteratorFor(it.dir, it.primaryIt.Result()) + + // Recurse -- return the first in the next set. + return it.Next() +} + +func (it *LinksTo) Err() error { + return it.err +} + +func (it *LinksTo) Result() graph.Value { + return it.result +} + +func (it *LinksTo) Close() error { + var err error + if it.nextIt != nil { + err = it.nextIt.Close() + } + + _err := it.primaryIt.Close() + if _err != nil && err == nil { + err = _err + } + + return err +} + +func (it *LinksTo) NextPath() bool { + ok := it.primaryIt.NextPath() + if !ok { + it.err = it.primaryIt.Err() + } + return ok +} + +var mongoIndexedLinksToType graph.Type + +func init() { + mongoIndexedLinksToType = graph.RegisterIterator("mongo-indexed-linksto") +} + +func (it *LinksTo) Type() graph.Type { + return mongoIndexedLinksToType +} + +var _ graph.Nexter = &LinksTo{} + +func (it *LinksTo) Clone() graph.Iterator { + m := NewLinksTo(it.qs, it.primaryIt.Clone(), it.collection, it.dir, it.lset) + m.tags.CopyFrom(it) + return m +} + +func (it *LinksTo) Contains(val graph.Value) bool { + graph.ContainsLogIn(it, val) + it.runstats.Contains += 1 + + for _, set := range it.lset { + dval := it.qs.QuadDirection(val, set.Dir) + good := false + for _, val := range set.Values { + if val == dval { + good = true + break + } + } + if !good { + return graph.ContainsLogOut(it, val, false) + } + } + + node := it.qs.QuadDirection(val, it.dir) + if it.primaryIt.Contains(node) { + it.result = val + return graph.ContainsLogOut(it, val, true) + } + it.err = it.primaryIt.Err() + return graph.ContainsLogOut(it, val, false) +} + +func (it *LinksTo) Describe() graph.Description { + primary := it.primaryIt.Describe() + return graph.Description{ + UID: it.UID(), + Type: it.Type(), + Direction: it.dir, + Iterator: &primary, + } +} + +func (it *LinksTo) Reset() { + it.primaryIt.Reset() + if it.nextIt != nil { + it.nextIt.Close() + } + it.nextIt = nil +} + +// Return a guess as to how big or costly it is to next the iterator. +func (it *LinksTo) Stats() graph.IteratorStats { + subitStats := it.primaryIt.Stats() + // TODO(barakmich): These should really come from the quadstore itself + fanoutFactor := int64(20) + checkConstant := int64(1) + nextConstant := int64(2) + return graph.IteratorStats{ + NextCost: nextConstant + subitStats.NextCost, + ContainsCost: checkConstant + subitStats.ContainsCost, + Size: fanoutFactor * subitStats.Size, + Next: it.runstats.Next, + Contains: it.runstats.Contains, + ContainsNext: it.runstats.ContainsNext, + } +} + +func (it *LinksTo) Size() (int64, bool) { + return it.Stats().Size, false +} + +// Return a list containing only our subiterator. +func (it *LinksTo) SubIterators() []graph.Iterator { + return []graph.Iterator{it.primaryIt} +} diff --git a/graph/mongo/lru.go b/graph/mongo/lru.go index aba2372..f22ef76 100644 --- a/graph/mongo/lru.go +++ b/graph/mongo/lru.go @@ -16,6 +16,7 @@ package mongo import ( "container/list" + "fmt" ) // TODO(kortschak) Reimplement without container/list. @@ -48,6 +49,9 @@ func (lru *cache) Put(key string, value string) { lru.removeOldest() } lru.priority.PushFront(kv{key: key, value: value}) + if lru.priority == nil { + fmt.Println("wat") + } lru.cache[key] = lru.priority.Front() } diff --git a/graph/mongo/quadstore_iterator_optimize.go b/graph/mongo/quadstore_iterator_optimize.go index 1fdd936..2120e13 100644 --- a/graph/mongo/quadstore_iterator_optimize.go +++ b/graph/mongo/quadstore_iterator_optimize.go @@ -15,6 +15,8 @@ package mongo import ( + "github.com/barakmich/glog" + "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" ) @@ -23,11 +25,68 @@ func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) switch it.Type() { case graph.LinksTo: return qs.optimizeLinksTo(it.(*iterator.LinksTo)) + case graph.And: + return qs.optimizeAndIterator(it.(*iterator.And)) } return it, false } +func (qs *QuadStore) optimizeAndIterator(it *iterator.And) (graph.Iterator, bool) { + // Fail fast if nothing can happen + glog.V(4).Infoln("Entering optimizeAndIterator", it.UID()) + found := false + for _, it := range it.SubIterators() { + glog.V(4).Infoln(it.Type()) + if it.Type() == mongoType { + found = true + } + } + if !found { + glog.V(4).Infoln("Aborting optimizeAndIterator") + return it, false + } + + newAnd := iterator.NewAnd(qs) + var firstmongo *Iterator + for _, it := range it.SubIterators() { + switch it.Type() { + case mongoType: + if firstmongo == nil { + firstmongo = it.(*Iterator) + } else { + newAnd.AddSubIterator(it) + } + case graph.LinksTo: + continue + default: + newAnd.AddSubIterator(it) + } + } + + lset := []graph.LinkageSet{ + { + Dir: firstmongo.dir, + Values: []graph.Value{qs.ValueOf(firstmongo.name)}, + }, + } + + ltocount := 0 + for _, it := range it.SubIterators() { + if it.Type() == graph.LinksTo { + lto := it.(*iterator.LinksTo) + newLto := NewLinksTo(qs, lto.SubIterators()[0], "quads", lto.Direction(), lset) + newAnd.AddSubIterator(newLto) + ltocount++ + } + } + if ltocount == 0 { + return it, false + } + + return newAnd.Optimize() +} + func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { subs := it.SubIterators() if len(subs) != 1 { diff --git a/query/gremlin/build_iterator.go b/query/gremlin/build_iterator.go index f92d473..e49c2d4 100644 --- a/query/gremlin/build_iterator.go +++ b/query/gremlin/build_iterator.go @@ -133,7 +133,7 @@ func buildInOutIterator(obj *otto.Object, qs graph.QuadStore, base graph.Iterato in, out = out, in } lto := iterator.NewLinksTo(qs, base, in) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(iterator.NewLinksTo(qs, predicateNodeIterator, quad.Predicate)) and.AddSubIterator(lto) return iterator.NewHasA(qs, and, out) @@ -182,11 +182,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It } predFixed := qs.FixedIterator() predFixed.Add(qs.ValueOf(stringArgs[0])) - subAnd := iterator.NewAnd() + subAnd := iterator.NewAnd(qs) subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate)) subAnd.AddSubIterator(iterator.NewLinksTo(qs, all, quad.Object)) hasa := iterator.NewHasA(qs, subAnd, quad.Subject) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(hasa) and.AddSubIterator(subIt) it = and @@ -202,11 +202,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It } predFixed := qs.FixedIterator() predFixed.Add(qs.ValueOf(stringArgs[0])) - subAnd := iterator.NewAnd() + subAnd := iterator.NewAnd(qs) subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate)) subAnd.AddSubIterator(iterator.NewLinksTo(qs, all, quad.Subject)) hasa := iterator.NewHasA(qs, subAnd, quad.Object) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(hasa) and.AddSubIterator(subIt) it = and @@ -220,11 +220,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It } predFixed := qs.FixedIterator() predFixed.Add(qs.ValueOf(stringArgs[0])) - subAnd := iterator.NewAnd() + subAnd := iterator.NewAnd(qs) subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate)) subAnd.AddSubIterator(iterator.NewLinksTo(qs, fixed, quad.Object)) hasa := iterator.NewHasA(qs, subAnd, quad.Subject) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(hasa) and.AddSubIterator(subIt) it = and @@ -238,14 +238,14 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It } argIt := buildIteratorTree(firstArg.Object(), qs) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(subIt) and.AddSubIterator(argIt) it = and case "back": arg, _ := obj.Get("_gremlin_back_chain") argIt := buildIteratorTree(arg.Object(), qs) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(subIt) and.AddSubIterator(argIt) it = and @@ -254,7 +254,7 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It for _, name := range stringArgs { fixed.Add(qs.ValueOf(name)) } - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(fixed) and.AddSubIterator(subIt) it = and @@ -311,7 +311,7 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It toComplementIt := buildIteratorTree(firstArg.Object(), qs) notIt := iterator.NewNot(toComplementIt, allIt) - and := iterator.NewAnd() + and := iterator.NewAnd(qs) and.AddSubIterator(subIt) and.AddSubIterator(notIt) it = and diff --git a/query/mql/build_iterator.go b/query/mql/build_iterator.go index 8d81717..57ab8af 100644 --- a/query/mql/build_iterator.go +++ b/query/mql/build_iterator.go @@ -102,7 +102,7 @@ func (q *Query) buildIteratorTreeInternal(query interface{}, path Path) (it grap } func (q *Query) buildIteratorTreeMapInternal(query map[string]interface{}, path Path) (graph.Iterator, error) { - it := iterator.NewAnd() + it := iterator.NewAnd(q.ses.qs) it.AddSubIterator(q.ses.qs.NodesAllIterator()) var err error err = nil @@ -136,7 +136,7 @@ func (q *Query) buildIteratorTreeMapInternal(query map[string]interface{}, path if err != nil { return nil, err } - subAnd := iterator.NewAnd() + subAnd := iterator.NewAnd(q.ses.qs) predFixed := q.ses.qs.FixedIterator() predFixed.Add(q.ses.qs.ValueOf(pred)) subAnd.AddSubIterator(iterator.NewLinksTo(q.ses.qs, predFixed, quad.Predicate)) diff --git a/query/sexp/parser.go b/query/sexp/parser.go index 71e80d7..6f3adbc 100644 --- a/query/sexp/parser.go +++ b/query/sexp/parser.go @@ -213,7 +213,7 @@ func buildIteratorTree(tree *peg.ExpressionTree, qs graph.QuadStore) graph.Itera return lto case "RootConstraint": constraintCount := 0 - and := iterator.NewAnd() + and := iterator.NewAnd(qs) for _, c := range tree.Children { switch c.Name { case "NodeIdentifier": @@ -232,7 +232,7 @@ func buildIteratorTree(tree *peg.ExpressionTree, qs graph.QuadStore) graph.Itera var hasa *iterator.HasA topLevelDir := quad.Subject subItDir := quad.Object - subAnd := iterator.NewAnd() + subAnd := iterator.NewAnd(qs) isOptional := false for _, c := range tree.Children { switch c.Name {