From 390d012fc41b04f961731980f2482dd4c4ece383 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 23 Jun 2015 16:57:42 -0400 Subject: [PATCH] use proto everywhere on v2 stores --- graph/bolt/iterator.go | 6 ++-- graph/bolt/migrate.go | 20 ++++++------- graph/bolt/quadstore.go | 80 +++++++++++++++++++++++++++++-------------------- 3 files changed, 60 insertions(+), 46 deletions(-) diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index f732572..389aff1 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -16,7 +16,6 @@ package bolt import ( "bytes" - "encoding/json" "errors" "fmt" @@ -25,6 +24,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/proto" "github.com/google/cayley/quad" ) @@ -114,8 +114,8 @@ func (it *Iterator) Close() error { } func (it *Iterator) isLiveValue(val []byte) bool { - var entry IndexEntry - json.Unmarshal(val, &entry) + var entry proto.HistoryEntry + entry.Unmarshal(val) return len(entry.History)%2 != 0 } diff --git a/graph/bolt/migrate.go b/graph/bolt/migrate.go index 15ce577..d831ee1 100644 --- a/graph/bolt/migrate.go +++ b/graph/bolt/migrate.go @@ -73,6 +73,15 @@ func upgradeBolt(path string, opts graph.Options) error { return nil } +type v1ValueData struct { + Name string + Size int64 +} + +type v1IndexEntry struct { + History []int64 +} + func upgrade1To2(db *bolt.DB) error { fmt.Println("Upgrading v1 to v2...") tx, err := db.Begin(true) @@ -89,16 +98,7 @@ func upgrade1To2(db *bolt.DB) error { if err != nil { return err } - var newd proto.LogDelta - newd.ID = uint64(delta.ID.Int()) - newd.Action = int32(delta.Action) - newd.Timestamp = delta.Timestamp.UnixNano() - newd.Quad = &proto.Quad{ - Subject: delta.Quad.Subject, - Predicate: delta.Quad.Predicate, - Object: delta.Quad.Object, - Label: delta.Quad.Label, - } + newd := deltaToProto(delta) data, err := newd.Marshal() if err != nil { return err diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 8aa0a28..efc8ecb 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -18,7 +18,6 @@ import ( "bytes" "crypto/sha1" "encoding/binary" - "encoding/json" "errors" "fmt" "hash" @@ -29,6 +28,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/proto" "github.com/google/cayley/quad" ) @@ -207,10 +207,6 @@ func (qs *QuadStore) createValueKeyFor(s string) []byte { return key } -type IndexEntry struct { - History []int64 -} - var ( // Short hand for direction permutations. spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} @@ -228,6 +224,20 @@ var ( metaBucket = []byte("meta") ) +func deltaToProto(delta graph.Delta) proto.LogDelta { + var newd proto.LogDelta + newd.ID = uint64(delta.ID.Int()) + newd.Action = int32(delta.Action) + newd.Timestamp = delta.Timestamp.UnixNano() + newd.Quad = &proto.Quad{ + Subject: delta.Quad.Subject, + Predicate: delta.Quad.Predicate, + Object: delta.Quad.Object, + Label: delta.Quad.Label, + } + return newd +} + func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { oldSize := qs.size oldHorizon := qs.horizon @@ -240,7 +250,8 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp if d.Action != graph.Add && d.Action != graph.Delete { return errors.New("bolt: invalid action") } - bytes, err := json.Marshal(d) + p := deltaToProto(d) + bytes, err := p.Marshal() if err != nil { return err } @@ -295,13 +306,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp } func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { - var entry IndexEntry + var entry proto.HistoryEntry b := tx.Bucket(spoBucket) b.FillPercent = localFillPercent data := b.Get(qs.createKeyFor(spo, q)) if data != nil { // We got something. - err := json.Unmarshal(data, &entry) + err := entry.Unmarshal(data) if err != nil { return err } @@ -316,9 +327,9 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo return graph.ErrQuadNotExist } - entry.History = append(entry.History, id) + entry.History = append(entry.History, uint64(id)) - jsonbytes, err := json.Marshal(entry) + bytes, err := entry.Marshal() if err != nil { glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) return err @@ -329,7 +340,7 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo } b := tx.Bucket(bucketFor(index)) b.FillPercent = localFillPercent - err = b.Put(qs.createKeyFor(index, q), jsonbytes) + err = b.Put(qs.createKeyFor(index, q), bytes) if err != nil { return err } @@ -337,13 +348,11 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo return nil } -type ValueData struct { - Name string - Size int64 -} - func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { - value := ValueData{name, amount} + value := proto.NodeData{ + Name: name, + Size_: amount, + } b := tx.Bucket(nodeBucket) b.FillPercent = localFillPercent key := qs.createValueKeyFor(name) @@ -351,21 +360,21 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er if data != nil { // Node exists in the database -- unmarshal and update. - err := json.Unmarshal(data, &value) + err := value.Unmarshal(data) if err != nil { glog.Errorf("Error: couldn't reconstruct value: %v", err) return err } - value.Size += amount + value.Size_ += amount } // Are we deleting something? - if value.Size <= 0 { - value.Size = 0 + if value.Size_ <= 0 { + value.Size_ = 0 } // Repackage and rewrite. - bytes, err := json.Marshal(&value) + bytes, err := value.Marshal() if err != nil { glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) return err @@ -413,7 +422,7 @@ func (qs *QuadStore) Close() { } func (qs *QuadStore) Quad(k graph.Value) quad.Quad { - var d graph.Delta + var d proto.LogDelta tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(tok.bucket) @@ -421,8 +430,8 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { if data == nil { return nil } - var in IndexEntry - err := json.Unmarshal(data, &in) + var in proto.HistoryEntry + err := in.Unmarshal(data) if err != nil { return err } @@ -430,18 +439,23 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { return nil } b = tx.Bucket(logBucket) - data = b.Get(qs.createDeltaKeyFor(in.History[len(in.History)-1])) + data = b.Get(qs.createDeltaKeyFor(int64(in.History[len(in.History)-1]))) if data == nil { // No harm, no foul. return nil } - return json.Unmarshal(data, &d) + return d.Unmarshal(data) }) if err != nil { glog.Error("Error getting quad: ", err) return quad.Quad{} } - return d.Quad + return quad.Quad{ + d.Quad.Subject, + d.Quad.Predicate, + d.Quad.Object, + d.Quad.Label, + } } func (qs *QuadStore) ValueOf(s string) graph.Value { @@ -451,8 +465,8 @@ func (qs *QuadStore) ValueOf(s string) graph.Value { } } -func (qs *QuadStore) valueData(t *Token) ValueData { - var out ValueData +func (qs *QuadStore) valueData(t *Token) proto.NodeData { + var out proto.NodeData if glog.V(3) { glog.V(3).Infof("%s %v", string(t.bucket), t.key) } @@ -460,13 +474,13 @@ func (qs *QuadStore) valueData(t *Token) ValueData { b := tx.Bucket(t.bucket) data := b.Get(t.key) if data != nil { - return json.Unmarshal(data, &out) + return out.Unmarshal(data) } return nil }) if err != nil { glog.Errorln("Error: couldn't get value") - return ValueData{} + return proto.NodeData{} } return out } @@ -483,7 +497,7 @@ func (qs *QuadStore) SizeOf(k graph.Value) int64 { if k == nil { return 0 } - return int64(qs.valueData(k.(*Token)).Size) + return int64(qs.valueData(k.(*Token)).Size_) } func getInt64ForMetaKey(tx *bolt.Tx, key string, empty int64) (int64, error) {