From d4e5eead32f8674b96a5fe77b98ac7655e6975a2 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 5 Aug 2014 14:17:38 -0400 Subject: [PATCH] convert leveldb to log-structure --- graph/leveldb/iterator.go | 40 ++++--- graph/leveldb/leveldb_test.go | 20 +++- graph/leveldb/triplestore.go | 255 +++++++++++++++++++++++------------------- writer/single.go | 2 +- 4 files changed, 177 insertions(+), 140 deletions(-) diff --git a/graph/leveldb/iterator.go b/graph/leveldb/iterator.go index b434dfd..2a82765 100644 --- a/graph/leveldb/iterator.go +++ b/graph/leveldb/iterator.go @@ -16,9 +16,11 @@ package leveldb import ( "bytes" + "encoding/json" "fmt" "strings" + "github.com/barakmich/glog" ldbit "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -41,7 +43,7 @@ type Iterator struct { result graph.Value } -func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator { +func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator { vb := value.([]byte) p := make([]byte, 0, 2+qs.hasher.Size()) p = append(p, []byte(prefix)...) @@ -65,10 +67,10 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS ok := it.iter.Seek(it.nextPrefix) if !ok { - // FIXME(kortschak) What are the semantics here? Is this iterator usable? - // If not, we should return nil *Iterator and an error. it.open = false it.iter.Release() + glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) + return &iterator.Null{} } return &it @@ -106,7 +108,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { func (it *Iterator) Clone() graph.Iterator { out := NewIterator(it.originalPrefix, it.dir, it.checkId, it.qs) - out.tags.CopyFrom(it) + out.Tagger().CopyFrom(it) return out } @@ -117,6 +119,12 @@ func (it *Iterator) Close() { } } +func (it *Iterator) isLiveValue(val []byte) bool { + var entry IndexEntry + json.Unmarshal(val, &entry) + return len(entry.History)%2 != 0 +} + func (it *Iterator) Next() (graph.Value, bool) { if it.iter == nil { it.result = nil @@ -132,6 +140,9 @@ func (it *Iterator) Next() (graph.Value, bool) { return nil, false } if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) { + if !it.isLiveValue(it.iter.Value()) { + return it.Next() + } out := make([]byte, len(it.iter.Key())) copy(out, it.iter.Key()) it.result = out @@ -173,7 +184,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return 2*qs.hasher.Size() + 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("po")) { @@ -185,7 +196,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return qs.hasher.Size() + 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("os")) { @@ -197,7 +208,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("cp")) { @@ -221,16 +232,13 @@ func (it *Iterator) Contains(v graph.Value) bool { return false } offset := PositionOf(val[0:2], it.dir, it.qs) - if offset != -1 { - if bytes.HasPrefix(val[offset:], it.checkId[1:]) { - return true - } - } else { - nameForDir := it.qs.Quad(v).Get(it.dir) - hashForDir := it.qs.ValueOf(nameForDir).([]byte) - if bytes.Equal(hashForDir, it.checkId) { - return true + if bytes.HasPrefix(val[offset:], it.checkId[1:]) { + data, err := it.qs.db.Get(val, it.ro) + if err != nil { + glog.Error("Couldn't get data for key ", val, " in iterator ", it.UID(), " failing Contains.") + return false } + return it.isLiveValue(data) } return false } diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go index 0e6772a..f1c9112 100644 --- a/graph/leveldb/leveldb_test.go +++ b/graph/leveldb/leveldb_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/quad" + "github.com/google/cayley/writer" ) func makeTripleSet() []*quad.Quad { @@ -143,7 +144,8 @@ func TestLoadDatabase(t *testing.T) { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTriple(&quad.Quad{"Something", "points_to", "Something Else", "context"}) + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuad(&quad.Quad{"Something", "points_to", "Something Else", "context"}) for _, pq := range []string{"Something", "points_to", "Something Else", "context"} { if got := qs.NameOf(qs.ValueOf(pq)); got != pq { t.Errorf("Failed to roundtrip %q, got:%q expect:%q", pq, got, pq) @@ -162,13 +164,14 @@ func TestLoadDatabase(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } + w, _ = writer.NewSingleReplication(qs, nil) ts2, didConvert := qs.(*TripleStore) if !didConvert { t.Errorf("Could not convert from generic to LevelDB TripleStore") } - qs.AddTripleSet(makeTripleSet()) + w.AddQuadSet(makeTripleSet()) if s := qs.Size(); s != 11 { t.Errorf("Unexpected triplestore size, got:%d expect:11", s) } @@ -176,7 +179,7 @@ func TestLoadDatabase(t *testing.T) { t.Errorf("Unexpected triplestore size, got:%d expect:5", s) } - qs.RemoveTriple(&quad.Quad{"A", "follows", "B", ""}) + w.RemoveQuad(&quad.Quad{"A", "follows", "B", ""}) if s := qs.Size(); s != 10 { t.Errorf("Unexpected triplestore size after RemoveTriple, got:%d expect:10", s) } @@ -204,7 +207,9 @@ func TestIterator(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTripleSet(makeTripleSet()) + + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) var it graph.Iterator it = qs.NodesAllIterator() @@ -299,7 +304,8 @@ func TestSetIterator(t *testing.T) { } defer qs.Close() - qs.AddTripleSet(makeTripleSet()) + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) expect := []*quad.Quad{ {"C", "follows", "B", ""}, @@ -411,7 +417,9 @@ func TestOptimize(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTripleSet(makeTripleSet()) + + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) // With an linksto-fixed pair fixed := qs.FixedIterator() diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go index 8092d96..3a4d16e 100644 --- a/graph/leveldb/triplestore.go +++ b/graph/leveldb/triplestore.go @@ -19,6 +19,7 @@ import ( "crypto/sha1" "encoding/binary" "encoding/json" + "errors" "fmt" "hash" @@ -48,6 +49,7 @@ type TripleStore struct { path string open bool size int64 + horizon int64 hasher hash.Hash writeopts *opt.WriteOptions readopts *opt.ReadOptions @@ -72,6 +74,7 @@ func createNewLevelDB(path string, _ graph.Options) error { func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) { var qs TripleStore + var err error qs.path = path cache_size := DefaultCacheSize if val, ok := options.IntKey("cache_size_mb"); ok { @@ -94,11 +97,15 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro qs.readopts = &opt.ReadOptions{} db, err := leveldb.OpenFile(qs.path, qs.dbOpts) if err != nil { - panic("Error, couldn't open! " + err.Error()) + glog.Errorln("Error, couldn't open! ", err) + return nil, err } qs.db = db glog.Infoln(qs.GetStats()) - qs.getSize() + err = qs.getMetadata() + if err != nil { + return nil, err + } return &qs, nil } @@ -116,24 +123,25 @@ func (qs *TripleStore) Size() int64 { return qs.size } -func (qs *TripleStore) createKeyFor(d [3]quad.Direction, triple *quad.Quad) []byte { - key := make([]byte, 0, 2+(qs.hasher.Size()*3)) +func (qs *TripleStore) Horizon() int64 { + return qs.horizon +} + +func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { + key := make([]byte, 0, 19) + key = append(key, 'd') + key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) + return key +} + +func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple *quad.Quad) []byte { + key := make([]byte, 0, 2+(qs.hasher.Size()*4)) // TODO(kortschak) Remove dependence on String() method. key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) - return key -} - -func (qs *TripleStore) createProvKeyFor(d [3]quad.Direction, triple *quad.Quad) []byte { - key := make([]byte, 0, 2+(qs.hasher.Size()*4)) - // TODO(kortschak) Remove dependence on String() method. - key = append(key, []byte{quad.Label.Prefix(), d[0].Prefix()}...) - key = append(key, qs.convertStringToByteHash(triple.Get(quad.Label))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) return key } @@ -144,76 +152,98 @@ func (qs *TripleStore) createValueKeyFor(s string) []byte { return key } -func (qs *TripleStore) AddTriple(t *quad.Quad) { - batch := &leveldb.Batch{} - qs.buildWrite(batch, t) - err := qs.db.Write(batch, qs.writeopts) - if err != nil { - glog.Errorf("Couldn't write to DB for triple %s.", t) - return - } - qs.size++ +type IndexEntry struct { + *quad.Quad + History []int64 } // Short hand for direction permutations. var ( - spo = [3]quad.Direction{quad.Subject, quad.Predicate, quad.Object} - osp = [3]quad.Direction{quad.Object, quad.Subject, quad.Predicate} - pos = [3]quad.Direction{quad.Predicate, quad.Object, quad.Subject} - pso = [3]quad.Direction{quad.Predicate, quad.Subject, quad.Object} + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *TripleStore) RemoveTriple(t *quad.Quad) { - _, err := qs.db.Get(qs.createKeyFor(spo, t), qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Error("Couldn't access DB to confirm deletion") - return - } - if err == leveldb.ErrNotFound { - // No such triple in the database, forget about it. - return - } +func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { batch := &leveldb.Batch{} - batch.Delete(qs.createKeyFor(spo, t)) - batch.Delete(qs.createKeyFor(osp, t)) - batch.Delete(qs.createKeyFor(pos, t)) - qs.UpdateValueKeyBy(t.Get(quad.Subject), -1, batch) - qs.UpdateValueKeyBy(t.Get(quad.Predicate), -1, batch) - qs.UpdateValueKeyBy(t.Get(quad.Object), -1, batch) - if t.Get(quad.Label) != "" { - batch.Delete(qs.createProvKeyFor(pso, t)) - qs.UpdateValueKeyBy(t.Get(quad.Label), -1, batch) + resizeMap := make(map[string]int64) + size_change := int64(0) + for _, d := range deltas { + bytes, err := json.Marshal(d) + if err != nil { + return err + } + batch.Put(qs.createDeltaKeyFor(d), bytes) + err = qs.buildQuadWrite(batch, &d.Quad, d.ID, d.Action == graph.Add) + if err != nil { + return err + } + delta := int64(1) + if d.Action == graph.Delete { + delta = int64(-1) + } + resizeMap[d.Quad.Subject] += delta + resizeMap[d.Quad.Predicate] += delta + resizeMap[d.Quad.Object] += delta + if d.Quad.Label != "" { + resizeMap[d.Quad.Label] += delta + } + size_change += delta + qs.horizon = d.ID } - err = qs.db.Write(batch, nil) + for k, v := range resizeMap { + if v != 0 { + err := qs.UpdateValueKeyBy(k, v, batch) + if err != nil { + return err + } + } + } + err := qs.db.Write(batch, qs.writeopts) if err != nil { - glog.Errorf("Couldn't delete triple %s.", t) - return + glog.Error("Couldn't write to DB for tripleset.") + return err } - qs.size-- + qs.size += size_change + return nil } -func (qs *TripleStore) buildTripleWrite(batch *leveldb.Batch, t *quad.Quad) { - bytes, err := json.Marshal(*t) - if err != nil { - glog.Errorf("Couldn't write to buffer for triple %s: %s", t, err) - return +func (qs *TripleStore) buildQuadWrite(batch *leveldb.Batch, q *quad.Quad, id int64, isAdd bool) error { + var entry IndexEntry + data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Error("Couldn't access DB to prepare index: ", err) + return err } - batch.Put(qs.createKeyFor(spo, t), bytes) - batch.Put(qs.createKeyFor(osp, t), bytes) - batch.Put(qs.createKeyFor(pos, t), bytes) - if t.Get(quad.Label) != "" { - batch.Put(qs.createProvKeyFor(pso, t), bytes) + if err == nil { + // We got something. + err = json.Unmarshal(data, &entry) + if err != nil { + return err + } + } else { + entry.Quad = q } -} + entry.History = append(entry.History, id) -func (qs *TripleStore) buildWrite(batch *leveldb.Batch, t *quad.Quad) { - qs.buildTripleWrite(batch, t) - qs.UpdateValueKeyBy(t.Get(quad.Subject), 1, nil) - qs.UpdateValueKeyBy(t.Get(quad.Predicate), 1, nil) - qs.UpdateValueKeyBy(t.Get(quad.Object), 1, nil) - if t.Get(quad.Label) != "" { - qs.UpdateValueKeyBy(t.Get(quad.Label), 1, nil) + if isAdd && len(entry.History)%2 == 0 { + glog.Error("Entry History is out of sync for", entry) + return errors.New("Odd index history") } + + bytes, err := json.Marshal(entry) + if err != nil { + glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) + return err + } + batch.Put(qs.createKeyFor(spo, q), bytes) + batch.Put(qs.createKeyFor(osp, q), bytes) + batch.Put(qs.createKeyFor(pos, q), bytes) + if q.Get(quad.Label) != "" { + batch.Put(qs.createKeyFor(cps, q), bytes) + } + return nil } type ValueData struct { @@ -221,15 +251,15 @@ type ValueData struct { Size int64 } -func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb.Batch) { - value := &ValueData{name, int64(amount)} +func (qs *TripleStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error { + value := &ValueData{name, amount} key := qs.createValueKeyFor(name) b, err := qs.db.Get(key, qs.readopts) // Error getting the node from the database. if err != nil && err != leveldb.ErrNotFound { glog.Errorf("Error reading Value %s from the DB.", name) - return + return err } // Node exists in the database -- unmarshal and update. @@ -237,58 +267,28 @@ func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb. err = json.Unmarshal(b, value) if err != nil { glog.Errorf("Error: couldn't reconstruct value: %v", err) - return + return err } - value.Size += int64(amount) + value.Size += amount } // Are we deleting something? - if amount < 0 { - if value.Size <= 0 { - if batch == nil { - qs.db.Delete(key, qs.writeopts) - } else { - batch.Delete(key) - } - return - } + if value.Size <= 0 { + value.Size = 0 } // Repackage and rewrite. bytes, err := json.Marshal(&value) if err != nil { glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) - return + return err } if batch == nil { qs.db.Put(key, bytes, qs.writeopts) } else { batch.Put(key, bytes) } -} - -func (qs *TripleStore) AddTripleSet(t_s []*quad.Quad) { - batch := &leveldb.Batch{} - newTs := len(t_s) - resizeMap := make(map[string]int) - for _, t := range t_s { - qs.buildTripleWrite(batch, t) - resizeMap[t.Subject]++ - resizeMap[t.Predicate]++ - resizeMap[t.Object]++ - if t.Label != "" { - resizeMap[t.Label]++ - } - } - for k, v := range resizeMap { - qs.UpdateValueKeyBy(k, v, batch) - } - err := qs.db.Write(batch, qs.writeopts) - if err != nil { - glog.Error("Couldn't write to DB for tripleset.") - return - } - qs.size += int64(newTs) + return nil } func (qs *TripleStore) Close() { @@ -302,6 +302,16 @@ func (qs *TripleStore) Close() { } else { glog.Errorf("Couldn't convert size before closing!") } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts) + if werr != nil { + glog.Error("Couldn't write horizon before closing!") + } + } else { + glog.Errorf("Couldn't convert horizon before closing!") + } qs.db.Close() qs.open = false } @@ -372,23 +382,34 @@ func (qs *TripleStore) SizeOf(k graph.Value) int64 { return int64(qs.valueData(k.([]byte)).Size) } -func (qs *TripleStore) getSize() { - var size int64 - b, err := qs.db.Get([]byte("__size"), qs.readopts) +func (qs *TripleStore) getInt64ForKey(key string, empty int64) (int64, error) { + var out int64 + b, err := qs.db.Get([]byte(key), qs.readopts) if err != nil && err != leveldb.ErrNotFound { - panic("Couldn't read size " + err.Error()) + glog.Errorln("Couldn't read " + key + ": " + err.Error()) + return 0, err } if err == leveldb.ErrNotFound { // Must be a new database. Cool - qs.size = 0 - return + return empty, nil } buf := bytes.NewBuffer(b) - err = binary.Read(buf, binary.LittleEndian, &size) + err = binary.Read(buf, binary.LittleEndian, &out) if err != nil { - glog.Errorln("Error: couldn't parse size") + glog.Errorln("Error: couldn't parse", key) + return 0, err } - qs.size = size + return out, nil +} + +func (qs *TripleStore) getMetadata() error { + var err error + qs.size, err = qs.getInt64ForKey("__size", 0) + if err != nil { + return err + } + qs.horizon, err = qs.getInt64ForKey("__horizon", 0) + return err } func (qs *TripleStore) SizeOfPrefix(pre []byte) (int64, error) { diff --git a/writer/single.go b/writer/single.go index 642b7ee..a455e02 100644 --- a/writer/single.go +++ b/writer/single.go @@ -30,7 +30,7 @@ type Single struct { func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadWriter, error) { rep := &Single{nextID: ts.Horizon(), ts: ts} - if rep.nextID == -1 { + if rep.nextID <= 0 { rep.nextID = 1 } return rep, nil