diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index 658a4a1..8e173a5 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -24,8 +24,11 @@ type AllIterator struct { ts *TripleStore } -func NewMemstoreAllIterator(ts *TripleStore) *AllIterator { - var out AllIterator +type NodesAllIterator AllIterator +type QuadsAllIterator AllIterator + +func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator { + var out NodesAllIterator out.Int64 = *iterator.NewInt64(1, ts.idCounter-1) out.ts = ts return &out @@ -36,15 +39,33 @@ func (it *AllIterator) SubIterators() []graph.Iterator { return nil } -func (it *AllIterator) Next() (graph.Value, bool) { - next, out := it.Int64.Next() +func (nit *NodesAllIterator) Next() (graph.Value, bool) { + next, out := nit.Int64.Next() if !out { return next, out } i64 := next.(int64) - _, ok := it.ts.revIdMap[i64] + _, ok := nit.ts.revIdMap[i64] if !ok { - return it.Next() + return nit.Next() + } + return next, out +} + +func NewMemstoreQuadsAllIterator(ts *TripleStore) *QuadsAllIterator { + var out QuadsAllIterator + out.Int64 = *iterator.NewInt64(1, ts.quadIdCounter-1) + out.ts = ts + return &out +} + +func (qit *QuadsAllIterator) Next() (graph.Value, bool) { + next, out := qit.Int64.Next() + if out { + i64 := next.(int64) + if qit.ts.log[i64].DeletedBy != 0 || qit.ts.log[i64].Action == graph.Delete { + return qit.Next() + } } return next, out } diff --git a/graph/memstore/iterator.go b/graph/memstore/iterator.go index 8a7e1ef..3ab9d34 100644 --- a/graph/memstore/iterator.go +++ b/graph/memstore/iterator.go @@ -27,6 +27,7 @@ import ( type Iterator struct { uid uint64 + ts *TripleStore tags graph.Tagger tree *llrb.LLRB data string @@ -54,9 +55,10 @@ func IterateOne(tree *llrb.LLRB, last Int64) Int64 { return next } -func NewLlrbIterator(tree *llrb.LLRB, data string) *Iterator { +func NewLlrbIterator(tree *llrb.LLRB, data string, ts *TripleStore) *Iterator { return &Iterator{ uid: iterator.NextUID(), + ts: ts, tree: tree, iterLast: Int64(-1), data: data, @@ -86,19 +88,26 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { } func (it *Iterator) Clone() graph.Iterator { - m := NewLlrbIterator(it.tree, it.data) + m := NewLlrbIterator(it.tree, it.data, it.ts) m.tags.CopyFrom(it) return m } func (it *Iterator) Close() {} +func (it *Iterator) checkValid(index int64) bool { + return it.ts.log[index].DeletedBy == 0 +} + func (it *Iterator) Next() (graph.Value, bool) { graph.NextLogIn(it) - if it.tree.Max() == nil || it.result == int64(it.tree.Max().(Int64)) { + if it.tree.Max() == nil || it.iterLast == it.tree.Max().(Int64) { return graph.NextLogOut(it, nil, false) } it.iterLast = IterateOne(it.tree, it.iterLast) + if !it.checkValid(int64(it.iterLast)) { + return it.Next() + } it.result = int64(it.iterLast) return graph.NextLogOut(it, it.result, true) } @@ -126,7 +135,7 @@ func (it *Iterator) Size() (int64, bool) { func (it *Iterator) Contains(v graph.Value) bool { graph.ContainsLogIn(it, v) - if it.tree.Has(Int64(v.(int64))) { + if it.tree.Has(Int64(v.(int64))) && it.checkValid(v.(int64)) { it.result = v return graph.ContainsLogOut(it, v, true) } diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go index 23eb11a..75152df 100644 --- a/graph/memstore/triplestore.go +++ b/graph/memstore/triplestore.go @@ -31,58 +31,63 @@ func init() { }, nil) } -type TripleDirectionIndex struct { +type QuadDirectionIndex struct { subject map[int64]*llrb.LLRB predicate map[int64]*llrb.LLRB object map[int64]*llrb.LLRB label map[int64]*llrb.LLRB } -func NewTripleDirectionIndex() *TripleDirectionIndex { - var tdi TripleDirectionIndex - tdi.subject = make(map[int64]*llrb.LLRB) - tdi.predicate = make(map[int64]*llrb.LLRB) - tdi.object = make(map[int64]*llrb.LLRB) - tdi.label = make(map[int64]*llrb.LLRB) - return &tdi +func NewQuadDirectionIndex() *QuadDirectionIndex { + var qdi QuadDirectionIndex + qdi.subject = make(map[int64]*llrb.LLRB) + qdi.predicate = make(map[int64]*llrb.LLRB) + qdi.object = make(map[int64]*llrb.LLRB) + qdi.label = make(map[int64]*llrb.LLRB) + return &qdi } -func (tdi *TripleDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { +func (qdi *QuadDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { switch d { case quad.Subject: - return tdi.subject + return qdi.subject case quad.Object: - return tdi.object + return qdi.object case quad.Predicate: - return tdi.predicate + return qdi.predicate case quad.Label: - return tdi.label + return qdi.label } panic("illegal direction") } -func (tdi *TripleDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { - directionIndex := tdi.GetForDir(d) +func (qdi *QuadDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { + directionIndex := qdi.GetForDir(d) if _, ok := directionIndex[id]; !ok { directionIndex[id] = llrb.New() } return directionIndex[id] } -func (tdi *TripleDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { - directionIndex := tdi.GetForDir(d) +func (qdi *QuadDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { + directionIndex := qdi.GetForDir(d) tree, exists := directionIndex[id] return tree, exists } +type LogEntry struct { + graph.Delta + DeletedBy int64 +} + type TripleStore struct { - idCounter int64 - tripleIdCounter int64 - idMap map[string]int64 - revIdMap map[int64]string - triples []quad.Quad - size int64 - index TripleDirectionIndex + idCounter int64 + quadIdCounter int64 + idMap map[string]int64 + revIdMap map[int64]string + log []LogEntry + size int64 + index QuadDirectionIndex // vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree } @@ -90,24 +95,32 @@ func newTripleStore() *TripleStore { var ts TripleStore ts.idMap = make(map[string]int64) ts.revIdMap = make(map[int64]string) - ts.triples = make([]quad.Quad, 1, 200) + ts.log = make([]LogEntry, 1, 200) - // Sentinel null triple so triple indices start at 1 - ts.triples[0] = quad.Quad{} - ts.size = 1 - ts.index = *NewTripleDirectionIndex() + // Sentinel null entry so indices start at 1 + ts.log[0] = LogEntry{} + ts.index = *NewQuadDirectionIndex() ts.idCounter = 1 - ts.tripleIdCounter = 1 + ts.quadIdCounter = 1 return &ts } -func (ts *TripleStore) AddTripleSet(triples []*quad.Quad) { - for _, t := range triples { - ts.AddTriple(t) +func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { + for _, d := range deltas { + var err error + if d.Action == graph.Add { + err = ts.AddDelta(d) + } else { + err = ts.RemoveDelta(d) + } + if err != nil { + return err + } } + return nil } -func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) { +func (ts *TripleStore) quadExists(t *quad.Quad) (bool, int64) { smallest := -1 var smallest_tree *llrb.LLRB for d := quad.Subject; d <= quad.Label; d++ { @@ -130,33 +143,34 @@ func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) { smallest_tree = index } } - it := NewLlrbIterator(smallest_tree, "") + it := NewLlrbIterator(smallest_tree, "", ts) for { val, ok := it.Next() if !ok { break } - if t.Equals(&ts.triples[val.(int64)]) { - return true, val.(int64) + ival := val.(int64) + if t.Equals(&ts.log[ival].Quad) { + return true, ival } } return false, 0 } -func (ts *TripleStore) AddTriple(t *quad.Quad) { - if exists, _ := ts.tripleExists(t); exists { - return +func (ts *TripleStore) AddDelta(d *graph.Delta) error { + if exists, _ := ts.quadExists(&d.Quad); exists { + return graph.ErrQuadExists } - var tripleID int64 - ts.triples = append(ts.triples, *t) - tripleID = ts.tripleIdCounter + var quadID int64 + quadID = ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: *d}) ts.size++ - ts.tripleIdCounter++ + ts.quadIdCounter++ - for d := quad.Subject; d <= quad.Label; d++ { - sid := t.Get(d) - if d == quad.Label && sid == "" { + for dir := quad.Subject; dir <= quad.Label; dir++ { + sid := d.Quad.Get(dir) + if dir == quad.Label && sid == "" { continue } if _, ok := ts.idMap[sid]; !ok { @@ -166,87 +180,63 @@ func (ts *TripleStore) AddTriple(t *quad.Quad) { } } - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { + for dir := quad.Subject; dir <= quad.Label; dir++ { + if dir == quad.Label && d.Quad.Get(dir) == "" { continue } - id := ts.idMap[t.Get(d)] - tree := ts.index.GetOrCreate(d, id) - tree.ReplaceOrInsert(Int64(tripleID)) + id := ts.idMap[d.Quad.Get(dir)] + tree := ts.index.GetOrCreate(dir, id) + tree.ReplaceOrInsert(Int64(quadID)) } // TODO(barakmich): Add VIP indexing + return nil } -func (ts *TripleStore) RemoveTriple(t *quad.Quad) { - var tripleID int64 +func (ts *TripleStore) RemoveDelta(d *graph.Delta) error { + var prevQuadID int64 var exists bool - tripleID = 0 - if exists, tripleID = ts.tripleExists(t); !exists { - return + prevQuadID = 0 + if exists, prevQuadID = ts.quadExists(&d.Quad); !exists { + return graph.ErrQuadNotExist } - ts.triples[tripleID] = quad.Quad{} + var quadID int64 + quadID = ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: *d}) + ts.log[prevQuadID].DeletedBy = quadID ts.size-- - - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - id := ts.idMap[t.Get(d)] - tree := ts.index.GetOrCreate(d, id) - tree.Delete(Int64(tripleID)) - } - - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - id, ok := ts.idMap[t.Get(d)] - if !ok { - continue - } - stillExists := false - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - nodeTree := ts.index.GetOrCreate(d, id) - if nodeTree.Len() != 0 { - stillExists = true - break - } - } - if !stillExists { - delete(ts.idMap, t.Get(d)) - delete(ts.revIdMap, id) - } - } + ts.quadIdCounter++ + return nil } func (ts *TripleStore) Quad(index graph.Value) *quad.Quad { - return &ts.triples[index.(int64)] + return &ts.log[index.(int64)].Quad } func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator { index, ok := ts.index.Get(d, value.(int64)) data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) if ok { - return NewLlrbIterator(index, data) + return NewLlrbIterator(index, data, ts) } return &iterator.Null{} } +func (ts *TripleStore) Horizon() int64 { + return ts.log[len(ts.log)-1].ID +} + func (ts *TripleStore) Size() int64 { - return ts.size - 1 // Don't count the sentinel + return ts.size } func (ts *TripleStore) DebugPrint() { - for i, t := range ts.triples { + for i, l := range ts.log { if i == 0 { continue } - glog.V(2).Infof("%d: %s", i, t) + glog.V(2).Infof("%d: %#v", i, l) } } @@ -259,7 +249,7 @@ func (ts *TripleStore) NameOf(id graph.Value) string { } func (ts *TripleStore) TriplesAllIterator() graph.Iterator { - return iterator.NewInt64(0, ts.Size()) + return NewMemstoreQuadsAllIterator(ts) } func (ts *TripleStore) FixedIterator() graph.FixedIterator { @@ -272,6 +262,6 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph. } func (ts *TripleStore) NodesAllIterator() graph.Iterator { - return NewMemstoreAllIterator(ts) + return NewMemstoreNodesAllIterator(ts) } func (ts *TripleStore) Close() {} diff --git a/graph/memstore/triplestore_test.go b/graph/memstore/triplestore_test.go index 44c43b4..c711267 100644 --- a/graph/memstore/triplestore_test.go +++ b/graph/memstore/triplestore_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/quad" + "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -51,13 +52,14 @@ var simpleGraph = []*quad.Quad{ {"G", "status", "cool", "status_graph"}, } -func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) { +func makeTestStore(data []*quad.Quad) (*TripleStore, graph.QuadWriter, []pair) { seen := make(map[string]struct{}) ts := newTripleStore() var ( val int64 ind []pair ) + writer, _ := writer.NewSingleReplication(ts, nil) for _, t := range data { for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { if _, ok := seen[qp]; !ok && qp != "" { @@ -66,9 +68,10 @@ func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) { seen[qp] = struct{}{} } } - ts.AddTriple(t) + + writer.AddQuad(t) } - return ts, ind + return ts, writer, ind } type pair struct { @@ -77,7 +80,7 @@ type pair struct { } func TestMemstore(t *testing.T) { - ts, index := makeTestStore(simpleGraph) + ts, _, index := makeTestStore(simpleGraph) if size := ts.Size(); size != int64(len(simpleGraph)) { t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph)) } @@ -95,7 +98,7 @@ func TestMemstore(t *testing.T) { } func TestIteratorsAndNextResultOrderA(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, _, _ := makeTestStore(simpleGraph) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("C")) @@ -145,7 +148,7 @@ func TestIteratorsAndNextResultOrderA(t *testing.T) { } func TestLinksToOptimization(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, _, _ := makeTestStore(simpleGraph) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("cool")) @@ -173,9 +176,9 @@ func TestLinksToOptimization(t *testing.T) { } func TestRemoveTriple(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, w, _ := makeTestStore(simpleGraph) - ts.RemoveTriple(&quad.Quad{"E", "follows", "F", ""}) + w.RemoveQuad(&quad.Quad{"E", "follows", "F", ""}) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("E")) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index bae0ced..782a2e6 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -38,19 +38,22 @@ const ( type Delta struct { ID int64 - Quad *quad.Quad + Quad quad.Quad Action Procedure Timestamp time.Time } +var ErrQuadExists = errors.New("Quad exists") +var ErrQuadNotExist = errors.New("Quad doesn't exist") + type QuadWriter interface { - // Add a triple to the store. + // Add a quad to the store. AddQuad(*quad.Quad) error - // Add a set of triples to the store, atomically if possible. + // Add a set of quads to the store, atomically if possible. AddQuadSet([]*quad.Quad) error - // Removes a triple matching the given one from the database, + // Removes a quad matching the given one from the database, // if it exists. Does nothing otherwise. RemoveQuad(*quad.Quad) error } diff --git a/writer/single.go b/writer/single.go index d797af1..642b7ee 100644 --- a/writer/single.go +++ b/writer/single.go @@ -36,48 +36,48 @@ func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadW return rep, nil } -func (s *Single) AcquireNextId() int64 { +func (s *Single) AcquireNextID() int64 { s.mut.Lock() defer s.mut.Unlock() id := s.nextID - s.nextID += 1 + s.nextID++ return id } -func (s *Single) AddQuad(t *quad.Quad) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Quad: t, +func (s *Single) AddQuad(q *quad.Quad) error { + deltas := make([]*graph.Delta, 1) + deltas[0] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Add, Timestamp: time.Now(), } - return s.ts.ApplyTransactions(trans) + return s.ts.ApplyDeltas(deltas) } func (s *Single) AddQuadSet(set []*quad.Quad) error { - trans := make([]*graph.Transaction, len(set)) - for i, t := range set { - trans[i] = &graph.Transaction{ - ID: s.AcquireNextId(), - Quad: t, + deltas := make([]*graph.Delta, len(set)) + for i, q := range set { + deltas[i] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Add, Timestamp: time.Now(), } } - s.ts.ApplyTransactions(trans) + s.ts.ApplyDeltas(deltas) return nil } -func (s *Single) RemoveQuad(t *graph.Quad) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Triple: t, +func (s *Single) RemoveQuad(q *quad.Quad) error { + deltas := make([]*graph.Delta, 1) + deltas[0] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Delete, Timestamp: time.Now(), } - return s.ts.ApplyTransactions(trans) + return s.ts.ApplyDeltas(deltas) } func init() {