diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 35c2120..3213d24 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -15,14 +15,18 @@ }, { "ImportPath": "github.com/boltdb/bolt", - "Comment": "v1.0-65-g3b44955", - "Rev": "3b449559cf34cbcc74460b59041a4399d3226e5a" + "Comment": "v1.0-111-g04a3e85", + "Rev": "04a3e85793043e76d41164037d0d7f9d53eecae3" }, { "ImportPath": "github.com/cznic/mathutil", "Rev": "f9551431b78e71ee24939a1e9d8f49f43898b5cd" }, { + "ImportPath": "github.com/gogo/protobuf/proto", + "Rev": "58bbd41c1a2d1b7154f5d99a8d0d839b3093301a" + }, + { "ImportPath": "github.com/julienschmidt/httprouter", "Rev": "b59a38004596b696aca7aa2adccfa68760864d86" }, diff --git a/cmd/cayleyupgrade/cayleyupgrade.go b/cmd/cayleyupgrade/cayleyupgrade.go new file mode 100644 index 0000000..f8153be --- /dev/null +++ b/cmd/cayleyupgrade/cayleyupgrade.go @@ -0,0 +1,79 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !appengine + +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/barakmich/glog" + "github.com/google/cayley/config" + "github.com/google/cayley/graph" + + // Load all supported backends. + + _ "github.com/google/cayley/graph/bolt" + _ "github.com/google/cayley/graph/leveldb" + _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/graph/mongo" +) + +var ( + configFile = flag.String("config", "", "Path to an explicit configuration file.") + databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") + databaseBackend = flag.String("db", "memstore", "Database Backend.") +) + +func configFrom(file string) *config.Config { + // Find the file... + if file != "" { + if _, err := os.Stat(file); os.IsNotExist(err) { + glog.Fatalln("Cannot find specified configuration file", file, ", aborting.") + } + } else if _, err := os.Stat(os.Getenv("CAYLEY_CFG")); err == nil { + file = os.Getenv("CAYLEY_CFG") + } else if _, err := os.Stat("/etc/cayley.cfg"); err == nil { + file = "/etc/cayley.cfg" + } + if file == "" { + glog.Infoln("Couldn't find a config file in either $CAYLEY_CFG or /etc/cayley.cfg. Going by flag defaults only.") + } + cfg, err := config.Load(file) + if err != nil { + glog.Fatalln(err) + } + if cfg.DatabasePath == "" { + cfg.DatabasePath = *databasePath + } + + if cfg.DatabaseType == "" { + cfg.DatabaseType = *databaseBackend + } + return cfg +} + +func main() { + flag.Parse() + cfg := configFrom(*configFile) + + err := graph.UpgradeQuadStore(cfg.DatabaseType, cfg.DatabasePath, nil) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/graph/bolt/bolt_test.go b/graph/bolt/bolt_test.go index b02a8bd..760d39e 100644 --- a/graph/bolt/bolt_test.go +++ b/graph/bolt/bolt_test.go @@ -104,7 +104,7 @@ func TestCreateDatabase(t *testing.T) { qs, err := newQuadStore(tmpFile.Name(), nil) if qs == nil || err != nil { - t.Error("Failed to create leveldb QuadStore.") + t.Error("Failed to create bolt QuadStore.") } if s := qs.Size(); s != 0 { t.Errorf("Unexpected size, got:%d expected:0", s) @@ -167,7 +167,7 @@ func TestLoadDatabase(t *testing.T) { ts2, didConvert := qs.(*QuadStore) if !didConvert { - t.Errorf("Could not convert from generic to LevelDB QuadStore") + t.Errorf("Could not convert from generic to Bolt QuadStore") } //Test horizon @@ -219,7 +219,7 @@ func TestIterator(t *testing.T) { qs, err := newQuadStore(tmpFile.Name(), nil) if qs == nil || err != nil { - t.Error("Failed to create leveldb QuadStore.") + t.Error("Failed to create bolt QuadStore.") } w, _ := writer.NewSingleReplication(qs, nil) @@ -313,7 +313,7 @@ func TestSetIterator(t *testing.T) { qs, err := newQuadStore(tmpFile.Name(), nil) if qs == nil || err != nil { - t.Error("Failed to create leveldb QuadStore.") + t.Error("Failed to create bolt QuadStore.") } defer qs.Close() @@ -428,7 +428,7 @@ func TestOptimize(t *testing.T) { } qs, err := newQuadStore(tmpFile.Name(), nil) if qs == nil || err != nil { - t.Error("Failed to create leveldb QuadStore.") + t.Error("Failed to create bolt QuadStore.") } w, _ := writer.NewSingleReplication(qs, nil) @@ -478,7 +478,7 @@ func TestDeletedFromIterator(t *testing.T) { qs, err := newQuadStore(tmpFile.Name(), nil) if qs == nil || err != nil { - t.Error("Failed to create leveldb QuadStore.") + t.Error("Failed to create bolt QuadStore.") } defer qs.Close() diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index f732572..389aff1 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -16,7 +16,6 @@ package bolt import ( "bytes" - "encoding/json" "errors" "fmt" @@ -25,6 +24,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/proto" "github.com/google/cayley/quad" ) @@ -114,8 +114,8 @@ func (it *Iterator) Close() error { } func (it *Iterator) isLiveValue(val []byte) bool { - var entry IndexEntry - json.Unmarshal(val, &entry) + var entry proto.HistoryEntry + entry.Unmarshal(val) return len(entry.History)%2 != 0 } diff --git a/graph/bolt/migrate.go b/graph/bolt/migrate.go new file mode 100644 index 0000000..afee99a --- /dev/null +++ b/graph/bolt/migrate.go @@ -0,0 +1,162 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "encoding/json" + "fmt" + + "github.com/barakmich/glog" + "github.com/boltdb/bolt" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/proto" +) + +const latestDataVersion = 2 +const nilDataVersion = 1 + +type upgradeFunc func(*bolt.DB) error + +var migrateFunctions = []upgradeFunc{ + nil, + upgrade1To2, +} + +func upgradeBolt(path string, opts graph.Options) error { + db, err := bolt.Open(path, 0600, nil) + defer db.Close() + + if err != nil { + glog.Errorln("Error, couldn't open! ", err) + return err + } + var version int64 + err = db.View(func(tx *bolt.Tx) error { + version, err = getInt64ForMetaKey(tx, "version", nilDataVersion) + return err + }) + if err != nil { + glog.Errorln("error:", err) + return err + } + + if version == latestDataVersion { + fmt.Printf("Already at latest version: %d\n", latestDataVersion) + return nil + } + + if version > latestDataVersion { + err := fmt.Errorf("Unknown data version: %d -- upgrade this tool", version) + glog.Errorln("error:", err) + return err + } + + for i := version; i < latestDataVersion; i++ { + err := migrateFunctions[i](db) + if err != nil { + return err + } + setVersion(db, i+1) + } + + return nil +} + +type v1ValueData struct { + Name string + Size int64 +} + +type v1IndexEntry struct { + History []int64 +} + +func upgrade1To2(db *bolt.DB) error { + fmt.Println("Upgrading v1 to v2...") + tx, err := db.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + fmt.Println("Upgrading bucket", string(logBucket)) + lb := tx.Bucket(logBucket) + c := lb.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var delta graph.Delta + err := json.Unmarshal(v, &delta) + if err != nil { + return err + } + newd := deltaToProto(delta) + data, err := newd.Marshal() + if err != nil { + return err + } + lb.Put(k, data) + } + if err := tx.Commit(); err != nil { + return err + } + tx, err = db.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + fmt.Println("Upgrading bucket", string(nodeBucket)) + nb := tx.Bucket(nodeBucket) + c = nb.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var vd proto.NodeData + err := json.Unmarshal(v, &vd) + if err != nil { + return err + } + data, err := vd.Marshal() + if err != nil { + return err + } + nb.Put(k, data) + } + if err := tx.Commit(); err != nil { + return err + } + + for _, bucket := range [4][]byte{spoBucket, ospBucket, posBucket, cpsBucket} { + tx, err = db.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + fmt.Println("Upgrading bucket", string(bucket)) + b := tx.Bucket(bucket) + cur := b.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + var h proto.HistoryEntry + err := json.Unmarshal(v, &h) + if err != nil { + return err + } + data, err := h.Marshal() + if err != nil { + return err + } + b.Put(k, data) + } + if err := tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index ec066c4..ea4b576 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -18,7 +18,6 @@ import ( "bytes" "crypto/sha1" "encoding/binary" - "encoding/json" "errors" "fmt" "hash" @@ -29,11 +28,18 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/proto" "github.com/google/cayley/quad" ) func init() { - graph.RegisterQuadStore("bolt", true, newQuadStore, createNewBolt, nil) + graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{ + NewFunc: newQuadStore, + NewForRequestFunc: nil, + UpgradeFunc: upgradeBolt, + InitFunc: createNewBolt, + IsPersistent: true, + }) } var ( @@ -67,6 +73,7 @@ type QuadStore struct { open bool size int64 horizon int64 + version int64 } func createNewBolt(path string, _ graph.Options) error { @@ -82,6 +89,10 @@ func createNewBolt(path string, _ graph.Options) error { if err != nil { return err } + err = setVersion(qs.db, latestDataVersion) + if err != nil { + return err + } qs.Close() return nil } @@ -106,6 +117,9 @@ func newQuadStore(path string, options graph.Options) (graph.QuadStore, error) { } else if err != nil { return nil, err } + if qs.version != latestDataVersion { + return nil, errors.New("bolt: data version is out of date. Run cayleyupgrade for your config to update the data.") + } return &qs, nil } @@ -134,6 +148,24 @@ func (qs *QuadStore) createBuckets() error { }) } +func setVersion(db *bolt.DB, version int64) error { + return db.Update(func(tx *bolt.Tx) error { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, version) + if err != nil { + glog.Errorf("Couldn't convert version!") + return err + } + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("version"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write version!") + return werr + } + return nil + }) +} + func (qs *QuadStore) Size() int64 { return qs.size } @@ -175,10 +207,6 @@ func (qs *QuadStore) createValueKeyFor(s string) []byte { return key } -type IndexEntry struct { - History []int64 -} - var ( // Short hand for direction permutations. spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} @@ -196,6 +224,20 @@ var ( metaBucket = []byte("meta") ) +func deltaToProto(delta graph.Delta) proto.LogDelta { + var newd proto.LogDelta + newd.ID = uint64(delta.ID.Int()) + newd.Action = int32(delta.Action) + newd.Timestamp = delta.Timestamp.UnixNano() + newd.Quad = &proto.Quad{ + Subject: delta.Quad.Subject, + Predicate: delta.Quad.Predicate, + Object: delta.Quad.Object, + Label: delta.Quad.Label, + } + return newd +} + func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { oldSize := qs.size oldHorizon := qs.horizon @@ -208,7 +250,8 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp if d.Action != graph.Add && d.Action != graph.Delete { return errors.New("bolt: invalid action") } - bytes, err := json.Marshal(d) + p := deltaToProto(d) + bytes, err := p.Marshal() if err != nil { return err } @@ -263,13 +306,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp } func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { - var entry IndexEntry + var entry proto.HistoryEntry b := tx.Bucket(spoBucket) b.FillPercent = localFillPercent data := b.Get(qs.createKeyFor(spo, q)) if data != nil { // We got something. - err := json.Unmarshal(data, &entry) + err := entry.Unmarshal(data) if err != nil { return err } @@ -284,9 +327,9 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo return graph.ErrQuadNotExist } - entry.History = append(entry.History, id) + entry.History = append(entry.History, uint64(id)) - jsonbytes, err := json.Marshal(entry) + bytes, err := entry.Marshal() if err != nil { glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) return err @@ -297,7 +340,7 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo } b := tx.Bucket(bucketFor(index)) b.FillPercent = localFillPercent - err = b.Put(qs.createKeyFor(index, q), jsonbytes) + err = b.Put(qs.createKeyFor(index, q), bytes) if err != nil { return err } @@ -305,13 +348,11 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo return nil } -type ValueData struct { - Name string - Size int64 -} - func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { - value := ValueData{name, amount} + value := proto.NodeData{ + Name: name, + Size_: amount, + } b := tx.Bucket(nodeBucket) b.FillPercent = localFillPercent key := qs.createValueKeyFor(name) @@ -319,21 +360,23 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er if data != nil { // Node exists in the database -- unmarshal and update. - err := json.Unmarshal(data, &value) + var oldvalue proto.NodeData + err := oldvalue.Unmarshal(data) if err != nil { glog.Errorf("Error: couldn't reconstruct value: %v", err) return err } - value.Size += amount + oldvalue.Size_ += amount + value = oldvalue } // Are we deleting something? - if value.Size <= 0 { - value.Size = 0 + if value.Size_ <= 0 { + value.Size_ = 0 } // Repackage and rewrite. - bytes, err := json.Marshal(&value) + bytes, err := value.Marshal() if err != nil { glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) return err @@ -381,7 +424,7 @@ func (qs *QuadStore) Close() { } func (qs *QuadStore) Quad(k graph.Value) quad.Quad { - var d graph.Delta + var d proto.LogDelta tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(tok.bucket) @@ -389,8 +432,8 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { if data == nil { return nil } - var in IndexEntry - err := json.Unmarshal(data, &in) + var in proto.HistoryEntry + err := in.Unmarshal(data) if err != nil { return err } @@ -398,18 +441,23 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { return nil } b = tx.Bucket(logBucket) - data = b.Get(qs.createDeltaKeyFor(in.History[len(in.History)-1])) + data = b.Get(qs.createDeltaKeyFor(int64(in.History[len(in.History)-1]))) if data == nil { // No harm, no foul. return nil } - return json.Unmarshal(data, &d) + return d.Unmarshal(data) }) if err != nil { glog.Error("Error getting quad: ", err) return quad.Quad{} } - return d.Quad + return quad.Quad{ + d.Quad.Subject, + d.Quad.Predicate, + d.Quad.Object, + d.Quad.Label, + } } func (qs *QuadStore) ValueOf(s string) graph.Value { @@ -419,8 +467,8 @@ func (qs *QuadStore) ValueOf(s string) graph.Value { } } -func (qs *QuadStore) valueData(t *Token) ValueData { - var out ValueData +func (qs *QuadStore) valueData(t *Token) proto.NodeData { + var out proto.NodeData if glog.V(3) { glog.V(3).Infof("%s %v", string(t.bucket), t.key) } @@ -428,13 +476,13 @@ func (qs *QuadStore) valueData(t *Token) ValueData { b := tx.Bucket(t.bucket) data := b.Get(t.key) if data != nil { - return json.Unmarshal(data, &out) + return out.Unmarshal(data) } return nil }) if err != nil { glog.Errorln("Error: couldn't get value") - return ValueData{} + return proto.NodeData{} } return out } @@ -449,12 +497,12 @@ func (qs *QuadStore) NameOf(k graph.Value) string { func (qs *QuadStore) SizeOf(k graph.Value) int64 { if k == nil { - return 0 + return -1 } - return int64(qs.valueData(k.(*Token)).Size) + return int64(qs.valueData(k.(*Token)).Size_) } -func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64, error) { +func getInt64ForMetaKey(tx *bolt.Tx, key string, empty int64) (int64, error) { var out int64 b := tx.Bucket(metaBucket) if b == nil { @@ -475,11 +523,15 @@ func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64 func (qs *QuadStore) getMetadata() error { err := qs.db.View(func(tx *bolt.Tx) error { var err error - qs.size, err = qs.getInt64ForKey(tx, "size", 0) + qs.size, err = getInt64ForMetaKey(tx, "size", 0) if err != nil { return err } - qs.horizon, err = qs.getInt64ForKey(tx, "horizon", 0) + qs.version, err = getInt64ForMetaKey(tx, "version", nilDataVersion) + if err != nil { + return err + } + qs.horizon, err = getInt64ForMetaKey(tx, "horizon", 0) return err }) return err diff --git a/graph/gaedatastore/quadstore.go b/graph/gaedatastore/quadstore.go index 8a7b12d..35599e5 100644 --- a/graph/gaedatastore/quadstore.go +++ b/graph/gaedatastore/quadstore.go @@ -25,9 +25,10 @@ import ( "net/http" "sync" + "github.com/barakmich/glog" + "appengine" "appengine/datastore" - "github.com/barakmich/glog" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" @@ -86,7 +87,13 @@ type LogEntry struct { } func init() { - graph.RegisterQuadStore("gaedatastore", true, newQuadStore, initQuadStore, newQuadStoreForRequest) + graph.RegisterQuadStore("gaedatastore", graph.QuadStoreRegistration{ + NewFunc: newQuadStore, + NewForRequestFunc: newQuadStoreForRequest, + UpgradeFunc: nil, + InitFunc: initQuadStore, + IsPersistent: true, + }) } func initQuadStore(_ string, _ graph.Options) error { diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index 73cf386..8437ae6 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -35,7 +35,13 @@ import ( ) func init() { - graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createNewLevelDB, nil) + graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{ + NewFunc: newQuadStore, + NewForRequestFunc: nil, + UpgradeFunc: nil, + InitFunc: createNewLevelDB, + IsPersistent: true, + }) } const ( diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 9fec5fa..7e94e7f 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -30,9 +30,15 @@ import ( const QuadStoreType = "memstore" func init() { - graph.RegisterQuadStore(QuadStoreType, false, func(string, graph.Options) (graph.QuadStore, error) { - return newQuadStore(), nil - }, nil, nil) + graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{ + NewFunc: func(string, graph.Options) (graph.QuadStore, error) { + return newQuadStore(), nil + }, + NewForRequestFunc: nil, + UpgradeFunc: nil, + InitFunc: nil, + IsPersistent: false, + }) } func cmp(a, b int64) int { diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index a997ecd..06cce2b 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -34,7 +34,13 @@ const DefaultDBName = "cayley" const QuadStoreType = "mongo" func init() { - graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createNewMongoGraph, nil) + graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{ + NewFunc: newQuadStore, + NewForRequestFunc: nil, + UpgradeFunc: nil, + InitFunc: createNewMongoGraph, + IsPersistent: true, + }) } var ( diff --git a/graph/proto/serializations.pb.go b/graph/proto/serializations.pb.go new file mode 100644 index 0000000..30b24d4 --- /dev/null +++ b/graph/proto/serializations.pb.go @@ -0,0 +1,780 @@ +// Code generated by protoc-gen-gogo. +// source: serializations.proto +// DO NOT EDIT! + +/* + Package proto is a generated protocol buffer package. + + It is generated from these files: + serializations.proto + + It has these top-level messages: + LogDelta + HistoryEntry + NodeData + Quad +*/ +package proto + +import proto1 "github.com/gogo/protobuf/proto" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto1.Marshal + +type LogDelta struct { + ID uint64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` + Quad *Quad `protobuf:"bytes,2,opt" json:"Quad,omitempty"` + Action int32 `protobuf:"varint,3,opt,proto3" json:"Action,omitempty"` + Timestamp int64 `protobuf:"varint,4,opt,proto3" json:"Timestamp,omitempty"` +} + +func (m *LogDelta) Reset() { *m = LogDelta{} } +func (m *LogDelta) String() string { return proto1.CompactTextString(m) } +func (*LogDelta) ProtoMessage() {} + +func (m *LogDelta) GetQuad() *Quad { + if m != nil { + return m.Quad + } + return nil +} + +type HistoryEntry struct { + History []uint64 `protobuf:"varint,1,rep" json:"History,omitempty"` +} + +func (m *HistoryEntry) Reset() { *m = HistoryEntry{} } +func (m *HistoryEntry) String() string { return proto1.CompactTextString(m) } +func (*HistoryEntry) ProtoMessage() {} + +type NodeData struct { + Name string `protobuf:"bytes,1,opt,proto3" json:"Name,omitempty"` + Size_ int64 `protobuf:"varint,2,opt,proto3" json:"Size,omitempty"` +} + +func (m *NodeData) Reset() { *m = NodeData{} } +func (m *NodeData) String() string { return proto1.CompactTextString(m) } +func (*NodeData) ProtoMessage() {} + +type Quad struct { + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + Predicate string `protobuf:"bytes,2,opt,name=predicate,proto3" json:"predicate,omitempty"` + Object string `protobuf:"bytes,3,opt,name=object,proto3" json:"object,omitempty"` + Label string `protobuf:"bytes,4,opt,name=label,proto3" json:"label,omitempty"` +} + +func (m *Quad) Reset() { *m = Quad{} } +func (m *Quad) String() string { return proto1.CompactTextString(m) } +func (*Quad) ProtoMessage() {} + +func init() { +} +func (m *LogDelta) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Quad", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Quad == nil { + m.Quad = &Quad{} + } + if err := m.Quad.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType) + } + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Action |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Timestamp |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipSerializations(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *HistoryEntry) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field History", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.History = append(m.History, v) + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipSerializations(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *NodeData) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Size_", wireType) + } + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Size_ |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipSerializations(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *Quad) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Subject", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Subject = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Predicate", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Predicate = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Object", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Object = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Label", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Label = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipSerializations(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func skipSerializations(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + return iNdEx, nil + case 3: + for { + var wire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + if wireType == 4 { + break + } + next, err := skipSerializations(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} +func (m *LogDelta) Size() (n int) { + var l int + _ = l + if m.ID != 0 { + n += 1 + sovSerializations(uint64(m.ID)) + } + l = m.Quad.Size() + n += 1 + l + sovSerializations(uint64(l)) + if m.Action != 0 { + n += 1 + sovSerializations(uint64(m.Action)) + } + if m.Timestamp != 0 { + n += 1 + sovSerializations(uint64(m.Timestamp)) + } + return n +} + +func (m *HistoryEntry) Size() (n int) { + var l int + _ = l + if len(m.History) > 0 { + for _, e := range m.History { + n += 1 + sovSerializations(uint64(e)) + } + } + return n +} + +func (m *NodeData) Size() (n int) { + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovSerializations(uint64(l)) + } + if m.Size_ != 0 { + n += 1 + sovSerializations(uint64(m.Size_)) + } + return n +} + +func (m *Quad) Size() (n int) { + var l int + _ = l + l = len(m.Subject) + if l > 0 { + n += 1 + l + sovSerializations(uint64(l)) + } + l = len(m.Predicate) + if l > 0 { + n += 1 + l + sovSerializations(uint64(l)) + } + l = len(m.Object) + if l > 0 { + n += 1 + l + sovSerializations(uint64(l)) + } + l = len(m.Label) + if l > 0 { + n += 1 + l + sovSerializations(uint64(l)) + } + return n +} + +func sovSerializations(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozSerializations(x uint64) (n int) { + return sovSerializations(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *LogDelta) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LogDelta) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.ID != 0 { + data[i] = 0x8 + i++ + i = encodeVarintSerializations(data, i, uint64(m.ID)) + } + data[i] = 0x12 + i++ + i = encodeVarintSerializations(data, i, uint64(m.Quad.Size())) + n1, err := m.Quad.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + if m.Action != 0 { + data[i] = 0x18 + i++ + i = encodeVarintSerializations(data, i, uint64(m.Action)) + } + if m.Timestamp != 0 { + data[i] = 0x20 + i++ + i = encodeVarintSerializations(data, i, uint64(m.Timestamp)) + } + return i, nil +} + +func (m *HistoryEntry) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *HistoryEntry) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.History) > 0 { + for _, num := range m.History { + data[i] = 0x8 + i++ + i = encodeVarintSerializations(data, i, uint64(num)) + } + } + return i, nil +} + +func (m *NodeData) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *NodeData) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + data[i] = 0xa + i++ + i = encodeVarintSerializations(data, i, uint64(len(m.Name))) + i += copy(data[i:], m.Name) + } + if m.Size_ != 0 { + data[i] = 0x10 + i++ + i = encodeVarintSerializations(data, i, uint64(m.Size_)) + } + return i, nil +} + +func (m *Quad) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Quad) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.Subject) > 0 { + data[i] = 0xa + i++ + i = encodeVarintSerializations(data, i, uint64(len(m.Subject))) + i += copy(data[i:], m.Subject) + } + if len(m.Predicate) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintSerializations(data, i, uint64(len(m.Predicate))) + i += copy(data[i:], m.Predicate) + } + if len(m.Object) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintSerializations(data, i, uint64(len(m.Object))) + i += copy(data[i:], m.Object) + } + if len(m.Label) > 0 { + data[i] = 0x22 + i++ + i = encodeVarintSerializations(data, i, uint64(len(m.Label))) + i += copy(data[i:], m.Label) + } + return i, nil +} + +func encodeFixed64Serializations(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Serializations(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintSerializations(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} diff --git a/graph/proto/serializations.proto b/graph/proto/serializations.proto new file mode 100644 index 0000000..ee8295d --- /dev/null +++ b/graph/proto/serializations.proto @@ -0,0 +1,43 @@ +// Copyright 2015 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package proto; + +message LogDelta { + uint64 ID = 1; + Quad Quad = 2; + int32 Action = 3; + int64 Timestamp = 4; +} + +message HistoryEntry { + repeated uint64 History = 1; +} + +message NodeData { + string Name = 1; + int64 Size = 2; +} + +message Quad { + string subject = 1; + string predicate = 2; + string object = 3; + string label = 4; +} + + + diff --git a/graph/quadstore.go b/graph/quadstore.go index 8a89885..04b5c37 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -150,27 +150,24 @@ type BulkLoader interface { type NewStoreFunc func(string, Options) (QuadStore, error) type InitStoreFunc func(string, Options) error +type UpgradeStoreFunc func(string, Options) error type NewStoreForRequestFunc func(QuadStore, Options) (QuadStore, error) -type register struct { - newFunc NewStoreFunc - newForRequestFunc NewStoreForRequestFunc - initFunc InitStoreFunc - isPersistent bool +type QuadStoreRegistration struct { + NewFunc NewStoreFunc + NewForRequestFunc NewStoreForRequestFunc + UpgradeFunc UpgradeStoreFunc + InitFunc InitStoreFunc + IsPersistent bool } -var storeRegistry = make(map[string]register) +var storeRegistry = make(map[string]QuadStoreRegistration) -func RegisterQuadStore(name string, persists bool, newFunc NewStoreFunc, initFunc InitStoreFunc, newForRequestFunc NewStoreForRequestFunc) { +func RegisterQuadStore(name string, register QuadStoreRegistration) { if _, found := storeRegistry[name]; found { panic("already registered QuadStore " + name) } - storeRegistry[name] = register{ - newFunc: newFunc, - initFunc: initFunc, - newForRequestFunc: newForRequestFunc, - isPersistent: persists, - } + storeRegistry[name] = register } func NewQuadStore(name, dbpath string, opts Options) (QuadStore, error) { @@ -178,13 +175,13 @@ func NewQuadStore(name, dbpath string, opts Options) (QuadStore, error) { if !registered { return nil, errors.New("quadstore: name '" + name + "' is not registered") } - return r.newFunc(dbpath, opts) + return r.NewFunc(dbpath, opts) } func InitQuadStore(name, dbpath string, opts Options) error { r, registered := storeRegistry[name] if registered { - return r.initFunc(dbpath, opts) + return r.InitFunc(dbpath, opts) } return errors.New("quadstore: name '" + name + "' is not registered") } @@ -192,13 +189,26 @@ func InitQuadStore(name, dbpath string, opts Options) error { func NewQuadStoreForRequest(qs QuadStore, opts Options) (QuadStore, error) { r, registered := storeRegistry[qs.Type()] if registered { - return r.newForRequestFunc(qs, opts) + return r.NewForRequestFunc(qs, opts) } return nil, errors.New("QuadStore does not support Per Request construction, check config") } +func UpgradeQuadStore(name, dbpath string, opts Options) error { + r, registered := storeRegistry[name] + if registered { + if r.UpgradeFunc != nil { + return r.UpgradeFunc(dbpath, opts) + } else { + return nil + } + } + return errors.New("quadstore: name '" + name + "' is not registered") + +} + func IsPersistent(name string) bool { - return storeRegistry[name].isPersistent + return storeRegistry[name].IsPersistent } func QuadStores() []string {