Merge pull request #186 from panamafrancis/primarykeys

Implementation of the PrimaryKey type
This commit is contained in:
Barak Michener 2014-12-28 14:05:58 -08:00
commit 9088fe376b
11 changed files with 135 additions and 44 deletions

View file

@ -28,6 +28,7 @@ import (
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/keys"
"github.com/google/cayley/quad"
)
@ -124,8 +125,8 @@ func (qs *QuadStore) Size() int64 {
return qs.size
}
func (qs *QuadStore) Horizon() int64 {
return qs.horizon
func (qs *QuadStore) Horizon() graph.PrimaryKey {
return keys.NewSequentialKey(qs.horizon)
}
func (qs *QuadStore) createDeltaKeyFor(id int64) []byte {
@ -195,13 +196,13 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
if err != nil {
return err
}
err = b.Put(qs.createDeltaKeyFor(d.ID), bytes)
err = b.Put(qs.createDeltaKeyFor(d.ID.Int()), bytes)
if err != nil {
return err
}
}
for _, d := range deltas {
err := qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add)
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil {
return err
}
@ -216,7 +217,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
resizeMap[d.Quad.Label] += delta
}
sizeChange += delta
qs.horizon = d.ID
qs.horizon = d.ID.Int()
}
for k, v := range resizeMap {
if v != 0 {

View file

@ -16,6 +16,7 @@ package iterator
import (
"github.com/google/cayley/graph"
"github.com/google/cayley/keys"
"github.com/google/cayley/quad"
)
@ -56,7 +57,7 @@ func (qs *store) NameOf(v graph.Value) string {
func (qs *store) Size() int64 { return 0 }
func (qs *store) Horizon() int64 { return 0 }
func (qs *store) Horizon() graph.PrimaryKey { return keys.NewSequentialKey(0) }
func (qs *store) DebugPrint() {}

View file

@ -168,6 +168,12 @@ func TestLoadDatabase(t *testing.T) {
t.Errorf("Could not convert from generic to LevelDB QuadStore")
}
//Test horizon
horizon := qs.Horizon()
if horizon.Int() != 1 {
t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int())
}
w.AddQuadSet(makeQuadSet())
if s := qs.Size(); s != 11 {
t.Errorf("Unexpected quadstore size, got:%d expect:11", s)
@ -175,6 +181,10 @@ func TestLoadDatabase(t *testing.T) {
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
}
horizon = qs.Horizon()
if horizon.Int() != 12 {
t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int())
}
w.RemoveQuad(quad.Quad{
Subject: "A",

View file

@ -31,6 +31,7 @@ import (
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/keys"
"github.com/google/cayley/quad"
)
@ -134,8 +135,8 @@ func (qs *QuadStore) Size() int64 {
return qs.size
}
func (qs *QuadStore) Horizon() int64 {
return qs.horizon
func (qs *QuadStore) Horizon() graph.PrimaryKey {
return keys.NewSequentialKey(qs.horizon)
}
func hashOf(s string) []byte {
@ -189,7 +190,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
return err
}
batch.Put(keyFor(d), bytes)
err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add)
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil {
return err
}
@ -204,7 +205,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
resizeMap[d.Quad.Label] += delta
}
sizeChange += delta
qs.horizon = d.ID
qs.horizon = d.ID.Int()
}
for k, v := range resizeMap {
if v != 0 {
@ -226,7 +227,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
func keyFor(d graph.Delta) []byte {
key := make([]byte, 0, 19)
key = append(key, 'd')
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
key = append(key, []byte(fmt.Sprintf("%018x", d.ID.Int()))...)
return key
}

View file

@ -16,12 +16,14 @@ package memstore
import (
"fmt"
"time"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/graph/memstore/b"
"github.com/google/cayley/keys"
"github.com/google/cayley/quad"
)
@ -65,7 +67,10 @@ func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) {
}
type LogEntry struct {
graph.Delta
ID int64
Quad quad.Quad
Action graph.Procedure
Timestamp time.Time
DeletedBy int64
}
@ -149,7 +154,11 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error {
return graph.ErrQuadExists
}
qid := qs.nextQuadID
qs.log = append(qs.log, LogEntry{Delta: d})
qs.log = append(qs.log, LogEntry{
ID: d.ID.Int(),
Quad: d.Quad,
Action: d.Action,
Timestamp: d.Timestamp})
qs.size++
qs.nextQuadID++
@ -185,7 +194,11 @@ func (qs *QuadStore) RemoveDelta(d graph.Delta) error {
}
quadID := qs.nextQuadID
qs.log = append(qs.log, LogEntry{Delta: d})
qs.log = append(qs.log, LogEntry{
ID: d.ID.Int(),
Quad: d.Quad,
Action: d.Action,
Timestamp: d.Timestamp})
qs.log[prevQuadID].DeletedBy = quadID
qs.size--
qs.nextQuadID++
@ -205,8 +218,8 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Value) graph.Ite
return &iterator.Null{}
}
func (qs *QuadStore) Horizon() int64 {
return qs.log[len(qs.log)-1].ID
func (qs *QuadStore) Horizon() graph.PrimaryKey {
return keys.NewSequentialKey(qs.log[len(qs.log)-1].ID)
}
func (qs *QuadStore) Size() int64 {

View file

@ -26,6 +26,7 @@ import (
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/keys"
"github.com/google/cayley/quad"
)
@ -200,7 +201,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error {
action = "Delete"
}
entry := MongoLogEntry{
LogID: d.ID,
LogID: d.ID.Int(),
Action: action,
Key: qs.getIDForQuad(d.Quad),
Timestamp: d.Timestamp.UnixNano(),
@ -239,7 +240,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error {
}
}
for _, d := range in {
err := qs.updateQuad(d.Quad, d.ID, d.Action)
err := qs.updateQuad(d.Quad, d.ID.Int(), d.Action)
if err != nil {
return err
}
@ -315,16 +316,16 @@ func (qs *QuadStore) Size() int64 {
return int64(count)
}
func (qs *QuadStore) Horizon() int64 {
func (qs *QuadStore) Horizon() graph.PrimaryKey {
var log MongoLogEntry
err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log)
if err != nil {
if err == mgo.ErrNotFound {
return 0
return keys.NewSequentialKey(0)
}
glog.Errorf("Could not get Horizon from Mongo: %v", err)
}
return log.LogID
return keys.NewSequentialKey(log.LogID)
}
func (qs *QuadStore) FixedIterator() graph.FixedIterator {

28
graph/primarykey.go Normal file
View file

@ -0,0 +1,28 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graph
// Defines the PrimaryKey interface, this abstracts the generation of IDs
type PrimaryKey interface {
// Returns a new unique primary key
Next() PrimaryKey
// Get the integer format if possible, otherwise logs an error and returns -1
Int() int64
// Get the string format
String() string
}

View file

@ -70,7 +70,7 @@ type QuadStore interface {
Size() int64
// The last replicated transaction ID that this quadstore has verified.
Horizon() int64
Horizon() PrimaryKey
// Creates a fixed iterator which can compare Values
FixedIterator() FixedIterator

View file

@ -37,7 +37,7 @@ const (
)
type Delta struct {
ID int64
ID PrimaryKey
Quad quad.Quad
Action Procedure
Timestamp time.Time

51
keys/sequentialkey.go Normal file
View file

@ -0,0 +1,51 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package keys
import (
"github.com/google/cayley/graph"
"strconv"
"sync"
)
type Sequential struct {
nextID int64
mut sync.Mutex
}
func NewSequentialKey(horizon int64) graph.PrimaryKey {
if horizon <= 0 {
horizon = 1
}
return &Sequential{nextID: horizon}
}
func (s *Sequential) Next() graph.PrimaryKey {
s.mut.Lock()
defer s.mut.Unlock()
s.nextID++
if s.nextID <= 0 {
s.nextID = 1
}
return s
}
func (s *Sequential) Int() int64 {
return s.nextID
}
func (s *Sequential) String() string {
return strconv.FormatInt(s.nextID, 10)
}

View file

@ -15,7 +15,6 @@
package writer
import (
"sync"
"time"
"github.com/google/cayley/graph"
@ -27,32 +26,18 @@ func init() {
}
type Single struct {
nextID int64
qs graph.QuadStore
mut sync.Mutex
currentID graph.PrimaryKey
qs graph.QuadStore
}
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
horizon := qs.Horizon()
rep := &Single{nextID: horizon + 1, qs: qs}
if horizon <= 0 {
rep.nextID = 1
}
return rep, nil
}
func (s *Single) AcquireNextID() int64 {
s.mut.Lock()
defer s.mut.Unlock()
id := s.nextID
s.nextID++
return id
return &Single{currentID: qs.Horizon(), qs: qs}, nil
}
func (s *Single) AddQuad(q quad.Quad) error {
deltas := make([]graph.Delta, 1)
deltas[0] = graph.Delta{
ID: s.AcquireNextID(),
ID: s.currentID.Next(),
Quad: q,
Action: graph.Add,
Timestamp: time.Now(),
@ -64,7 +49,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
deltas := make([]graph.Delta, len(set))
for i, q := range set {
deltas[i] = graph.Delta{
ID: s.AcquireNextID(),
ID: s.currentID.Next(),
Quad: q,
Action: graph.Add,
Timestamp: time.Now(),
@ -77,7 +62,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
func (s *Single) RemoveQuad(q quad.Quad) error {
deltas := make([]graph.Delta, 1)
deltas[0] = graph.Delta{
ID: s.AcquireNextID(),
ID: s.currentID.Next(),
Quad: q,
Action: graph.Delete,
Timestamp: time.Now(),