diff --git a/graph/gaedatastore/iterator.go b/graph/gaedatastore/iterator.go new file mode 100644 index 0000000..1586be8 --- /dev/null +++ b/graph/gaedatastore/iterator.go @@ -0,0 +1,322 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build appengine + +package gaedatastore + +import ( + "fmt" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" + + "appengine/datastore" + "github.com/barakmich/glog" +) + +type Iterator struct { + uid uint64 + size int64 + tags graph.Tagger + dir quad.Direction + qs *QuadStore + name string + isAll bool + kind string + hash string + done bool + buffer []string + offset int + last string + result graph.Value +} + +var ( + bufferSize = 50 +) + +func NewIterator(qs *QuadStore, k string, d quad.Direction, val graph.Value) *Iterator { + t := val.(*Token) + if t == nil { + glog.Error("Token == nil") + } + if t.Kind != nodeKind { + glog.Error("Cannot create an iterator from a non-node value") + return &Iterator{done: true} + } + if k != nodeKind && k != quadKind { + glog.Error("Cannot create iterator for unknown kind") + return &Iterator{done: true} + } + if qs.context == nil { + glog.Error("Cannot create iterator without a valid context") + return &Iterator{done: true} + } + name := qs.NameOf(t) + + // The number of references to this node is held in the nodes entity + key := qs.createKeyFromToken(t) + foundNode := new(NodeEntry) + err := datastore.Get(qs.context, key, foundNode) + if err != nil && err != datastore.ErrNoSuchEntity { + glog.Errorf("Error: %v", err) + return &Iterator{done: true} + } + size := foundNode.Size + + return &Iterator{ + uid: iterator.NextUID(), + name: name, + dir: d, + qs: qs, + size: size, + isAll: false, + kind: k, + hash: t.Hash, + done: false, + } +} + +func NewAllIterator(qs *QuadStore, kind string) *Iterator { + if kind != nodeKind && kind != quadKind { + glog.Error("Cannot create iterator for an unknown kind") + return &Iterator{done: true} + } + if qs.context == nil { + glog.Error("Cannot create iterator without a valid context") + return &Iterator{done: true} + } + + var size int64 + if kind == nodeKind { + size = qs.NodeSize() + } else { + size = qs.Size() + } + + return &Iterator{ + uid: iterator.NextUID(), + qs: qs, + size: size, + dir: quad.Any, + isAll: true, + kind: kind, + done: false, + } +} + +func (it *Iterator) UID() uint64 { + return it.uid +} + +func (it *Iterator) Reset() { + it.buffer = nil + it.offset = 0 + it.done = false + it.last = "" + it.result = nil +} + +func (it *Iterator) Close() { + it.buffer = nil + it.offset = 0 + it.done = true + it.last = "" + it.result = nil +} + +func (it *Iterator) Tagger() *graph.Tagger { + return &it.tags +} +func (it *Iterator) Contains(v graph.Value) bool { + graph.ContainsLogIn(it, v) + if it.isAll { + // The result needs to be set, so when contains is called, the result can be retrieved + it.result = v + return graph.ContainsLogOut(it, v, true) + } + t := v.(*Token) + if t == nil { + glog.Error("Could not cast to token") + return graph.ContainsLogOut(it, v, false) + } + if t.Kind == nodeKind { + glog.Error("Contains does not work with node values") + return graph.ContainsLogOut(it, v, false) + } + // Contains is for when you want to know that an iterator refers to a quad + var offset int + switch it.dir { + case quad.Subject: + offset = 0 + case quad.Predicate: + offset = (it.qs.hashSize * 2) + case quad.Object: + offset = (it.qs.hashSize * 2) * 2 + case quad.Label: + offset = (it.qs.hashSize * 2) * 3 + } + val := t.Hash[offset : offset+(it.qs.hashSize*2)] + if val == it.hash { + return graph.ContainsLogOut(it, v, true) + } + return graph.ContainsLogOut(it, v, false) +} + +func (it *Iterator) TagResults(dst map[string]graph.Value) { + for _, tag := range it.tags.Tags() { + dst[tag] = it.Result() + } + for tag, value := range it.tags.Fixed() { + dst[tag] = value + } +} + +func (it *Iterator) Clone() graph.Iterator { + if it.isAll { + m := NewAllIterator(it.qs, it.kind) + m.tags.CopyFrom(it) + return m + } + m := NewIterator(it.qs, it.kind, it.dir, it.hash) + m.tags.CopyFrom(it) + return m +} + +func (it *Iterator) NextPath() bool { + return false +} + +// No subiterators. +func (it *Iterator) SubIterators() []graph.Iterator { + return nil +} + +func (it *Iterator) ResultTree() *graph.ResultTree { + return graph.NewResultTree(it.Result()) +} + +func (it *Iterator) Result() graph.Value { + return it.result +} + +func (it *Iterator) Next() bool { + if it.offset+1 < len(it.buffer) { + it.offset++ + it.result = &Token{Kind: it.kind, Hash: it.buffer[it.offset]} + return true + } + if it.done { + return false + } + // Reset buffer and offset + it.offset = 0 + it.buffer = make([]string, 0, bufferSize) + // Create query + // TODO (stefankoshiw) Keys only query? + q := datastore.NewQuery(it.kind).Limit(bufferSize) + if !it.isAll { + // Filter on the direction {subject,objekt...} + q = q.Filter(it.dir.String()+" =", it.name) + } + // Get last cursor position + cursor, err := datastore.DecodeCursor(it.last) + if err == nil { + q = q.Start(cursor) + } + // Buffer the keys of the next 50 matches + t := q.Run(it.qs.context) + for { + // Quirk of the datastore, you cannot pass a nil value to to Next() + // even if you just want the keys + var k *datastore.Key + skip := false + if it.kind == quadKind { + temp := new(QuadEntry) + k, err = t.Next(temp) + // Skip if quad has been deleted + if len(temp.Added) <= len(temp.Deleted) { + skip = true + } + } else { + temp := new(NodeEntry) + k, err = t.Next(temp) + // Skip if node has been deleted + if temp.Size == 0 { + skip = true + } + } + if err == datastore.Done { + it.done = true + break + } + if err != nil { + glog.Errorf("Error fetching next entry %v", err) + break + } + if !skip { + it.buffer = append(it.buffer, k.StringID()) + } + } + // Save cursor position + cursor, err = t.Cursor() + if err == nil { + it.last = cursor.String() + } + // Protect against bad queries + if it.done && len(it.buffer) == 0 { + glog.Warningf("Query did not return any results") + return false + } + // First result + it.result = &Token{Kind: it.kind, Hash: it.buffer[it.offset]} + return true +} + +func (it *Iterator) Size() (int64, bool) { + return it.size, true +} + +var gaedatastoreType graph.Type + +func init() { + gaedatastoreType = graph.RegisterIterator("gaedatastore") +} + +func Type() graph.Type { return gaedatastoreType } +func (it *Iterator) Type() graph.Type { return gaedatastoreType } +func (it *Iterator) Sorted() bool { return false } +func (it *Iterator) Optimize() (graph.Iterator, bool) { return it, false } +func (it *Iterator) Describe() graph.Description { + size, _ := it.Size() + return graph.Description{ + UID: it.UID(), + Name: fmt.Sprintf("%s/%s", it.name, it.hash), + Type: it.Type(), + Size: size, + Tags: it.tags.Tags(), + Direction: it.dir, + } +} + +// TODO (stefankoshiw) calculate costs +func (it *Iterator) Stats() graph.IteratorStats { + size, _ := it.Size() + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 5, + Size: size, + } +} diff --git a/graph/gaedatastore/quadstore.go b/graph/gaedatastore/quadstore.go new file mode 100644 index 0000000..f38047a --- /dev/null +++ b/graph/gaedatastore/quadstore.go @@ -0,0 +1,550 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build appengine + +package gaedatastore + +import ( + "crypto/sha1" + "encoding/hex" + "errors" + "hash" + "math" + "net/http" + + "appengine" + "appengine/datastore" + "github.com/barakmich/glog" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +const ( + QuadStoreType = "gaedatastore" + quadKind = "quad" + nodeKind = "node" +) + +var ( + // Order of quad fields + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} +) + +type QuadStore struct { + hashSize int + makeHasher func() hash.Hash + context appengine.Context +} + +type MetadataEntry struct { + NodeCount int64 + QuadCount int64 +} + +type Token struct { + Kind string + Hash string +} + +type QuadEntry struct { + Hash string + Added []int64 `datastore:",noindex"` + Deleted []int64 `datastore:",noindex"` + Subject string `datastore:"subject"` + Predicate string `datastore:"predicate"` + Object string `datastore:"object"` + Label string `datastore:"label"` +} + +type NodeEntry struct { + Name string + Size int64 +} + +type LogEntry struct { + LogID int64 + Action string + Key string + Timestamp int64 +} + +func init() { + graph.RegisterQuadStore("gaedatastore", true, newQuadStore, initQuadStore, newQuadStoreForRequest) +} + +func initQuadStore(_ string, _ graph.Options) error { + // TODO (stefankoshiw) check appengine datastore for consistency + return nil +} + +func newQuadStore(_ string, options graph.Options) (graph.QuadStore, error) { + var qs QuadStore + qs.hashSize = sha1.Size + qs.makeHasher = sha1.New + return &qs, nil +} + +func newQuadStoreForRequest(qs graph.QuadStore, options graph.Options) (graph.QuadStore, error) { + newQs, err := newQuadStore("", options) + if err != nil { + return nil, err + } + t := newQs.(*QuadStore) + t.context, err = getContext(options) + return newQs, err +} + +func (qs *QuadStore) createKeyForQuad(q quad.Quad) *datastore.Key { + hasher := qs.makeHasher() + id := qs.hashOf(q.Subject, hasher) + id += qs.hashOf(q.Predicate, hasher) + id += qs.hashOf(q.Object, hasher) + id += qs.hashOf(q.Label, hasher) + return qs.createKeyFromToken(&Token{quadKind, id}) +} + +func (qs *QuadStore) createKeyForNode(n string) *datastore.Key { + hasher := qs.makeHasher() + id := qs.hashOf(n, hasher) + return qs.createKeyFromToken(&Token{nodeKind, id}) +} + +func (qs *QuadStore) createKeyForMetadata() *datastore.Key { + return qs.createKeyFromToken(&Token{"metadata", "metadataentry"}) +} + +func (qs *QuadStore) createKeyForLog(deltaID int64) *datastore.Key { + return datastore.NewKey(qs.context, "logentry", "", deltaID, nil) +} + +func (qs *QuadStore) createKeyFromToken(t *Token) *datastore.Key { + return datastore.NewKey(qs.context, t.Kind, t.Hash, 0, nil) +} + +func (qs *QuadStore) checkValid(k *datastore.Key) (bool, error) { + var q quad.Quad + err := datastore.Get(qs.context, k, &q) + if err == datastore.ErrNoSuchEntity { + return false, nil + } + if _, ok := err.(*datastore.ErrFieldMismatch); ok { + return true, nil + } + if err != nil { + glog.Warningf("Error occured when getting quad/node %s %v", k, err) + return false, err + } + return true, nil +} + +func getContext(opts graph.Options) (appengine.Context, error) { + req := opts["HTTPRequest"].(*http.Request) + if req == nil { + err := errors.New("HTTP Request needed") + glog.Fatalln(err) + return nil, err + } + return appengine.NewContext(req), nil +} + +func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { + if qs.context == nil { + return errors.New("No context, graph not correctly initialised") + } + toKeep := make([]graph.Delta, 0) + for _, d := range in { + key := qs.createKeyForQuad(d.Quad) + keep := false + switch d.Action { + case graph.Add: + if found, err := qs.checkValid(key); !found && err == nil { + keep = true + } else if err != nil { + return err + } else { + glog.Warningf("Quad exists already: %v", d) + } + case graph.Delete: + if found, err := qs.checkValid(key); found && err == nil { + keep = true + } else if err != nil { + return err + } else { + glog.Warningf("Quad does not exist and so cannot be deleted: %v", d) + } + default: + keep = true + } + + if keep { + toKeep = append(toKeep, d) + } + } + err := qs.updateLog(toKeep) + if err != nil { + glog.Errorf("Updating log failed %v", err) + return err + } + + if glog.V(2) { + glog.Infoln("Existence verified. Proceeding.") + } + + quadsAdded, err := qs.updateQuads(toKeep) + if err != nil { + glog.Errorf("UpdateQuads failed %v", err) + return err + } + nodesAdded, err := qs.updateNodes(toKeep) + if err != nil { + glog.Warningf("UpdateNodes failed %v", err) + return err + } + err = qs.updateMetadata(quadsAdded, nodesAdded) + if err != nil { + glog.Warningf("UpdateMetadata failed %v", err) + return err + } + return nil +} + +func (qs *QuadStore) updateNodes(in []graph.Delta) (int64, error) { + // Collate changes to each node + var countDelta int64 + var nodesAdded int64 + nodeDeltas := make(map[string]int64) + for _, d := range in { + if d.Action == graph.Add { + countDelta = 1 + } else { + countDelta = -1 + } + nodeDeltas[d.Quad.Subject] += countDelta + nodeDeltas[d.Quad.Object] += countDelta + nodeDeltas[d.Quad.Predicate] += countDelta + if d.Quad.Label != "" { + nodeDeltas[d.Quad.Label] += countDelta + } + nodesAdded += countDelta + } + // Create keys and new nodes + keys := make([]*datastore.Key, 0, len(nodeDeltas)) + tempNodes := make([]NodeEntry, 0, len(nodeDeltas)) + for k, v := range nodeDeltas { + keys = append(keys, qs.createKeyForNode(k)) + tempNodes = append(tempNodes, NodeEntry{k, v}) + } + // In accordance with the appengine datastore spec, cross group transactions + // like these can only be done in batches of 5 + for i := 0; i < len(nodeDeltas); i += 5 { + j := int(math.Min(float64(len(nodeDeltas)-i), 5)) + foundNodes := make([]NodeEntry, j) + err := datastore.RunInTransaction(qs.context, func(c appengine.Context) error { + err := datastore.GetMulti(c, keys[i:i+j], foundNodes) + // Sift through for errors + if me, ok := err.(appengine.MultiError); ok { + for _, merr := range me { + if merr != nil && merr != datastore.ErrNoSuchEntity { + glog.Errorf("Error: %v", merr) + return merr + } + } + } + // Carry forward the sizes of the nodes from the datastore + for k, _ := range foundNodes { + if foundNodes[k].Name != "" { + tempNodes[i+k].Size += foundNodes[k].Size + } + } + _, err = datastore.PutMulti(c, keys[i:i+j], tempNodes[i:i+j]) + return err + }, &datastore.TransactionOptions{XG: true}) + if err != nil { + glog.Errorf("Error: %v", err) + return 0, err + } + } + + return nodesAdded, nil +} + +func (qs *QuadStore) updateQuads(in []graph.Delta) (int64, error) { + keys := make([]*datastore.Key, 0, len(in)) + for _, d := range in { + keys = append(keys, qs.createKeyForQuad(d.Quad)) + } + var quadCount int64 + for i := 0; i < len(in); i += 5 { + // Find the closest batch of 5 + j := int(math.Min(float64(len(in)-i), 5)) + err := datastore.RunInTransaction(qs.context, func(c appengine.Context) error { + foundQuads := make([]QuadEntry, j) + // We don't process errors from GetMulti as they don't mean anything, + // we've handled existing quad conflicts above and we overwrite everything again anyways + datastore.GetMulti(c, keys, foundQuads) + for k, _ := range foundQuads { + x := i + k + foundQuads[k].Hash = keys[x].StringID() + foundQuads[k].Subject = in[x].Quad.Subject + foundQuads[k].Predicate = in[x].Quad.Predicate + foundQuads[k].Object = in[x].Quad.Object + foundQuads[k].Label = in[x].Quad.Label + + // If the quad exists the Added[] will be non-empty + if in[x].Action == graph.Add { + foundQuads[k].Added = append(foundQuads[k].Added, in[x].ID) + quadCount += 1 + } else { + foundQuads[k].Deleted = append(foundQuads[k].Deleted, in[x].ID) + quadCount -= 1 + } + } + _, err := datastore.PutMulti(c, keys[i:i+j], foundQuads) + return err + }, &datastore.TransactionOptions{XG: true}) + if err != nil { + return 0, err + } + } + return quadCount, nil +} + +func (qs *QuadStore) updateMetadata(quadsAdded int64, nodesAdded int64) error { + key := qs.createKeyForMetadata() + foundMetadata := new(MetadataEntry) + err := datastore.RunInTransaction(qs.context, func(c appengine.Context) error { + err := datastore.Get(c, key, foundMetadata) + if err != nil && err != datastore.ErrNoSuchEntity { + glog.Errorf("Error: %v", err) + return err + } + foundMetadata.QuadCount += quadsAdded + foundMetadata.NodeCount += nodesAdded + _, err = datastore.Put(c, key, foundMetadata) + if err != nil { + glog.Errorf("Error: %v", err) + } + return err + }, nil) + return err +} + +func (qs *QuadStore) updateLog(in []graph.Delta) error { + if qs.context == nil { + err := errors.New("Error updating log, context is nil, graph not correctly initialised") + return err + } + if len(in) == 0 { + return errors.New("Nothing to log") + } + logEntries := make([]LogEntry, 0, len(in)) + logKeys := make([]*datastore.Key, 0, len(in)) + for _, d := range in { + var action string + if d.Action == graph.Add { + action = "Add" + } else { + action = "Delete" + } + + entry := LogEntry{ + LogID: d.ID, + Action: action, + Key: qs.createKeyForQuad(d.Quad).String(), + Timestamp: d.Timestamp.UnixNano(), + } + logEntries = append(logEntries, entry) + logKeys = append(logKeys, qs.createKeyForLog(d.ID)) + } + + _, err := datastore.PutMulti(qs.context, logKeys, logEntries) + if err != nil { + glog.Errorf("Error updating log: %v", err) + } + return err +} + +func (qs *QuadStore) QuadIterator(dir quad.Direction, v graph.Value) graph.Iterator { + return NewIterator(qs, quadKind, dir, v) +} + +func (qs *QuadStore) NodesAllIterator() graph.Iterator { + return NewAllIterator(qs, nodeKind) +} + +func (qs *QuadStore) QuadsAllIterator() graph.Iterator { + return NewAllIterator(qs, quadKind) +} + +func (qs *QuadStore) ValueOf(s string) graph.Value { + hasher := qs.makeHasher() + id := qs.hashOf(s, hasher) + return &Token{Kind: nodeKind, Hash: id} +} + +func (qs *QuadStore) NameOf(val graph.Value) string { + if qs.context == nil { + glog.Error("Error in NameOf, context is nil, graph not correctly initialised") + return "" + } + var key *datastore.Key + if t, ok := val.(*Token); ok && t.Kind == nodeKind { + key = qs.createKeyFromToken(t) + } else { + glog.Error("Token not valid") + return "" + } + + // TODO (stefankoshiw) implement a cache + + node := new(NodeEntry) + err := datastore.Get(qs.context, key, node) + if err != nil { + glog.Errorf("Error: %v", err) + return "" + } + return node.Name +} + +func (qs *QuadStore) Quad(val graph.Value) quad.Quad { + if qs.context == nil { + glog.Error("Error fetching quad, context is nil, graph not correctly initialised") + return quad.Quad{} + } + var key *datastore.Key + if t, ok := val.(*Token); ok && t.Kind == quadKind { + key = qs.createKeyFromToken(t) + } else { + glog.Error("Token not valid") + return quad.Quad{} + } + + q := new(QuadEntry) + err := datastore.Get(qs.context, key, q) + if err != nil { + // Red herring error : ErrFieldMismatch can happen when a quad exists but a field is empty + if _, ok := err.(*datastore.ErrFieldMismatch); !ok { + glog.Errorf("Error: %v", err) + } + } + return quad.Quad{ + Subject: q.Subject, + Predicate: q.Predicate, + Object: q.Object, + Label: q.Label} +} + +func (qs *QuadStore) Size() int64 { + if qs.context == nil { + glog.Error("Error fetching size, context is nil, graph not correctly initialised") + return 0 + } + key := qs.createKeyForMetadata() + foundMetadata := new(MetadataEntry) + err := datastore.Get(qs.context, key, foundMetadata) + if err != nil { + glog.Warningf("Error: %v", err) + return 0 + } + return foundMetadata.QuadCount +} + +func (qs *QuadStore) NodeSize() int64 { + if qs.context == nil { + glog.Error("Error fetching node size, context is nil, graph not correctly initialised") + return 0 + } + key := qs.createKeyForMetadata() + foundMetadata := new(MetadataEntry) + err := datastore.Get(qs.context, key, foundMetadata) + if err != nil { + glog.Warningf("Error: %v", err) + return 0 + } + return foundMetadata.NodeCount +} + +func (qs *QuadStore) Horizon() int64 { + if qs.context == nil { + glog.Warning("Error fetching horizon, context is nil, graph not correctly initialised") + return 0 + } + // Query log for last entry... + q := datastore.NewQuery("logentry").Order("-Timestamp").Limit(1) + var logEntries []LogEntry + _, err := q.GetAll(qs.context, &logEntries) + if err != nil || len(logEntries) == 0 { + // Error fetching horizon, probably graph is empty + return 0 + } + return logEntries[0].LogID +} + +func compareTokens(a, b graph.Value) bool { + atok := a.(*Token) + btok := b.(*Token) + return atok.Kind == btok.Kind && atok.Hash == btok.Hash +} + +func (qs *QuadStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixed(compareTokens) +} + +func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + return nil, false +} + +func (qs *QuadStore) Close() { + qs.context = nil +} + +func (qs *QuadStore) QuadDirection(val graph.Value, dir quad.Direction) graph.Value { + t, ok := val.(*Token) + if !ok { + glog.Error("Token not valid") + return nil + } + if t.Kind == nodeKind { + glog.Error("Node tokens not valid") + return nil + } + var offset int + switch dir { + case quad.Subject: + offset = 0 + case quad.Predicate: + offset = (qs.hashSize * 2) + case quad.Object: + offset = (qs.hashSize * 2) * 2 + case quad.Label: + offset = (qs.hashSize * 2) * 3 + } + sub := t.Hash[offset : offset+(qs.hashSize*2)] + return &Token{Kind: nodeKind, Hash: sub} +} + +func (qs *QuadStore) hashOf(s string, hasher hash.Hash) string { + hasher.Reset() + key := make([]byte, 0, qs.hashSize) + hasher.Write([]byte(s)) + key = hasher.Sum(key) + return hex.EncodeToString(key) +} + +func (qs *QuadStore) GetType() string { + return QuadStoreType +} diff --git a/graph/gaedatastore/quadstore_test.go b/graph/gaedatastore/quadstore_test.go new file mode 100644 index 0000000..b27be1e --- /dev/null +++ b/graph/gaedatastore/quadstore_test.go @@ -0,0 +1,315 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. // +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gaedatastore + +import ( + "sort" + "testing" + + "errors" + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" + "github.com/google/cayley/writer" + "reflect" + + "appengine/aetest" +) + +// This is a simple test graph. +// +// +---+ +---+ +// | A |------- ->| F |<-- +// +---+ \------>+---+-/ +---+ \--+---+ +// ------>|#B#| | | E | +// +---+-------/ >+---+ | +---+ +// | C | / v +// +---+ -/ +---+ +// ---- +---+/ |#G#| +// \-->|#D#|------------->+---+ +// +---+ +// +var simpleGraph = []quad.Quad{ + {"A", "follows", "B", ""}, + {"C", "follows", "B", ""}, + {"C", "follows", "D", ""}, + {"D", "follows", "B", ""}, + {"B", "follows", "F", ""}, + {"F", "follows", "G", ""}, + {"D", "follows", "G", ""}, + {"E", "follows", "F", ""}, + {"B", "status", "cool", "status_graph"}, + {"D", "status", "cool", "status_graph"}, + {"G", "status", "cool", "status_graph"}, +} +var simpleGraphUpdate = []quad.Quad{ + {"A", "follows", "B", ""}, + {"F", "follows", "B", ""}, + {"C", "follows", "D", ""}, + {"X", "follows", "B", ""}, +} + +type pair struct { + query string + value int64 +} + +func makeTestStore(data []quad.Quad, opts graph.Options) (graph.QuadStore, graph.QuadWriter, []pair) { + seen := make(map[string]struct{}) + + qs, _ := newQuadStore("", opts) + qs, _ = newQuadStoreForRequest(qs, opts) + var ( + val int64 + ind []pair + ) + writer, _ := writer.NewSingleReplication(qs, nil) + for _, t := range data { + for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { + if _, ok := seen[qp]; !ok && qp != "" { + val++ + ind = append(ind, pair{qp, val}) + seen[qp] = struct{}{} + } + } + } + writer.AddQuadSet(data) + return qs, writer, ind +} + +func iterateResults(qs graph.QuadStore, it graph.Iterator) []string { + var res []string + for graph.Next(it) { + v := it.Result() + if t, ok := v.(*Token); ok && t.Kind == nodeKind { + res = append(res, qs.NameOf(it.Result())) + } else { + res = append(res, qs.Quad(it.Result()).String()) + } + } + sort.Strings(res) + it.Reset() + return res +} + +func printIterator(qs graph.QuadStore, it graph.Iterator) { + for graph.Next(it) { + glog.Infof("%v", qs.Quad(it.Result())) + } +} + +func compareResults(qs graph.QuadStore, it graph.Iterator, expect []string) ([]string, bool) { + sort.Strings(expect) + for i := 0; i < 2; i++ { + got := iterateResults(qs, it) + sort.Strings(got) + if !reflect.DeepEqual(got, expect) { + return got, false + } + } + return nil, true +} + +func createInstance() (aetest.Instance, graph.Options, error) { + inst, err := aetest.NewInstance(&aetest.Options{"", true}) + if err != nil { + return nil, nil, errors.New("Creation of new instance failed") + } + req1, err := inst.NewRequest("POST", "/api/v1/write", nil) + if err != nil { + return nil, nil, errors.New("Creation of new request failed") + } + opts := make(graph.Options) + opts["HTTPRequest"] = req1 + + if inst == nil { + glog.Info("help") + } + + return inst, opts, nil +} + +func TestAddRemove(t *testing.T) { + inst, opts, err := createInstance() + defer inst.Close() + + if err != nil { + t.Fatalf("failed to create instance: %v", err) + } + + // Add quads + qs, writer, _ := makeTestStore(simpleGraph, opts) + if qs.Size() != 11 { + t.Fatal("Incorrect number of quads") + } + all := qs.NodesAllIterator() + expect := []string{ + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "follows", + "status", + "cool", + "status_graph", + } + if got, ok := compareResults(qs, all, expect); !ok { + t.Errorf("Unexpected iterated result, got:%v expect:%v", got, expect) + } + + // Add more quads, some conflicts + if err := writer.AddQuadSet(simpleGraphUpdate); err != nil { + t.Errorf("AddQuadSet failed, %v", err) + } + if qs.Size() != 13 { + t.Fatal("Incorrect number of quads") + } + all = qs.NodesAllIterator() + expect = []string{ + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "X", + "follows", + "status", + "cool", + "status_graph", + } + if got, ok := compareResults(qs, all, expect); !ok { + t.Errorf("Unexpected iterated result, got:%v expect:%v", got, expect) + } + + // Remove quad + toRemove := quad.Quad{"X", "follows", "B", ""} + err = writer.RemoveQuad(toRemove) + if err != nil { + t.Errorf("RemoveQuad failed: %v", err) + } + expect = []string{ + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "follows", + "status", + "cool", + "status_graph", + } + if got, ok := compareResults(qs, all, expect); !ok { + t.Errorf("Unexpected iterated result, got:%v expect:%v", got, expect) + } +} + +func TestIterators(t *testing.T) { + glog.Info("\n-----------\n") + inst, opts, err := createInstance() + defer inst.Close() + + if err != nil { + t.Fatalf("failed to create instance: %v", err) + } + qs, _, _ := makeTestStore(simpleGraph, opts) + if qs.Size() != 11 { + t.Fatal("Incorrect number of quads") + } + + var expected = []string{ + quad.Quad{"C", "follows", "B", ""}.String(), + quad.Quad{"C", "follows", "D", ""}.String(), + } + + it := qs.QuadIterator(quad.Subject, qs.ValueOf("C")) + if got, ok := compareResults(qs, it, expected); !ok { + t.Errorf("Unexpected iterated result, got:%v expect:%v", got, expected) + } + + // Test contains + it = qs.QuadIterator(quad.Label, qs.ValueOf("status_graph")) + gqs := qs.(*QuadStore) + key := gqs.createKeyForQuad(quad.Quad{"G", "status", "cool", "status_graph"}) + token := &Token{quadKind, key.StringID()} + if !it.Contains(token) { + t.Error("Contains failed") + } +} + +func TestIteratorsAndNextResultOrderA(t *testing.T) { + glog.Info("\n-----------\n") + inst, opts, err := createInstance() + defer inst.Close() + + if err != nil { + t.Fatalf("failed to create instance: %v", err) + } + qs, _, _ := makeTestStore(simpleGraph, opts) + if qs.Size() != 11 { + t.Fatal("Incorrect number of quads") + } + + fixed := qs.FixedIterator() + fixed.Add(qs.ValueOf("C")) + + fixed2 := qs.FixedIterator() + fixed2.Add(qs.ValueOf("follows")) + + all := qs.NodesAllIterator() + + innerAnd := iterator.NewAnd() + innerAnd.AddSubIterator(iterator.NewLinksTo(qs, fixed2, quad.Predicate)) + innerAnd.AddSubIterator(iterator.NewLinksTo(qs, all, quad.Object)) + + hasa := iterator.NewHasA(qs, innerAnd, quad.Subject) + outerAnd := iterator.NewAnd() + outerAnd.AddSubIterator(fixed) + outerAnd.AddSubIterator(hasa) + + if !outerAnd.Next() { + t.Error("Expected one matching subtree") + } + val := outerAnd.Result() + if qs.NameOf(val) != "C" { + t.Errorf("Matching subtree should be %s, got %s", "barak", qs.NameOf(val)) + } + + var ( + got []string + expect = []string{"B", "D"} + ) + for { + got = append(got, qs.NameOf(all.Result())) + if !outerAnd.NextPath() { + break + } + } + sort.Strings(got) + + if !reflect.DeepEqual(got, expect) { + t.Errorf("Unexpected result, got:%q expect:%q", got, expect) + } + + if outerAnd.Next() { + t.Error("More than one possible top level output?") + } +}