Refactoring of Delta.IDs & horizon functionality with a PrimaryKey type, needed for implementing UUID based primary keys for backends such as the appengine datastore \n Tests: at top level and if available per backend, if not then just visual
This commit is contained in:
parent
26ceed35cc
commit
abda6cbbb0
11 changed files with 135 additions and 44 deletions
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
"github.com/google/cayley/graph"
|
||||
"github.com/google/cayley/graph/iterator"
|
||||
"github.com/google/cayley/keys"
|
||||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
|
|
@ -124,8 +125,8 @@ func (qs *QuadStore) Size() int64 {
|
|||
return qs.size
|
||||
}
|
||||
|
||||
func (qs *QuadStore) Horizon() int64 {
|
||||
return qs.horizon
|
||||
func (qs *QuadStore) Horizon() graph.PrimaryKey {
|
||||
return keys.NewSequentialKey(qs.horizon)
|
||||
}
|
||||
|
||||
func (qs *QuadStore) createDeltaKeyFor(id int64) []byte {
|
||||
|
|
@ -195,13 +196,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(qs.createDeltaKeyFor(d.ID), bytes)
|
||||
err = b.Put(qs.createDeltaKeyFor(d.ID.Int()), bytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, d := range deltas {
|
||||
err := qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add)
|
||||
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -216,7 +217,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
|
|||
resizeMap[d.Quad.Label] += delta
|
||||
}
|
||||
sizeChange += delta
|
||||
qs.horizon = d.ID
|
||||
qs.horizon = d.ID.Int()
|
||||
}
|
||||
for k, v := range resizeMap {
|
||||
if v != 0 {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package iterator
|
|||
|
||||
import (
|
||||
"github.com/google/cayley/graph"
|
||||
"github.com/google/cayley/keys"
|
||||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
|
|
@ -56,7 +57,7 @@ func (qs *store) NameOf(v graph.Value) string {
|
|||
|
||||
func (qs *store) Size() int64 { return 0 }
|
||||
|
||||
func (qs *store) Horizon() int64 { return 0 }
|
||||
func (qs *store) Horizon() graph.PrimaryKey { return keys.NewSequentialKey(0) }
|
||||
|
||||
func (qs *store) DebugPrint() {}
|
||||
|
||||
|
|
|
|||
|
|
@ -168,6 +168,12 @@ func TestLoadDatabase(t *testing.T) {
|
|||
t.Errorf("Could not convert from generic to LevelDB QuadStore")
|
||||
}
|
||||
|
||||
//Test horizon
|
||||
horizon := qs.Horizon()
|
||||
if horizon.Int() != 1 {
|
||||
t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int())
|
||||
}
|
||||
|
||||
w.AddQuadSet(makeQuadSet())
|
||||
if s := qs.Size(); s != 11 {
|
||||
t.Errorf("Unexpected quadstore size, got:%d expect:11", s)
|
||||
|
|
@ -175,6 +181,10 @@ func TestLoadDatabase(t *testing.T) {
|
|||
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
|
||||
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
|
||||
}
|
||||
horizon = qs.Horizon()
|
||||
if horizon.Int() != 12 {
|
||||
t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int())
|
||||
}
|
||||
|
||||
w.RemoveQuad(quad.Quad{
|
||||
Subject: "A",
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import (
|
|||
|
||||
"github.com/google/cayley/graph"
|
||||
"github.com/google/cayley/graph/iterator"
|
||||
"github.com/google/cayley/keys"
|
||||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
|
|
@ -135,8 +136,8 @@ func (qs *QuadStore) Size() int64 {
|
|||
return qs.size
|
||||
}
|
||||
|
||||
func (qs *QuadStore) Horizon() int64 {
|
||||
return qs.horizon
|
||||
func (qs *QuadStore) Horizon() graph.PrimaryKey {
|
||||
return keys.NewSequentialKey(qs.horizon)
|
||||
}
|
||||
|
||||
func hashOf(s string) []byte {
|
||||
|
|
@ -190,7 +191,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
|
|||
return err
|
||||
}
|
||||
batch.Put(keyFor(d), bytes)
|
||||
err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add)
|
||||
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -205,7 +206,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
|
|||
resizeMap[d.Quad.Label] += delta
|
||||
}
|
||||
sizeChange += delta
|
||||
qs.horizon = d.ID
|
||||
qs.horizon = d.ID.Int()
|
||||
}
|
||||
for k, v := range resizeMap {
|
||||
if v != 0 {
|
||||
|
|
@ -227,7 +228,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
|
|||
func keyFor(d graph.Delta) []byte {
|
||||
key := make([]byte, 0, 19)
|
||||
key = append(key, 'd')
|
||||
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
|
||||
key = append(key, []byte(fmt.Sprintf("%018x", d.ID.Int()))...)
|
||||
return key
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,12 +16,14 @@ package memstore
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/barakmich/glog"
|
||||
|
||||
"github.com/google/cayley/graph"
|
||||
"github.com/google/cayley/graph/iterator"
|
||||
"github.com/google/cayley/graph/memstore/b"
|
||||
"github.com/google/cayley/keys"
|
||||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
|
|
@ -65,7 +67,10 @@ func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) {
|
|||
}
|
||||
|
||||
type LogEntry struct {
|
||||
graph.Delta
|
||||
ID int64
|
||||
Quad quad.Quad
|
||||
Action graph.Procedure
|
||||
Timestamp time.Time
|
||||
DeletedBy int64
|
||||
}
|
||||
|
||||
|
|
@ -149,7 +154,11 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error {
|
|||
return graph.ErrQuadExists
|
||||
}
|
||||
qid := qs.nextQuadID
|
||||
qs.log = append(qs.log, LogEntry{Delta: d})
|
||||
qs.log = append(qs.log, LogEntry{
|
||||
ID: d.ID.Int(),
|
||||
Quad: d.Quad,
|
||||
Action: d.Action,
|
||||
Timestamp: d.Timestamp})
|
||||
qs.size++
|
||||
qs.nextQuadID++
|
||||
|
||||
|
|
@ -185,7 +194,11 @@ func (qs *QuadStore) RemoveDelta(d graph.Delta) error {
|
|||
}
|
||||
|
||||
quadID := qs.nextQuadID
|
||||
qs.log = append(qs.log, LogEntry{Delta: d})
|
||||
qs.log = append(qs.log, LogEntry{
|
||||
ID: d.ID.Int(),
|
||||
Quad: d.Quad,
|
||||
Action: d.Action,
|
||||
Timestamp: d.Timestamp})
|
||||
qs.log[prevQuadID].DeletedBy = quadID
|
||||
qs.size--
|
||||
qs.nextQuadID++
|
||||
|
|
@ -205,8 +218,8 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Value) graph.Ite
|
|||
return &iterator.Null{}
|
||||
}
|
||||
|
||||
func (qs *QuadStore) Horizon() int64 {
|
||||
return qs.log[len(qs.log)-1].ID
|
||||
func (qs *QuadStore) Horizon() graph.PrimaryKey {
|
||||
return keys.NewSequentialKey(qs.log[len(qs.log)-1].ID)
|
||||
}
|
||||
|
||||
func (qs *QuadStore) Size() int64 {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/barakmich/glog"
|
||||
"github.com/google/cayley/graph"
|
||||
"github.com/google/cayley/graph/iterator"
|
||||
"github.com/google/cayley/keys"
|
||||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
|
|
@ -200,7 +201,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error {
|
|||
action = "Delete"
|
||||
}
|
||||
entry := MongoLogEntry{
|
||||
LogID: d.ID,
|
||||
LogID: d.ID.Int(),
|
||||
Action: action,
|
||||
Key: qs.getIDForQuad(d.Quad),
|
||||
Timestamp: d.Timestamp.UnixNano(),
|
||||
|
|
@ -239,7 +240,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error {
|
|||
}
|
||||
}
|
||||
for _, d := range in {
|
||||
err := qs.updateQuad(d.Quad, d.ID, d.Action)
|
||||
err := qs.updateQuad(d.Quad, d.ID.Int(), d.Action)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -315,16 +316,16 @@ func (qs *QuadStore) Size() int64 {
|
|||
return int64(count)
|
||||
}
|
||||
|
||||
func (qs *QuadStore) Horizon() int64 {
|
||||
func (qs *QuadStore) Horizon() graph.PrimaryKey {
|
||||
var log MongoLogEntry
|
||||
err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log)
|
||||
if err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return 0
|
||||
return keys.NewSequentialKey(0)
|
||||
}
|
||||
glog.Errorf("Could not get Horizon from Mongo: %v", err)
|
||||
}
|
||||
return log.LogID
|
||||
return keys.NewSequentialKey(log.LogID)
|
||||
}
|
||||
|
||||
func (qs *QuadStore) FixedIterator() graph.FixedIterator {
|
||||
|
|
|
|||
28
graph/primarykey.go
Normal file
28
graph/primarykey.go
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package graph
|
||||
|
||||
// Defines the PrimaryKey interface, this abstracts the generation of IDs
|
||||
|
||||
type PrimaryKey interface {
|
||||
// Returns a new unique primary key
|
||||
Next() PrimaryKey
|
||||
|
||||
// Get the integer format if possible, otherwise logs an error and returns -1
|
||||
Int() int64
|
||||
|
||||
// Get the string format
|
||||
String() string
|
||||
}
|
||||
|
|
@ -70,7 +70,7 @@ type QuadStore interface {
|
|||
Size() int64
|
||||
|
||||
// The last replicated transaction ID that this quadstore has verified.
|
||||
Horizon() int64
|
||||
Horizon() PrimaryKey
|
||||
|
||||
// Creates a fixed iterator which can compare Values
|
||||
FixedIterator() FixedIterator
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ const (
|
|||
)
|
||||
|
||||
type Delta struct {
|
||||
ID int64
|
||||
ID PrimaryKey
|
||||
Quad quad.Quad
|
||||
Action Procedure
|
||||
Timestamp time.Time
|
||||
|
|
|
|||
51
keys/sequentialkey.go
Normal file
51
keys/sequentialkey.go
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package keys
|
||||
|
||||
import (
|
||||
"github.com/google/cayley/graph"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Sequential struct {
|
||||
nextID int64
|
||||
mut sync.Mutex
|
||||
}
|
||||
|
||||
func NewSequentialKey(horizon int64) graph.PrimaryKey {
|
||||
if horizon <= 0 {
|
||||
horizon = 1
|
||||
}
|
||||
return &Sequential{nextID: horizon}
|
||||
}
|
||||
|
||||
func (s *Sequential) Next() graph.PrimaryKey {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
s.nextID++
|
||||
if s.nextID <= 0 {
|
||||
s.nextID = 1
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Sequential) Int() int64 {
|
||||
return s.nextID
|
||||
}
|
||||
|
||||
func (s *Sequential) String() string {
|
||||
return strconv.FormatInt(s.nextID, 10)
|
||||
}
|
||||
|
|
@ -15,7 +15,6 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/cayley/graph"
|
||||
|
|
@ -27,32 +26,18 @@ func init() {
|
|||
}
|
||||
|
||||
type Single struct {
|
||||
nextID int64
|
||||
currentID graph.PrimaryKey
|
||||
qs graph.QuadStore
|
||||
mut sync.Mutex
|
||||
}
|
||||
|
||||
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
|
||||
horizon := qs.Horizon()
|
||||
rep := &Single{nextID: horizon + 1, qs: qs}
|
||||
if horizon <= 0 {
|
||||
rep.nextID = 1
|
||||
}
|
||||
return rep, nil
|
||||
}
|
||||
|
||||
func (s *Single) AcquireNextID() int64 {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
id := s.nextID
|
||||
s.nextID++
|
||||
return id
|
||||
return &Single{currentID: qs.Horizon(), qs: qs}, nil
|
||||
}
|
||||
|
||||
func (s *Single) AddQuad(q quad.Quad) error {
|
||||
deltas := make([]graph.Delta, 1)
|
||||
deltas[0] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
ID: s.currentID.Next(),
|
||||
Quad: q,
|
||||
Action: graph.Add,
|
||||
Timestamp: time.Now(),
|
||||
|
|
@ -64,7 +49,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
|
|||
deltas := make([]graph.Delta, len(set))
|
||||
for i, q := range set {
|
||||
deltas[i] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
ID: s.currentID.Next(),
|
||||
Quad: q,
|
||||
Action: graph.Add,
|
||||
Timestamp: time.Now(),
|
||||
|
|
@ -77,7 +62,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
|
|||
func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||
deltas := make([]graph.Delta, 1)
|
||||
deltas[0] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
ID: s.currentID.Next(),
|
||||
Quad: q,
|
||||
Action: graph.Delete,
|
||||
Timestamp: time.Now(),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue