Make Memstore work with the QuadWriter

This commit is contained in:
Barak Michener 2014-08-04 01:56:49 -04:00
parent 81b3bf9881
commit dcb495d145
6 changed files with 167 additions and 141 deletions

View file

@ -24,8 +24,11 @@ type AllIterator struct {
ts *TripleStore ts *TripleStore
} }
func NewMemstoreAllIterator(ts *TripleStore) *AllIterator { type NodesAllIterator AllIterator
var out AllIterator type QuadsAllIterator AllIterator
func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator {
var out NodesAllIterator
out.Int64 = *iterator.NewInt64(1, ts.idCounter-1) out.Int64 = *iterator.NewInt64(1, ts.idCounter-1)
out.ts = ts out.ts = ts
return &out return &out
@ -36,15 +39,33 @@ func (it *AllIterator) SubIterators() []graph.Iterator {
return nil return nil
} }
func (it *AllIterator) Next() (graph.Value, bool) { func (nit *NodesAllIterator) Next() (graph.Value, bool) {
next, out := it.Int64.Next() next, out := nit.Int64.Next()
if !out { if !out {
return next, out return next, out
} }
i64 := next.(int64) i64 := next.(int64)
_, ok := it.ts.revIdMap[i64] _, ok := nit.ts.revIdMap[i64]
if !ok { if !ok {
return it.Next() return nit.Next()
}
return next, out
}
func NewMemstoreQuadsAllIterator(ts *TripleStore) *QuadsAllIterator {
var out QuadsAllIterator
out.Int64 = *iterator.NewInt64(1, ts.quadIdCounter-1)
out.ts = ts
return &out
}
func (qit *QuadsAllIterator) Next() (graph.Value, bool) {
next, out := qit.Int64.Next()
if out {
i64 := next.(int64)
if qit.ts.log[i64].DeletedBy != 0 || qit.ts.log[i64].Action == graph.Delete {
return qit.Next()
}
} }
return next, out return next, out
} }

View file

@ -27,6 +27,7 @@ import (
type Iterator struct { type Iterator struct {
uid uint64 uid uint64
ts *TripleStore
tags graph.Tagger tags graph.Tagger
tree *llrb.LLRB tree *llrb.LLRB
data string data string
@ -54,9 +55,10 @@ func IterateOne(tree *llrb.LLRB, last Int64) Int64 {
return next return next
} }
func NewLlrbIterator(tree *llrb.LLRB, data string) *Iterator { func NewLlrbIterator(tree *llrb.LLRB, data string, ts *TripleStore) *Iterator {
return &Iterator{ return &Iterator{
uid: iterator.NextUID(), uid: iterator.NextUID(),
ts: ts,
tree: tree, tree: tree,
iterLast: Int64(-1), iterLast: Int64(-1),
data: data, data: data,
@ -86,19 +88,26 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
} }
func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Clone() graph.Iterator {
m := NewLlrbIterator(it.tree, it.data) m := NewLlrbIterator(it.tree, it.data, it.ts)
m.tags.CopyFrom(it) m.tags.CopyFrom(it)
return m return m
} }
func (it *Iterator) Close() {} func (it *Iterator) Close() {}
func (it *Iterator) checkValid(index int64) bool {
return it.ts.log[index].DeletedBy == 0
}
func (it *Iterator) Next() (graph.Value, bool) { func (it *Iterator) Next() (graph.Value, bool) {
graph.NextLogIn(it) graph.NextLogIn(it)
if it.tree.Max() == nil || it.result == int64(it.tree.Max().(Int64)) { if it.tree.Max() == nil || it.iterLast == it.tree.Max().(Int64) {
return graph.NextLogOut(it, nil, false) return graph.NextLogOut(it, nil, false)
} }
it.iterLast = IterateOne(it.tree, it.iterLast) it.iterLast = IterateOne(it.tree, it.iterLast)
if !it.checkValid(int64(it.iterLast)) {
return it.Next()
}
it.result = int64(it.iterLast) it.result = int64(it.iterLast)
return graph.NextLogOut(it, it.result, true) return graph.NextLogOut(it, it.result, true)
} }
@ -126,7 +135,7 @@ func (it *Iterator) Size() (int64, bool) {
func (it *Iterator) Contains(v graph.Value) bool { func (it *Iterator) Contains(v graph.Value) bool {
graph.ContainsLogIn(it, v) graph.ContainsLogIn(it, v)
if it.tree.Has(Int64(v.(int64))) { if it.tree.Has(Int64(v.(int64))) && it.checkValid(v.(int64)) {
it.result = v it.result = v
return graph.ContainsLogOut(it, v, true) return graph.ContainsLogOut(it, v, true)
} }

View file

@ -31,58 +31,63 @@ func init() {
}, nil) }, nil)
} }
type TripleDirectionIndex struct { type QuadDirectionIndex struct {
subject map[int64]*llrb.LLRB subject map[int64]*llrb.LLRB
predicate map[int64]*llrb.LLRB predicate map[int64]*llrb.LLRB
object map[int64]*llrb.LLRB object map[int64]*llrb.LLRB
label map[int64]*llrb.LLRB label map[int64]*llrb.LLRB
} }
func NewTripleDirectionIndex() *TripleDirectionIndex { func NewQuadDirectionIndex() *QuadDirectionIndex {
var tdi TripleDirectionIndex var qdi QuadDirectionIndex
tdi.subject = make(map[int64]*llrb.LLRB) qdi.subject = make(map[int64]*llrb.LLRB)
tdi.predicate = make(map[int64]*llrb.LLRB) qdi.predicate = make(map[int64]*llrb.LLRB)
tdi.object = make(map[int64]*llrb.LLRB) qdi.object = make(map[int64]*llrb.LLRB)
tdi.label = make(map[int64]*llrb.LLRB) qdi.label = make(map[int64]*llrb.LLRB)
return &tdi return &qdi
} }
func (tdi *TripleDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { func (qdi *QuadDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB {
switch d { switch d {
case quad.Subject: case quad.Subject:
return tdi.subject return qdi.subject
case quad.Object: case quad.Object:
return tdi.object return qdi.object
case quad.Predicate: case quad.Predicate:
return tdi.predicate return qdi.predicate
case quad.Label: case quad.Label:
return tdi.label return qdi.label
} }
panic("illegal direction") panic("illegal direction")
} }
func (tdi *TripleDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { func (qdi *QuadDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB {
directionIndex := tdi.GetForDir(d) directionIndex := qdi.GetForDir(d)
if _, ok := directionIndex[id]; !ok { if _, ok := directionIndex[id]; !ok {
directionIndex[id] = llrb.New() directionIndex[id] = llrb.New()
} }
return directionIndex[id] return directionIndex[id]
} }
func (tdi *TripleDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { func (qdi *QuadDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) {
directionIndex := tdi.GetForDir(d) directionIndex := qdi.GetForDir(d)
tree, exists := directionIndex[id] tree, exists := directionIndex[id]
return tree, exists return tree, exists
} }
type LogEntry struct {
graph.Delta
DeletedBy int64
}
type TripleStore struct { type TripleStore struct {
idCounter int64 idCounter int64
tripleIdCounter int64 quadIdCounter int64
idMap map[string]int64 idMap map[string]int64
revIdMap map[int64]string revIdMap map[int64]string
triples []quad.Quad log []LogEntry
size int64 size int64
index TripleDirectionIndex index QuadDirectionIndex
// vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree // vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree
} }
@ -90,24 +95,32 @@ func newTripleStore() *TripleStore {
var ts TripleStore var ts TripleStore
ts.idMap = make(map[string]int64) ts.idMap = make(map[string]int64)
ts.revIdMap = make(map[int64]string) ts.revIdMap = make(map[int64]string)
ts.triples = make([]quad.Quad, 1, 200) ts.log = make([]LogEntry, 1, 200)
// Sentinel null triple so triple indices start at 1 // Sentinel null entry so indices start at 1
ts.triples[0] = quad.Quad{} ts.log[0] = LogEntry{}
ts.size = 1 ts.index = *NewQuadDirectionIndex()
ts.index = *NewTripleDirectionIndex()
ts.idCounter = 1 ts.idCounter = 1
ts.tripleIdCounter = 1 ts.quadIdCounter = 1
return &ts return &ts
} }
func (ts *TripleStore) AddTripleSet(triples []*quad.Quad) { func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
for _, t := range triples { for _, d := range deltas {
ts.AddTriple(t) var err error
if d.Action == graph.Add {
err = ts.AddDelta(d)
} else {
err = ts.RemoveDelta(d)
}
if err != nil {
return err
}
} }
return nil
} }
func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) { func (ts *TripleStore) quadExists(t *quad.Quad) (bool, int64) {
smallest := -1 smallest := -1
var smallest_tree *llrb.LLRB var smallest_tree *llrb.LLRB
for d := quad.Subject; d <= quad.Label; d++ { for d := quad.Subject; d <= quad.Label; d++ {
@ -130,33 +143,34 @@ func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) {
smallest_tree = index smallest_tree = index
} }
} }
it := NewLlrbIterator(smallest_tree, "") it := NewLlrbIterator(smallest_tree, "", ts)
for { for {
val, ok := it.Next() val, ok := it.Next()
if !ok { if !ok {
break break
} }
if t.Equals(&ts.triples[val.(int64)]) { ival := val.(int64)
return true, val.(int64) if t.Equals(&ts.log[ival].Quad) {
return true, ival
} }
} }
return false, 0 return false, 0
} }
func (ts *TripleStore) AddTriple(t *quad.Quad) { func (ts *TripleStore) AddDelta(d *graph.Delta) error {
if exists, _ := ts.tripleExists(t); exists { if exists, _ := ts.quadExists(&d.Quad); exists {
return return graph.ErrQuadExists
} }
var tripleID int64 var quadID int64
ts.triples = append(ts.triples, *t) quadID = ts.quadIdCounter
tripleID = ts.tripleIdCounter ts.log = append(ts.log, LogEntry{Delta: *d})
ts.size++ ts.size++
ts.tripleIdCounter++ ts.quadIdCounter++
for d := quad.Subject; d <= quad.Label; d++ { for dir := quad.Subject; dir <= quad.Label; dir++ {
sid := t.Get(d) sid := d.Quad.Get(dir)
if d == quad.Label && sid == "" { if dir == quad.Label && sid == "" {
continue continue
} }
if _, ok := ts.idMap[sid]; !ok { if _, ok := ts.idMap[sid]; !ok {
@ -166,87 +180,63 @@ func (ts *TripleStore) AddTriple(t *quad.Quad) {
} }
} }
for d := quad.Subject; d <= quad.Label; d++ { for dir := quad.Subject; dir <= quad.Label; dir++ {
if d == quad.Label && t.Get(d) == "" { if dir == quad.Label && d.Quad.Get(dir) == "" {
continue continue
} }
id := ts.idMap[t.Get(d)] id := ts.idMap[d.Quad.Get(dir)]
tree := ts.index.GetOrCreate(d, id) tree := ts.index.GetOrCreate(dir, id)
tree.ReplaceOrInsert(Int64(tripleID)) tree.ReplaceOrInsert(Int64(quadID))
} }
// TODO(barakmich): Add VIP indexing // TODO(barakmich): Add VIP indexing
return nil
} }
func (ts *TripleStore) RemoveTriple(t *quad.Quad) { func (ts *TripleStore) RemoveDelta(d *graph.Delta) error {
var tripleID int64 var prevQuadID int64
var exists bool var exists bool
tripleID = 0 prevQuadID = 0
if exists, tripleID = ts.tripleExists(t); !exists { if exists, prevQuadID = ts.quadExists(&d.Quad); !exists {
return return graph.ErrQuadNotExist
} }
ts.triples[tripleID] = quad.Quad{} var quadID int64
quadID = ts.quadIdCounter
ts.log = append(ts.log, LogEntry{Delta: *d})
ts.log[prevQuadID].DeletedBy = quadID
ts.size-- ts.size--
ts.quadIdCounter++
for d := quad.Subject; d <= quad.Label; d++ { return nil
if d == quad.Label && t.Get(d) == "" {
continue
}
id := ts.idMap[t.Get(d)]
tree := ts.index.GetOrCreate(d, id)
tree.Delete(Int64(tripleID))
}
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
continue
}
id, ok := ts.idMap[t.Get(d)]
if !ok {
continue
}
stillExists := false
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
continue
}
nodeTree := ts.index.GetOrCreate(d, id)
if nodeTree.Len() != 0 {
stillExists = true
break
}
}
if !stillExists {
delete(ts.idMap, t.Get(d))
delete(ts.revIdMap, id)
}
}
} }
func (ts *TripleStore) Quad(index graph.Value) *quad.Quad { func (ts *TripleStore) Quad(index graph.Value) *quad.Quad {
return &ts.triples[index.(int64)] return &ts.log[index.(int64)].Quad
} }
func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator { func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator {
index, ok := ts.index.Get(d, value.(int64)) index, ok := ts.index.Get(d, value.(int64))
data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) data := fmt.Sprintf("dir:%s val:%d", d, value.(int64))
if ok { if ok {
return NewLlrbIterator(index, data) return NewLlrbIterator(index, data, ts)
} }
return &iterator.Null{} return &iterator.Null{}
} }
func (ts *TripleStore) Horizon() int64 {
return ts.log[len(ts.log)-1].ID
}
func (ts *TripleStore) Size() int64 { func (ts *TripleStore) Size() int64 {
return ts.size - 1 // Don't count the sentinel return ts.size
} }
func (ts *TripleStore) DebugPrint() { func (ts *TripleStore) DebugPrint() {
for i, t := range ts.triples { for i, l := range ts.log {
if i == 0 { if i == 0 {
continue continue
} }
glog.V(2).Infof("%d: %s", i, t) glog.V(2).Infof("%d: %#v", i, l)
} }
} }
@ -259,7 +249,7 @@ func (ts *TripleStore) NameOf(id graph.Value) string {
} }
func (ts *TripleStore) TriplesAllIterator() graph.Iterator { func (ts *TripleStore) TriplesAllIterator() graph.Iterator {
return iterator.NewInt64(0, ts.Size()) return NewMemstoreQuadsAllIterator(ts)
} }
func (ts *TripleStore) FixedIterator() graph.FixedIterator { func (ts *TripleStore) FixedIterator() graph.FixedIterator {
@ -272,6 +262,6 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.
} }
func (ts *TripleStore) NodesAllIterator() graph.Iterator { func (ts *TripleStore) NodesAllIterator() graph.Iterator {
return NewMemstoreAllIterator(ts) return NewMemstoreNodesAllIterator(ts)
} }
func (ts *TripleStore) Close() {} func (ts *TripleStore) Close() {}

View file

@ -22,6 +22,7 @@ import (
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
"github.com/google/cayley/writer"
) )
// This is a simple test graph. // This is a simple test graph.
@ -51,13 +52,14 @@ var simpleGraph = []*quad.Quad{
{"G", "status", "cool", "status_graph"}, {"G", "status", "cool", "status_graph"},
} }
func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) { func makeTestStore(data []*quad.Quad) (*TripleStore, graph.QuadWriter, []pair) {
seen := make(map[string]struct{}) seen := make(map[string]struct{})
ts := newTripleStore() ts := newTripleStore()
var ( var (
val int64 val int64
ind []pair ind []pair
) )
writer, _ := writer.NewSingleReplication(ts, nil)
for _, t := range data { for _, t := range data {
for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} {
if _, ok := seen[qp]; !ok && qp != "" { if _, ok := seen[qp]; !ok && qp != "" {
@ -66,9 +68,10 @@ func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) {
seen[qp] = struct{}{} seen[qp] = struct{}{}
} }
} }
ts.AddTriple(t)
writer.AddQuad(t)
} }
return ts, ind return ts, writer, ind
} }
type pair struct { type pair struct {
@ -77,7 +80,7 @@ type pair struct {
} }
func TestMemstore(t *testing.T) { func TestMemstore(t *testing.T) {
ts, index := makeTestStore(simpleGraph) ts, _, index := makeTestStore(simpleGraph)
if size := ts.Size(); size != int64(len(simpleGraph)) { if size := ts.Size(); size != int64(len(simpleGraph)) {
t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph)) t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph))
} }
@ -95,7 +98,7 @@ func TestMemstore(t *testing.T) {
} }
func TestIteratorsAndNextResultOrderA(t *testing.T) { func TestIteratorsAndNextResultOrderA(t *testing.T) {
ts, _ := makeTestStore(simpleGraph) ts, _, _ := makeTestStore(simpleGraph)
fixed := ts.FixedIterator() fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("C")) fixed.Add(ts.ValueOf("C"))
@ -145,7 +148,7 @@ func TestIteratorsAndNextResultOrderA(t *testing.T) {
} }
func TestLinksToOptimization(t *testing.T) { func TestLinksToOptimization(t *testing.T) {
ts, _ := makeTestStore(simpleGraph) ts, _, _ := makeTestStore(simpleGraph)
fixed := ts.FixedIterator() fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("cool")) fixed.Add(ts.ValueOf("cool"))
@ -173,9 +176,9 @@ func TestLinksToOptimization(t *testing.T) {
} }
func TestRemoveTriple(t *testing.T) { func TestRemoveTriple(t *testing.T) {
ts, _ := makeTestStore(simpleGraph) ts, w, _ := makeTestStore(simpleGraph)
ts.RemoveTriple(&quad.Quad{"E", "follows", "F", ""}) w.RemoveQuad(&quad.Quad{"E", "follows", "F", ""})
fixed := ts.FixedIterator() fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("E")) fixed.Add(ts.ValueOf("E"))

View file

@ -38,19 +38,22 @@ const (
type Delta struct { type Delta struct {
ID int64 ID int64
Quad *quad.Quad Quad quad.Quad
Action Procedure Action Procedure
Timestamp time.Time Timestamp time.Time
} }
var ErrQuadExists = errors.New("Quad exists")
var ErrQuadNotExist = errors.New("Quad doesn't exist")
type QuadWriter interface { type QuadWriter interface {
// Add a triple to the store. // Add a quad to the store.
AddQuad(*quad.Quad) error AddQuad(*quad.Quad) error
// Add a set of triples to the store, atomically if possible. // Add a set of quads to the store, atomically if possible.
AddQuadSet([]*quad.Quad) error AddQuadSet([]*quad.Quad) error
// Removes a triple matching the given one from the database, // Removes a quad matching the given one from the database,
// if it exists. Does nothing otherwise. // if it exists. Does nothing otherwise.
RemoveQuad(*quad.Quad) error RemoveQuad(*quad.Quad) error
} }

View file

@ -36,48 +36,48 @@ func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadW
return rep, nil return rep, nil
} }
func (s *Single) AcquireNextId() int64 { func (s *Single) AcquireNextID() int64 {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() defer s.mut.Unlock()
id := s.nextID id := s.nextID
s.nextID += 1 s.nextID++
return id return id
} }
func (s *Single) AddQuad(t *quad.Quad) error { func (s *Single) AddQuad(q *quad.Quad) error {
trans := make([]*graph.Transaction, 1) deltas := make([]*graph.Delta, 1)
trans[0] = &graph.Transaction{ deltas[0] = &graph.Delta{
ID: s.AcquireNextId(), ID: s.AcquireNextID(),
Quad: t, Quad: *q,
Action: graph.Add, Action: graph.Add,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.ts.ApplyTransactions(trans) return s.ts.ApplyDeltas(deltas)
} }
func (s *Single) AddQuadSet(set []*quad.Quad) error { func (s *Single) AddQuadSet(set []*quad.Quad) error {
trans := make([]*graph.Transaction, len(set)) deltas := make([]*graph.Delta, len(set))
for i, t := range set { for i, q := range set {
trans[i] = &graph.Transaction{ deltas[i] = &graph.Delta{
ID: s.AcquireNextId(), ID: s.AcquireNextID(),
Quad: t, Quad: *q,
Action: graph.Add, Action: graph.Add,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
} }
s.ts.ApplyTransactions(trans) s.ts.ApplyDeltas(deltas)
return nil return nil
} }
func (s *Single) RemoveQuad(t *graph.Quad) error { func (s *Single) RemoveQuad(q *quad.Quad) error {
trans := make([]*graph.Transaction, 1) deltas := make([]*graph.Delta, 1)
trans[0] = &graph.Transaction{ deltas[0] = &graph.Delta{
ID: s.AcquireNextId(), ID: s.AcquireNextID(),
Triple: t, Quad: *q,
Action: graph.Delete, Action: graph.Delete,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.ts.ApplyTransactions(trans) return s.ts.ApplyDeltas(deltas)
} }
func init() { func init() {