diff --git a/graph/bolt/migrate.go b/graph/bolt/migrate.go index 2929c55..15ce577 100644 --- a/graph/bolt/migrate.go +++ b/graph/bolt/migrate.go @@ -15,14 +15,16 @@ package bolt import ( + "encoding/json" "fmt" "github.com/barakmich/glog" "github.com/boltdb/bolt" "github.com/google/cayley/graph" + "github.com/google/cayley/graph/proto" ) -const latestDataVersion = 1 +const latestDataVersion = 2 const nilDataVersion = 1 type upgradeFunc func(*bolt.DB) error @@ -65,6 +67,7 @@ func upgradeBolt(path string, opts graph.Options) error { if err != nil { return err } + setVersion(db, i+1) } return nil @@ -77,6 +80,64 @@ func upgrade1To2(db *bolt.DB) error { return err } defer tx.Rollback() + fmt.Println("Upgrading bucket", string(logBucket)) + lb := tx.Bucket(logBucket) + c := lb.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var delta graph.Delta + err := json.Unmarshal(v, &delta) + if err != nil { + return err + } + var newd proto.LogDelta + newd.ID = uint64(delta.ID.Int()) + newd.Action = int32(delta.Action) + newd.Timestamp = delta.Timestamp.UnixNano() + newd.Quad = &proto.Quad{ + Subject: delta.Quad.Subject, + Predicate: delta.Quad.Predicate, + Object: delta.Quad.Object, + Label: delta.Quad.Label, + } + data, err := newd.Marshal() + if err != nil { + return err + } + lb.Put(k, data) + } + fmt.Println("Upgrading bucket", string(nodeBucket)) + nb := tx.Bucket(nodeBucket) + c = nb.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var vd proto.NodeData + err := json.Unmarshal(v, &vd) + if err != nil { + return err + } + data, err := vd.Marshal() + if err != nil { + return err + } + nb.Put(k, data) + } + + for _, bucket := range [4][]byte{spoBucket, ospBucket, posBucket, cpsBucket} { + fmt.Println("Upgrading bucket", string(bucket)) + b := tx.Bucket(bucket) + cur := b.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + var h proto.HistoryEntry + err := json.Unmarshal(v, &h) + if err != nil { + return err + } + data, err := h.Marshal() + if err != nil { + return err + } + b.Put(k, data) + } + } if err := tx.Commit(); err != nil { return err } diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 55115b7..8aa0a28 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -89,7 +89,7 @@ func createNewBolt(path string, _ graph.Options) error { if err != nil { return err } - err = qs.setVersion(latestDataVersion) + err = setVersion(qs.db, latestDataVersion) if err != nil { return err } @@ -148,8 +148,8 @@ func (qs *QuadStore) createBuckets() error { }) } -func (qs *QuadStore) setVersion(version int) error { - return qs.db.Update(func(tx *bolt.Tx) error { +func setVersion(db *bolt.DB, version int64) error { + return db.Update(func(tx *bolt.Tx) error { buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, version) if err != nil { diff --git a/graph/proto/serializations.pb.go b/graph/proto/serializations.pb.go index 2b7e839..30b24d4 100644 --- a/graph/proto/serializations.pb.go +++ b/graph/proto/serializations.pb.go @@ -28,7 +28,7 @@ type LogDelta struct { ID uint64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` Quad *Quad `protobuf:"bytes,2,opt" json:"Quad,omitempty"` Action int32 `protobuf:"varint,3,opt,proto3" json:"Action,omitempty"` - Timestamp uint64 `protobuf:"varint,4,opt,proto3" json:"Timestamp,omitempty"` + Timestamp int64 `protobuf:"varint,4,opt,proto3" json:"Timestamp,omitempty"` } func (m *LogDelta) Reset() { *m = LogDelta{} } @@ -158,7 +158,7 @@ func (m *LogDelta) Unmarshal(data []byte) error { } b := data[iNdEx] iNdEx++ - m.Timestamp |= (uint64(b) & 0x7F) << shift + m.Timestamp |= (int64(b) & 0x7F) << shift if b < 0x80 { break } diff --git a/graph/proto/serializations.proto b/graph/proto/serializations.proto index 30155b6..ee8295d 100644 --- a/graph/proto/serializations.proto +++ b/graph/proto/serializations.proto @@ -20,7 +20,7 @@ message LogDelta { uint64 ID = 1; Quad Quad = 2; int32 Action = 3; - uint64 Timestamp = 4; + int64 Timestamp = 4; } message HistoryEntry {