From abda6cbbb07ecd9fb0646a9284386991ee9c4d08 Mon Sep 17 00:00:00 2001 From: panamafrancis Date: Mon, 1 Dec 2014 23:00:06 +0100 Subject: [PATCH] Refactoring of Delta.IDs & horizon functionality with a PrimaryKey type, needed for implementing UUID based primary keys for backends such as the appengine datastore \n Tests: at top level and if available per backend, if not then just visual --- graph/bolt/quadstore.go | 11 ++++----- graph/iterator/mock_ts_test.go | 3 ++- graph/leveldb/leveldb_test.go | 10 +++++++++ graph/leveldb/quadstore.go | 11 ++++----- graph/memstore/quadstore.go | 23 ++++++++++++++----- graph/mongo/quadstore.go | 11 ++++----- graph/primarykey.go | 28 +++++++++++++++++++++++ graph/quadstore.go | 2 +- graph/quadwriter.go | 2 +- keys/sequentialkey.go | 51 ++++++++++++++++++++++++++++++++++++++++++ writer/single.go | 27 +++++----------------- 11 files changed, 135 insertions(+), 44 deletions(-) create mode 100644 graph/primarykey.go create mode 100644 keys/sequentialkey.go diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 0957bab..2a76d8e 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -28,6 +28,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/keys" "github.com/google/cayley/quad" ) @@ -124,8 +125,8 @@ func (qs *QuadStore) Size() int64 { return qs.size } -func (qs *QuadStore) Horizon() int64 { - return qs.horizon +func (qs *QuadStore) Horizon() graph.PrimaryKey { + return keys.NewSequentialKey(qs.horizon) } func (qs *QuadStore) createDeltaKeyFor(id int64) []byte { @@ -195,13 +196,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { if err != nil { return err } - err = b.Put(qs.createDeltaKeyFor(d.ID), bytes) + err = b.Put(qs.createDeltaKeyFor(d.ID.Int()), bytes) if err != nil { return err } } for _, d := range deltas { - err := qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add) + err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { return err } @@ -216,7 +217,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { resizeMap[d.Quad.Label] += delta } sizeChange += delta - qs.horizon = d.ID + qs.horizon = d.ID.Int() } for k, v := range resizeMap { if v != 0 { diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index bf60406..2a1407b 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -16,6 +16,7 @@ package iterator import ( "github.com/google/cayley/graph" + "github.com/google/cayley/keys" "github.com/google/cayley/quad" ) @@ -56,7 +57,7 @@ func (qs *store) NameOf(v graph.Value) string { func (qs *store) Size() int64 { return 0 } -func (qs *store) Horizon() int64 { return 0 } +func (qs *store) Horizon() graph.PrimaryKey { return keys.NewSequentialKey(0) } func (qs *store) DebugPrint() {} diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go index e2757b7..0711f25 100644 --- a/graph/leveldb/leveldb_test.go +++ b/graph/leveldb/leveldb_test.go @@ -168,6 +168,12 @@ func TestLoadDatabase(t *testing.T) { t.Errorf("Could not convert from generic to LevelDB QuadStore") } + //Test horizon + horizon := qs.Horizon() + if horizon.Int() != 1 { + t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int()) + } + w.AddQuadSet(makeQuadSet()) if s := qs.Size(); s != 11 { t.Errorf("Unexpected quadstore size, got:%d expect:11", s) @@ -175,6 +181,10 @@ func TestLoadDatabase(t *testing.T) { if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 { t.Errorf("Unexpected quadstore size, got:%d expect:5", s) } + horizon = qs.Horizon() + if horizon.Int() != 12 { + t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int()) + } w.RemoveQuad(quad.Quad{ Subject: "A", diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index 641a6f5..3842aa9 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -32,6 +32,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/keys" "github.com/google/cayley/quad" ) @@ -135,8 +136,8 @@ func (qs *QuadStore) Size() int64 { return qs.size } -func (qs *QuadStore) Horizon() int64 { - return qs.horizon +func (qs *QuadStore) Horizon() graph.PrimaryKey { + return keys.NewSequentialKey(qs.horizon) } func hashOf(s string) []byte { @@ -190,7 +191,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { return err } batch.Put(keyFor(d), bytes) - err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add) + err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { return err } @@ -205,7 +206,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { resizeMap[d.Quad.Label] += delta } sizeChange += delta - qs.horizon = d.ID + qs.horizon = d.ID.Int() } for k, v := range resizeMap { if v != 0 { @@ -227,7 +228,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func keyFor(d graph.Delta) []byte { key := make([]byte, 0, 19) key = append(key, 'd') - key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) + key = append(key, []byte(fmt.Sprintf("%018x", d.ID.Int()))...) return key } diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 9a0f32b..bd7581c 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -16,12 +16,14 @@ package memstore import ( "fmt" + "time" "github.com/barakmich/glog" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/memstore/b" + "github.com/google/cayley/keys" "github.com/google/cayley/quad" ) @@ -65,7 +67,10 @@ func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) { } type LogEntry struct { - graph.Delta + ID int64 + Quad quad.Quad + Action graph.Procedure + Timestamp time.Time DeletedBy int64 } @@ -149,7 +154,11 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error { return graph.ErrQuadExists } qid := qs.nextQuadID - qs.log = append(qs.log, LogEntry{Delta: d}) + qs.log = append(qs.log, LogEntry{ + ID: d.ID.Int(), + Quad: d.Quad, + Action: d.Action, + Timestamp: d.Timestamp}) qs.size++ qs.nextQuadID++ @@ -185,7 +194,11 @@ func (qs *QuadStore) RemoveDelta(d graph.Delta) error { } quadID := qs.nextQuadID - qs.log = append(qs.log, LogEntry{Delta: d}) + qs.log = append(qs.log, LogEntry{ + ID: d.ID.Int(), + Quad: d.Quad, + Action: d.Action, + Timestamp: d.Timestamp}) qs.log[prevQuadID].DeletedBy = quadID qs.size-- qs.nextQuadID++ @@ -205,8 +218,8 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Value) graph.Ite return &iterator.Null{} } -func (qs *QuadStore) Horizon() int64 { - return qs.log[len(qs.log)-1].ID +func (qs *QuadStore) Horizon() graph.PrimaryKey { + return keys.NewSequentialKey(qs.log[len(qs.log)-1].ID) } func (qs *QuadStore) Size() int64 { diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index 1151357..9d7af09 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -26,6 +26,7 @@ import ( "github.com/barakmich/glog" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/keys" "github.com/google/cayley/quad" ) @@ -200,7 +201,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { action = "Delete" } entry := MongoLogEntry{ - LogID: d.ID, + LogID: d.ID.Int(), Action: action, Key: qs.getIDForQuad(d.Quad), Timestamp: d.Timestamp.UnixNano(), @@ -239,7 +240,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { } } for _, d := range in { - err := qs.updateQuad(d.Quad, d.ID, d.Action) + err := qs.updateQuad(d.Quad, d.ID.Int(), d.Action) if err != nil { return err } @@ -315,16 +316,16 @@ func (qs *QuadStore) Size() int64 { return int64(count) } -func (qs *QuadStore) Horizon() int64 { +func (qs *QuadStore) Horizon() graph.PrimaryKey { var log MongoLogEntry err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) if err != nil { if err == mgo.ErrNotFound { - return 0 + return keys.NewSequentialKey(0) } glog.Errorf("Could not get Horizon from Mongo: %v", err) } - return log.LogID + return keys.NewSequentialKey(log.LogID) } func (qs *QuadStore) FixedIterator() graph.FixedIterator { diff --git a/graph/primarykey.go b/graph/primarykey.go new file mode 100644 index 0000000..25d97ba --- /dev/null +++ b/graph/primarykey.go @@ -0,0 +1,28 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +// Defines the PrimaryKey interface, this abstracts the generation of IDs + +type PrimaryKey interface { + // Returns a new unique primary key + Next() PrimaryKey + + // Get the integer format if possible, otherwise logs an error and returns -1 + Int() int64 + + // Get the string format + String() string +} diff --git a/graph/quadstore.go b/graph/quadstore.go index d3c681f..79f1df7 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -70,7 +70,7 @@ type QuadStore interface { Size() int64 // The last replicated transaction ID that this quadstore has verified. - Horizon() int64 + Horizon() PrimaryKey // Creates a fixed iterator which can compare Values FixedIterator() FixedIterator diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 88073fd..5e95539 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -37,7 +37,7 @@ const ( ) type Delta struct { - ID int64 + ID PrimaryKey Quad quad.Quad Action Procedure Timestamp time.Time diff --git a/keys/sequentialkey.go b/keys/sequentialkey.go new file mode 100644 index 0000000..286a61e --- /dev/null +++ b/keys/sequentialkey.go @@ -0,0 +1,51 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keys + +import ( + "github.com/google/cayley/graph" + "strconv" + "sync" +) + +type Sequential struct { + nextID int64 + mut sync.Mutex +} + +func NewSequentialKey(horizon int64) graph.PrimaryKey { + if horizon <= 0 { + horizon = 1 + } + return &Sequential{nextID: horizon} +} + +func (s *Sequential) Next() graph.PrimaryKey { + s.mut.Lock() + defer s.mut.Unlock() + s.nextID++ + if s.nextID <= 0 { + s.nextID = 1 + } + return s +} + +func (s *Sequential) Int() int64 { + return s.nextID +} + +func (s *Sequential) String() string { + return strconv.FormatInt(s.nextID, 10) +} diff --git a/writer/single.go b/writer/single.go index e1566aa..cff4ae9 100644 --- a/writer/single.go +++ b/writer/single.go @@ -15,7 +15,6 @@ package writer import ( - "sync" "time" "github.com/google/cayley/graph" @@ -27,32 +26,18 @@ func init() { } type Single struct { - nextID int64 - qs graph.QuadStore - mut sync.Mutex + currentID graph.PrimaryKey + qs graph.QuadStore } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { - horizon := qs.Horizon() - rep := &Single{nextID: horizon + 1, qs: qs} - if horizon <= 0 { - rep.nextID = 1 - } - return rep, nil -} - -func (s *Single) AcquireNextID() int64 { - s.mut.Lock() - defer s.mut.Unlock() - id := s.nextID - s.nextID++ - return id + return &Single{currentID: qs.Horizon(), qs: qs}, nil } func (s *Single) AddQuad(q quad.Quad) error { deltas := make([]graph.Delta, 1) deltas[0] = graph.Delta{ - ID: s.AcquireNextID(), + ID: s.currentID.Next(), Quad: q, Action: graph.Add, Timestamp: time.Now(), @@ -64,7 +49,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { deltas := make([]graph.Delta, len(set)) for i, q := range set { deltas[i] = graph.Delta{ - ID: s.AcquireNextID(), + ID: s.currentID.Next(), Quad: q, Action: graph.Add, Timestamp: time.Now(), @@ -77,7 +62,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { func (s *Single) RemoveQuad(q quad.Quad) error { deltas := make([]graph.Delta, 1) deltas[0] = graph.Delta{ - ID: s.AcquireNextID(), + ID: s.currentID.Next(), Quad: q, Action: graph.Delete, Timestamp: time.Now(),