initial mongo indexed linksto

This commit is contained in:
Barak Michener 2015-04-18 19:55:39 -04:00
parent 6764ea0295
commit 5be1df3be3
9 changed files with 362 additions and 18 deletions

View file

@ -30,6 +30,11 @@ type Tagger struct {
fixedTags map[string]Value
}
// LinkageSet pairs a quad direction with a list of candidate values for that
// direction.
type LinkageSet struct {
// Dir is the quad direction the values apply to.
Dir quad.Direction
// Values lists the values associated with Dir.
Values []Value
}
// Add a tag to the iterator.
func (t *Tagger) Add(tag string) {
t.tags = append(t.tags, tag)

View file

@ -31,13 +31,15 @@ type And struct {
result graph.Value
runstats graph.IteratorStats
err error
qs graph.QuadStore
}
// Creates a new And iterator.
func NewAnd() *And {
func NewAnd(qs graph.QuadStore) *And {
return &And{
uid: NextUID(),
internalIterators: make([]graph.Iterator, 0, 20),
qs: qs,
}
}
@ -79,7 +81,7 @@ func (it *And) TagResults(dst map[string]graph.Value) {
}
func (it *And) Clone() graph.Iterator {
and := NewAnd()
and := NewAnd(it.qs)
and.AddSubIterator(it.primaryIt.Clone())
and.tags.CopyFrom(it)
for _, sub := range it.internalIterators {

View file

@ -78,7 +78,7 @@ func (it *And) Optimize() (graph.Iterator, bool) {
// The easiest thing to do at this point is merely to create a new And iterator
// and replace ourselves with our (reordered, optimized) clone.
newAnd := NewAnd()
newAnd := NewAnd(it.qs)
// Add the subiterators in order.
for _, sub := range its {
@ -95,6 +95,16 @@ func (it *And) Optimize() (graph.Iterator, bool) {
// the new And (they were unchanged upon calling Optimize() on them, at the
// start).
it.cleanUp()
// Ask the graph.QuadStore if we can be replaced. Often times, this is a great
// optimization opportunity (there's a fixed iterator underneath us, for
// example).
newReplacement, hasOne := it.qs.OptimizeIterator(newAnd)
if hasOne {
newAnd.Close()
return newReplacement, true
}
return newAnd, true
}

View file

@ -0,0 +1,264 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mongo
import (
"github.com/barakmich/glog"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
// LinksTo is a Mongo-backed LinksTo iterator. For every value produced by
// primaryIt it queries a Mongo collection and yields the IDs of the matching
// documents.
type LinksTo struct {
// uid is this iterator's unique ID.
uid uint64
// collection is the name of the Mongo collection that gets queried.
collection string
// tags holds the tags attached to this iterator.
tags graph.Tagger
// qs is the Mongo QuadStore used for name lookups and database access.
qs *QuadStore
// primaryIt is the subiterator whose results drive the per-value queries.
primaryIt graph.Iterator
// dir is the quad direction that values from primaryIt are matched against.
dir quad.Direction
// nextIt is the Mongo cursor for the current primaryIt value; nil until the
// first query has been issued and again after Reset.
nextIt *mgo.Iter
// result is the most recent result set by Next or Contains.
result graph.Value
// runstats counts the Next, Contains and ContainsNext operations performed.
runstats graph.IteratorStats
// err is the last error recorded from the Mongo cursor or from primaryIt.
err error
// lset holds additional per-direction value constraints applied to every
// query and checked in Contains.
lset []graph.LinkageSet
}
// NewLinksTo builds an indexed LinksTo iterator for Mongo. The iterator walks
// the nodes produced by it, looks them up in direction d within the given
// collection, and additionally restricts matches by the linkage sets in lset.
// No Mongo query is issued until iteration begins.
func NewLinksTo(qs *QuadStore, it graph.Iterator, collection string, d quad.Direction, lset []graph.LinkageSet) *LinksTo {
	lto := new(LinksTo)
	lto.uid = iterator.NextUID()
	lto.collection = collection
	lto.qs = qs
	lto.primaryIt = it
	lto.dir = d
	lto.lset = lset
	// lto.nextIt stays nil: the first cursor is created lazily.
	return lto
}
// buildIteratorFor opens a Mongo cursor over it.collection for documents whose
// field for direction d equals the name of val and whose field for each
// linkage-set direction is one of that set's value names.
func (it *LinksTo) buildIteratorFor(d quad.Direction, val graph.Value) *mgo.Iter {
	target := it.qs.NameOf(val)
	query := bson.M{d.String(): target}
	for i := range it.lset {
		linkage := it.lset[i]
		var names []string
		for _, member := range linkage.Values {
			names = append(names, it.qs.NameOf(member))
		}
		// A linkage set for the same direction as d replaces the entry above.
		query[linkage.Dir.String()] = bson.M{"$in": names}
	}
	glog.V(4).Infof("%#v", query)
	coll := it.qs.db.C(it.collection)
	return coll.Find(query).Iter()
}
// UID returns the unique ID stored on this iterator.
func (it *LinksTo) UID() uint64 {
return it.uid
}
// Tagger returns a pointer to this iterator's tagger, so callers can modify
// the tags in place.
func (it *LinksTo) Tagger() *graph.Tagger {
return &it.tags
}
// Direction reports the quad direction this iterator links through.
func (it *LinksTo) Direction() quad.Direction {
	return it.dir
}
// TagResults records the current result under each of this iterator's tags,
// copies its fixed tags into dst, and then lets the subiterator add its own.
func (it *LinksTo) TagResults(dst map[string]graph.Value) {
	tagger := &it.tags
	for _, name := range tagger.Tags() {
		dst[name] = it.Result()
	}
	for name, fixed := range tagger.Fixed() {
		dst[name] = fixed
	}
	it.primaryIt.TagResults(dst)
}
// ResultTree returns a tree rooted at the current result, with the
// subiterator's result tree attached as its only subtree.
//
// Deprecated: flagged as deprecated by the original author.
func (it *LinksTo) ResultTree() *graph.ResultTree {
	root := graph.NewResultTree(it.Result())
	child := it.primaryIt.ResultTree()
	root.AddSubtree(child)
	return root
}
// Optimize is a no-op for this iterator: it always returns the receiver
// unchanged and reports false, i.e. no replacement took place.
func (it *LinksTo) Optimize() (graph.Iterator, bool) {
return it, false
}
// Next advances to the next matching document and reports whether one was
// found. Results come from the Mongo cursor opened for the current
// subiterator value; when that cursor is exhausted, the subiterator is
// advanced and a fresh cursor is opened for its new value.
//
// In the "quads" collection, documents with no more Added entries than
// Deleted entries are skipped (presumably these are quads that are currently
// deleted -- confirm against the writer).
//
// On a cursor or subiterator error the error is stored for Err and false is
// returned.
//
// This is written as a loop rather than by self-recursion so that long runs of
// skipped documents or empty per-value queries do not grow the call stack.
// Each pass performs the same logging and statistics updates that a recursive
// call used to.
func (it *LinksTo) Next() bool {
	for {
		// Declared per pass so every decode starts from a zero value.
		var result struct {
			ID      string  `bson:"_id"`
			Added   []int64 `bson:"Added"`
			Deleted []int64 `bson:"Deleted"`
		}
		graph.NextLogIn(it)
		it.runstats.Next++

		if it.nextIt != nil && it.nextIt.Next(&result) {
			it.runstats.ContainsNext++
			if it.collection == "quads" && len(result.Added) <= len(result.Deleted) {
				// Skip this document and try the next one.
				continue
			}
			it.result = result.ID
			return graph.NextLogOut(it, it.result, true)
		}

		if it.nextIt != nil {
			// If there's an error in the 'next' iterator, we save it and we're done.
			it.err = it.nextIt.Err()
			if it.err != nil {
				return false
			}
		}

		// The current cursor is empty (or absent); move the subiterator on.
		if !graph.Next(it.primaryIt) {
			// Possibly save error
			it.err = it.primaryIt.Err()
			// We're out of nodes in our subiterator, so we're done as well.
			return graph.NextLogOut(it, 0, false)
		}

		if it.nextIt != nil {
			// Err() was already checked above, so the Close result carries no
			// new information and is intentionally ignored.
			it.nextIt.Close()
		}
		it.nextIt = it.buildIteratorFor(it.dir, it.primaryIt.Result())
		// Loop around to return the first document of the new cursor.
	}
}
// Err returns the last error stored on this iterator, or nil if there is none.
func (it *LinksTo) Err() error {
return it.err
}
// Result returns the value currently stored as this iterator's result.
func (it *LinksTo) Result() graph.Value {
return it.result
}
// Close closes the active Mongo cursor, if any, and then the subiterator.
// Both are always closed; if both fail, the cursor's error is the one
// returned.
func (it *LinksTo) Close() error {
	var cursorErr error
	if it.nextIt != nil {
		cursorErr = it.nextIt.Close()
	}
	primaryErr := it.primaryIt.Close()
	if cursorErr != nil {
		return cursorErr
	}
	return primaryErr
}
// NextPath delegates to the subiterator's NextPath. When the subiterator
// reports no further path, its error (if any) is stored for Err.
func (it *LinksTo) NextPath() bool {
	if it.primaryIt.NextPath() {
		return true
	}
	it.err = it.primaryIt.Err()
	return false
}
// mongoIndexedLinksToType is the iterator type assigned to LinksTo; it is set
// once in init.
var mongoIndexedLinksToType graph.Type
// init registers the "mongo-indexed-linksto" iterator type with the graph
// package and stores the resulting type.
func init() {
mongoIndexedLinksToType = graph.RegisterIterator("mongo-indexed-linksto")
}
// Type returns the package-level iterator type used for every LinksTo.
func (it *LinksTo) Type() graph.Type {
return mongoIndexedLinksToType
}
// Compile-time check that *LinksTo satisfies graph.Nexter.
var _ graph.Nexter = (*LinksTo)(nil)
// Clone returns a new LinksTo over a clone of the subiterator, with the same
// store, collection, direction and linkage sets, and with this iterator's tags
// copied over. The clone starts without an open Mongo cursor.
func (it *LinksTo) Clone() graph.Iterator {
	sub := it.primaryIt.Clone()
	dup := NewLinksTo(it.qs, sub, it.collection, it.dir, it.lset)
	dup.tags.CopyFrom(it)
	return dup
}
// Contains reports whether the quad val belongs to this iterator. For every
// linkage set, the quad's value in that set's direction must be one of the
// set's values, and the quad's value in the iterator's own direction must be
// contained in the subiterator. On success val becomes the current result; if
// the subiterator check fails, the subiterator's error is stored for Err.
func (it *LinksTo) Contains(val graph.Value) bool {
	graph.ContainsLogIn(it, val)
	it.runstats.Contains += 1

linkage:
	for _, set := range it.lset {
		got := it.qs.QuadDirection(val, set.Dir)
		for _, want := range set.Values {
			if want == got {
				// This set is satisfied; check the next one.
				continue linkage
			}
		}
		// No value in this set matched the quad.
		return graph.ContainsLogOut(it, val, false)
	}

	if !it.primaryIt.Contains(it.qs.QuadDirection(val, it.dir)) {
		it.err = it.primaryIt.Err()
		return graph.ContainsLogOut(it, val, false)
	}
	it.result = val
	return graph.ContainsLogOut(it, val, true)
}
// Describe returns a description of this iterator (UID, type and direction)
// with the subiterator's description nested under Iterator.
func (it *LinksTo) Describe() graph.Description {
	sub := it.primaryIt.Describe()

	var desc graph.Description
	desc.UID = it.UID()
	desc.Type = it.Type()
	desc.Direction = it.dir
	desc.Iterator = &sub
	return desc
}
// Reset rewinds the subiterator and discards the active Mongo cursor, if any,
// so that the next call to Next starts a fresh query.
func (it *LinksTo) Reset() {
	it.primaryIt.Reset()
	if cursor := it.nextIt; cursor != nil {
		cursor.Close()
		it.nextIt = nil
	}
}
// Stats returns an estimate of this iterator's cost and size, derived from the
// subiterator's stats and fixed constants, together with the run counters
// collected so far.
func (it *LinksTo) Stats() graph.IteratorStats {
	// TODO(barakmich): These should really come from the quadstore itself
	const (
		fanoutFactor  int64 = 20
		checkConstant int64 = 1
		nextConstant  int64 = 2
	)
	sub := it.primaryIt.Stats()

	var stats graph.IteratorStats
	stats.NextCost = nextConstant + sub.NextCost
	stats.ContainsCost = checkConstant + sub.ContainsCost
	stats.Size = fanoutFactor * sub.Size
	stats.Next = it.runstats.Next
	stats.Contains = it.runstats.Contains
	stats.ContainsNext = it.runstats.ContainsNext
	return stats
}
// Size returns the estimated size from Stats. The second return value is
// always false.
func (it *LinksTo) Size() (int64, bool) {
	estimate := it.Stats().Size
	return estimate, false
}
// SubIterators returns a one-element slice holding the primary subiterator.
func (it *LinksTo) SubIterators() []graph.Iterator {
	subs := make([]graph.Iterator, 1)
	subs[0] = it.primaryIt
	return subs
}

View file

@ -16,6 +16,7 @@ package mongo
import (
"container/list"
"fmt"
)
// TODO(kortschak) Reimplement without container/list.
@ -48,6 +49,9 @@ func (lru *cache) Put(key string, value string) {
lru.removeOldest()
}
lru.priority.PushFront(kv{key: key, value: value})
if lru.priority == nil {
fmt.Println("wat")
}
lru.cache[key] = lru.priority.Front()
}

View file

@ -15,6 +15,8 @@
package mongo
import (
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
)
@ -23,11 +25,68 @@ func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool)
switch it.Type() {
case graph.LinksTo:
return qs.optimizeLinksTo(it.(*iterator.LinksTo))
case graph.And:
return qs.optimizeAndIterator(it.(*iterator.And))
}
return it, false
}
// optimizeAndIterator tries to fold a Mongo iterator that sits directly under
// an And into the And's sibling LinksTo iterators. The first Mongo iterator
// found is removed from the And and turned into a linkage set; every generic
// LinksTo is replaced by a Mongo LinksTo over the "quads" collection that
// carries that set. All other subiterators are kept, in their original order,
// ahead of the new LinksTo iterators.
//
// It returns the original And and false when the And has no Mongo iterator or
// no LinksTo among its subiterators; otherwise it returns the result of
// optimizing the rebuilt And.
func (qs *QuadStore) optimizeAndIterator(it *iterator.And) (graph.Iterator, bool) {
	glog.V(4).Infoln("Entering optimizeAndIterator", it.UID())

	// Fail fast if nothing can happen
	hasMongo := false
	for _, sub := range it.SubIterators() {
		glog.V(4).Infoln(sub.Type())
		if sub.Type() == mongoType {
			hasMongo = true
		}
	}
	if !hasMongo {
		glog.V(4).Infoln("Aborting optimizeAndIterator")
		return it, false
	}

	rebuilt := iterator.NewAnd(qs)
	var anchor *Iterator
	var pending []graph.Iterator
	for _, sub := range it.SubIterators() {
		switch {
		case sub.Type() == mongoType && anchor == nil:
			// The first Mongo iterator becomes the linkage constraint.
			anchor = sub.(*Iterator)
		case sub.Type() == graph.LinksTo:
			// Replaced below, once the linkage set exists.
			pending = append(pending, sub)
		default:
			rebuilt.AddSubIterator(sub)
		}
	}

	lset := []graph.LinkageSet{{
		Dir:    anchor.dir,
		Values: []graph.Value{qs.ValueOf(anchor.name)},
	}}
	for _, sub := range pending {
		lto := sub.(*iterator.LinksTo)
		rebuilt.AddSubIterator(NewLinksTo(qs, lto.SubIterators()[0], "quads", lto.Direction(), lset))
	}
	if len(pending) == 0 {
		return it, false
	}
	return rebuilt.Optimize()
}
func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) {
subs := it.SubIterators()
if len(subs) != 1 {

View file

@ -133,7 +133,7 @@ func buildInOutIterator(obj *otto.Object, qs graph.QuadStore, base graph.Iterato
in, out = out, in
}
lto := iterator.NewLinksTo(qs, base, in)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(iterator.NewLinksTo(qs, predicateNodeIterator, quad.Predicate))
and.AddSubIterator(lto)
return iterator.NewHasA(qs, and, out)
@ -182,11 +182,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
}
predFixed := qs.FixedIterator()
predFixed.Add(qs.ValueOf(stringArgs[0]))
subAnd := iterator.NewAnd()
subAnd := iterator.NewAnd(qs)
subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate))
subAnd.AddSubIterator(iterator.NewLinksTo(qs, all, quad.Object))
hasa := iterator.NewHasA(qs, subAnd, quad.Subject)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(hasa)
and.AddSubIterator(subIt)
it = and
@ -202,11 +202,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
}
predFixed := qs.FixedIterator()
predFixed.Add(qs.ValueOf(stringArgs[0]))
subAnd := iterator.NewAnd()
subAnd := iterator.NewAnd(qs)
subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate))
subAnd.AddSubIterator(iterator.NewLinksTo(qs, all, quad.Subject))
hasa := iterator.NewHasA(qs, subAnd, quad.Object)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(hasa)
and.AddSubIterator(subIt)
it = and
@ -220,11 +220,11 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
}
predFixed := qs.FixedIterator()
predFixed.Add(qs.ValueOf(stringArgs[0]))
subAnd := iterator.NewAnd()
subAnd := iterator.NewAnd(qs)
subAnd.AddSubIterator(iterator.NewLinksTo(qs, predFixed, quad.Predicate))
subAnd.AddSubIterator(iterator.NewLinksTo(qs, fixed, quad.Object))
hasa := iterator.NewHasA(qs, subAnd, quad.Subject)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(hasa)
and.AddSubIterator(subIt)
it = and
@ -238,14 +238,14 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
}
argIt := buildIteratorTree(firstArg.Object(), qs)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(subIt)
and.AddSubIterator(argIt)
it = and
case "back":
arg, _ := obj.Get("_gremlin_back_chain")
argIt := buildIteratorTree(arg.Object(), qs)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(subIt)
and.AddSubIterator(argIt)
it = and
@ -254,7 +254,7 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
for _, name := range stringArgs {
fixed.Add(qs.ValueOf(name))
}
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(fixed)
and.AddSubIterator(subIt)
it = and
@ -311,7 +311,7 @@ func buildIteratorTreeHelper(obj *otto.Object, qs graph.QuadStore, base graph.It
toComplementIt := buildIteratorTree(firstArg.Object(), qs)
notIt := iterator.NewNot(toComplementIt, allIt)
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
and.AddSubIterator(subIt)
and.AddSubIterator(notIt)
it = and

View file

@ -102,7 +102,7 @@ func (q *Query) buildIteratorTreeInternal(query interface{}, path Path) (it grap
}
func (q *Query) buildIteratorTreeMapInternal(query map[string]interface{}, path Path) (graph.Iterator, error) {
it := iterator.NewAnd()
it := iterator.NewAnd(q.ses.qs)
it.AddSubIterator(q.ses.qs.NodesAllIterator())
var err error
err = nil
@ -136,7 +136,7 @@ func (q *Query) buildIteratorTreeMapInternal(query map[string]interface{}, path
if err != nil {
return nil, err
}
subAnd := iterator.NewAnd()
subAnd := iterator.NewAnd(q.ses.qs)
predFixed := q.ses.qs.FixedIterator()
predFixed.Add(q.ses.qs.ValueOf(pred))
subAnd.AddSubIterator(iterator.NewLinksTo(q.ses.qs, predFixed, quad.Predicate))

View file

@ -213,7 +213,7 @@ func buildIteratorTree(tree *peg.ExpressionTree, qs graph.QuadStore) graph.Itera
return lto
case "RootConstraint":
constraintCount := 0
and := iterator.NewAnd()
and := iterator.NewAnd(qs)
for _, c := range tree.Children {
switch c.Name {
case "NodeIdentifier":
@ -232,7 +232,7 @@ func buildIteratorTree(tree *peg.ExpressionTree, qs graph.QuadStore) graph.Itera
var hasa *iterator.HasA
topLevelDir := quad.Subject
subItDir := quad.Object
subAnd := iterator.NewAnd()
subAnd := iterator.NewAnd(qs)
isOptional := false
for _, c := range tree.Children {
switch c.Name {