use proto everywhere on v2 stores

This commit is contained in:
Barak Michener 2015-06-23 16:57:42 -04:00
parent 80ddf74cd2
commit 390d012fc4
3 changed files with 60 additions and 46 deletions

View file

@ -16,7 +16,6 @@ package bolt
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -25,6 +24,7 @@ import (
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
"github.com/google/cayley/graph/proto"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
@ -114,8 +114,8 @@ func (it *Iterator) Close() error {
} }
func (it *Iterator) isLiveValue(val []byte) bool { func (it *Iterator) isLiveValue(val []byte) bool {
var entry IndexEntry var entry proto.HistoryEntry
json.Unmarshal(val, &entry) entry.Unmarshal(val)
return len(entry.History)%2 != 0 return len(entry.History)%2 != 0
} }

View file

@ -73,6 +73,15 @@ func upgradeBolt(path string, opts graph.Options) error {
return nil return nil
} }
type v1ValueData struct {
Name string
Size int64
}
type v1IndexEntry struct {
History []int64
}
func upgrade1To2(db *bolt.DB) error { func upgrade1To2(db *bolt.DB) error {
fmt.Println("Upgrading v1 to v2...") fmt.Println("Upgrading v1 to v2...")
tx, err := db.Begin(true) tx, err := db.Begin(true)
@ -89,16 +98,7 @@ func upgrade1To2(db *bolt.DB) error {
if err != nil { if err != nil {
return err return err
} }
var newd proto.LogDelta newd := deltaToProto(delta)
newd.ID = uint64(delta.ID.Int())
newd.Action = int32(delta.Action)
newd.Timestamp = delta.Timestamp.UnixNano()
newd.Quad = &proto.Quad{
Subject: delta.Quad.Subject,
Predicate: delta.Quad.Predicate,
Object: delta.Quad.Object,
Label: delta.Quad.Label,
}
data, err := newd.Marshal() data, err := newd.Marshal()
if err != nil { if err != nil {
return err return err

View file

@ -18,7 +18,6 @@ import (
"bytes" "bytes"
"crypto/sha1" "crypto/sha1"
"encoding/binary" "encoding/binary"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
@ -29,6 +28,7 @@ import (
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
"github.com/google/cayley/graph/proto"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
@ -207,10 +207,6 @@ func (qs *QuadStore) createValueKeyFor(s string) []byte {
return key return key
} }
type IndexEntry struct {
History []int64
}
var ( var (
// Short hand for direction permutations. // Short hand for direction permutations.
spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
@ -228,6 +224,20 @@ var (
metaBucket = []byte("meta") metaBucket = []byte("meta")
) )
func deltaToProto(delta graph.Delta) proto.LogDelta {
var newd proto.LogDelta
newd.ID = uint64(delta.ID.Int())
newd.Action = int32(delta.Action)
newd.Timestamp = delta.Timestamp.UnixNano()
newd.Quad = &proto.Quad{
Subject: delta.Quad.Subject,
Predicate: delta.Quad.Predicate,
Object: delta.Quad.Object,
Label: delta.Quad.Label,
}
return newd
}
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
oldSize := qs.size oldSize := qs.size
oldHorizon := qs.horizon oldHorizon := qs.horizon
@ -240,7 +250,8 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
if d.Action != graph.Add && d.Action != graph.Delete { if d.Action != graph.Add && d.Action != graph.Delete {
return errors.New("bolt: invalid action") return errors.New("bolt: invalid action")
} }
bytes, err := json.Marshal(d) p := deltaToProto(d)
bytes, err := p.Marshal()
if err != nil { if err != nil {
return err return err
} }
@ -295,13 +306,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
} }
func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error {
var entry IndexEntry var entry proto.HistoryEntry
b := tx.Bucket(spoBucket) b := tx.Bucket(spoBucket)
b.FillPercent = localFillPercent b.FillPercent = localFillPercent
data := b.Get(qs.createKeyFor(spo, q)) data := b.Get(qs.createKeyFor(spo, q))
if data != nil { if data != nil {
// We got something. // We got something.
err := json.Unmarshal(data, &entry) err := entry.Unmarshal(data)
if err != nil { if err != nil {
return err return err
} }
@ -316,9 +327,9 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo
return graph.ErrQuadNotExist return graph.ErrQuadNotExist
} }
entry.History = append(entry.History, id) entry.History = append(entry.History, uint64(id))
jsonbytes, err := json.Marshal(entry) bytes, err := entry.Marshal()
if err != nil { if err != nil {
glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err)
return err return err
@ -329,7 +340,7 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo
} }
b := tx.Bucket(bucketFor(index)) b := tx.Bucket(bucketFor(index))
b.FillPercent = localFillPercent b.FillPercent = localFillPercent
err = b.Put(qs.createKeyFor(index, q), jsonbytes) err = b.Put(qs.createKeyFor(index, q), bytes)
if err != nil { if err != nil {
return err return err
} }
@ -337,13 +348,11 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo
return nil return nil
} }
type ValueData struct {
Name string
Size int64
}
func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error {
value := ValueData{name, amount} value := proto.NodeData{
Name: name,
Size_: amount,
}
b := tx.Bucket(nodeBucket) b := tx.Bucket(nodeBucket)
b.FillPercent = localFillPercent b.FillPercent = localFillPercent
key := qs.createValueKeyFor(name) key := qs.createValueKeyFor(name)
@ -351,21 +360,21 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er
if data != nil { if data != nil {
// Node exists in the database -- unmarshal and update. // Node exists in the database -- unmarshal and update.
err := json.Unmarshal(data, &value) err := value.Unmarshal(data)
if err != nil { if err != nil {
glog.Errorf("Error: couldn't reconstruct value: %v", err) glog.Errorf("Error: couldn't reconstruct value: %v", err)
return err return err
} }
value.Size += amount value.Size_ += amount
} }
// Are we deleting something? // Are we deleting something?
if value.Size <= 0 { if value.Size_ <= 0 {
value.Size = 0 value.Size_ = 0
} }
// Repackage and rewrite. // Repackage and rewrite.
bytes, err := json.Marshal(&value) bytes, err := value.Marshal()
if err != nil { if err != nil {
glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) glog.Errorf("Couldn't write to buffer for value %s: %s", name, err)
return err return err
@ -413,7 +422,7 @@ func (qs *QuadStore) Close() {
} }
func (qs *QuadStore) Quad(k graph.Value) quad.Quad { func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
var d graph.Delta var d proto.LogDelta
tok := k.(*Token) tok := k.(*Token)
err := qs.db.View(func(tx *bolt.Tx) error { err := qs.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(tok.bucket) b := tx.Bucket(tok.bucket)
@ -421,8 +430,8 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
if data == nil { if data == nil {
return nil return nil
} }
var in IndexEntry var in proto.HistoryEntry
err := json.Unmarshal(data, &in) err := in.Unmarshal(data)
if err != nil { if err != nil {
return err return err
} }
@ -430,18 +439,23 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
return nil return nil
} }
b = tx.Bucket(logBucket) b = tx.Bucket(logBucket)
data = b.Get(qs.createDeltaKeyFor(in.History[len(in.History)-1])) data = b.Get(qs.createDeltaKeyFor(int64(in.History[len(in.History)-1])))
if data == nil { if data == nil {
// No harm, no foul. // No harm, no foul.
return nil return nil
} }
return json.Unmarshal(data, &d) return d.Unmarshal(data)
}) })
if err != nil { if err != nil {
glog.Error("Error getting quad: ", err) glog.Error("Error getting quad: ", err)
return quad.Quad{} return quad.Quad{}
} }
return d.Quad return quad.Quad{
d.Quad.Subject,
d.Quad.Predicate,
d.Quad.Object,
d.Quad.Label,
}
} }
func (qs *QuadStore) ValueOf(s string) graph.Value { func (qs *QuadStore) ValueOf(s string) graph.Value {
@ -451,8 +465,8 @@ func (qs *QuadStore) ValueOf(s string) graph.Value {
} }
} }
func (qs *QuadStore) valueData(t *Token) ValueData { func (qs *QuadStore) valueData(t *Token) proto.NodeData {
var out ValueData var out proto.NodeData
if glog.V(3) { if glog.V(3) {
glog.V(3).Infof("%s %v", string(t.bucket), t.key) glog.V(3).Infof("%s %v", string(t.bucket), t.key)
} }
@ -460,13 +474,13 @@ func (qs *QuadStore) valueData(t *Token) ValueData {
b := tx.Bucket(t.bucket) b := tx.Bucket(t.bucket)
data := b.Get(t.key) data := b.Get(t.key)
if data != nil { if data != nil {
return json.Unmarshal(data, &out) return out.Unmarshal(data)
} }
return nil return nil
}) })
if err != nil { if err != nil {
glog.Errorln("Error: couldn't get value") glog.Errorln("Error: couldn't get value")
return ValueData{} return proto.NodeData{}
} }
return out return out
} }
@ -483,7 +497,7 @@ func (qs *QuadStore) SizeOf(k graph.Value) int64 {
if k == nil { if k == nil {
return 0 return 0
} }
return int64(qs.valueData(k.(*Token)).Size) return int64(qs.valueData(k.(*Token)).Size_)
} }
func getInt64ForMetaKey(tx *bolt.Tx, key string, empty int64) (int64, error) { func getInt64ForMetaKey(tx *bolt.Tx, key string, empty int64) (int64, error) {