first migration

This commit is contained in:
Barak Michener 2015-06-23 16:39:13 -04:00
parent 983a3a52d0
commit 80ddf74cd2
4 changed files with 68 additions and 7 deletions

View file

@ -15,14 +15,16 @@
package bolt
import (
"encoding/json"
"fmt"
"github.com/barakmich/glog"
"github.com/boltdb/bolt"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/proto"
)
const latestDataVersion = 1
const latestDataVersion = 2
const nilDataVersion = 1
type upgradeFunc func(*bolt.DB) error
@ -65,6 +67,7 @@ func upgradeBolt(path string, opts graph.Options) error {
if err != nil {
return err
}
setVersion(db, i+1)
}
return nil
@ -77,6 +80,64 @@ func upgrade1To2(db *bolt.DB) error {
return err
}
defer tx.Rollback()
fmt.Println("Upgrading bucket", string(logBucket))
lb := tx.Bucket(logBucket)
c := lb.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var delta graph.Delta
err := json.Unmarshal(v, &delta)
if err != nil {
return err
}
var newd proto.LogDelta
newd.ID = uint64(delta.ID.Int())
newd.Action = int32(delta.Action)
newd.Timestamp = delta.Timestamp.UnixNano()
newd.Quad = &proto.Quad{
Subject: delta.Quad.Subject,
Predicate: delta.Quad.Predicate,
Object: delta.Quad.Object,
Label: delta.Quad.Label,
}
data, err := newd.Marshal()
if err != nil {
return err
}
lb.Put(k, data)
}
fmt.Println("Upgrading bucket", string(nodeBucket))
nb := tx.Bucket(nodeBucket)
c = nb.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var vd proto.NodeData
err := json.Unmarshal(v, &vd)
if err != nil {
return err
}
data, err := vd.Marshal()
if err != nil {
return err
}
nb.Put(k, data)
}
for _, bucket := range [4][]byte{spoBucket, ospBucket, posBucket, cpsBucket} {
fmt.Println("Upgrading bucket", string(bucket))
b := tx.Bucket(bucket)
cur := b.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
var h proto.HistoryEntry
err := json.Unmarshal(v, &h)
if err != nil {
return err
}
data, err := h.Marshal()
if err != nil {
return err
}
b.Put(k, data)
}
}
if err := tx.Commit(); err != nil {
return err
}

View file

@ -89,7 +89,7 @@ func createNewBolt(path string, _ graph.Options) error {
if err != nil {
return err
}
err = qs.setVersion(latestDataVersion)
err = setVersion(qs.db, latestDataVersion)
if err != nil {
return err
}
@ -148,8 +148,8 @@ func (qs *QuadStore) createBuckets() error {
})
}
func (qs *QuadStore) setVersion(version int) error {
return qs.db.Update(func(tx *bolt.Tx) error {
func setVersion(db *bolt.DB, version int64) error {
return db.Update(func(tx *bolt.Tx) error {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, version)
if err != nil {

View file

@ -28,7 +28,7 @@ type LogDelta struct {
ID uint64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
Quad *Quad `protobuf:"bytes,2,opt" json:"Quad,omitempty"`
Action int32 `protobuf:"varint,3,opt,proto3" json:"Action,omitempty"`
Timestamp uint64 `protobuf:"varint,4,opt,proto3" json:"Timestamp,omitempty"`
Timestamp int64 `protobuf:"varint,4,opt,proto3" json:"Timestamp,omitempty"`
}
func (m *LogDelta) Reset() { *m = LogDelta{} }
@ -158,7 +158,7 @@ func (m *LogDelta) Unmarshal(data []byte) error {
}
b := data[iNdEx]
iNdEx++
m.Timestamp |= (uint64(b) & 0x7F) << shift
m.Timestamp |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}

View file

@ -20,7 +20,7 @@ message LogDelta {
uint64 ID = 1;
Quad Quad = 2;
int32 Action = 3;
uint64 Timestamp = 4;
int64 Timestamp = 4;
}
message HistoryEntry {