This commit is contained in:
Barak Michener 2014-07-27 15:26:57 -04:00
parent 9793096b9a
commit 7a8d4194bd
3 changed files with 40 additions and 58 deletions

View file

@ -23,6 +23,7 @@ package graph
import ( import (
"errors" "errors"
"time"
) )
type Procedure byte type Procedure byte
@ -34,48 +35,46 @@ const (
) )
type Transaction struct { type Transaction struct {
ID int64 ID int64
Triple *Triple Triple *Triple
Action Procedure Action Procedure
Timestamp *time.Time
} }
type Replication interface { type TripleWriter interface {
// Get a unique range of triple IDs from the Replication strategy. // Add a triple to the store.
// Returns the inclusive set of unique ids. AddTriple(*Triple) error
AcquireNextIds(size int64) (start int64, end int64)
// Returns the highest current ID. // Add a set of triples to the store, atomically if possible.
GetLastID() int64 AddTripleSet([]*Triple) error
// Sends the transactions to the replicas. // Removes a triple matching the given one from the database,
Replicate([]*Transaction) // if it exists. Does nothing otherwise.
RemoveTriple(*Triple) error
// Attempt to acquire the given range of triples from other replicated sources.
RequestTransactionRange(start int64, end int64)
} }
type NewReplicationFunc func(TripleStore, Options) (Replication, error) type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error)
var replicationRegistry = make(map[string]NewReplicationFunc) var writerRegistry = make(map[string]NewTripleWriterFunc)
func RegisterReplication(name string, newFunc NewReplicationFunc) { func RegisterWriter(name string, newFunc NewTripleWriterFunc) {
if _, found := replicationRegistry[name]; found { if _, found := writerRegistry[name]; found {
panic("already registered Replication " + name) panic("already registered TripleWriter " + name)
} }
replicationRegistry[name] = newFunc writerRegistry[name] = newFunc
} }
func NewReplication(name string, ts TripleStore, opts Options) (Replication, error) { func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) {
newFunc, hasNew := replicationRegistry[name] newFunc, hasNew := writerRegistry[name]
if !hasNew { if !hasNew {
return nil, errors.New("replication: name '" + name + "' is not registered") return nil, errors.New("replication: name '" + name + "' is not registered")
} }
return newFunc(ts, opts) return newFunc(ts, opts)
} }
func ReplicationMethods() []string { func WriterMethods() []string {
t := make([]string, 0, len(replicationRegistry)) t := make([]string, 0, len(writerRegistry))
for n := range replicationRegistry { for n := range writerRegistry {
t = append(t, n) t = append(t, n)
} }
return t return t

View file

@ -37,15 +37,9 @@ import (
type Value interface{} type Value interface{}
type TripleStore interface { type TripleStore interface {
// Add a triple to the store. // The only way in is through building a transaction, which
AddTriple(*Triple) // is done by a replication strategy.
ApplyTransactions([]*Transaction)
// Add a set of triples to the store, atomically if possible.
AddTripleSet([]*Triple)
// Removes a triple matching the given one from the database,
// if it exists. Does nothing otherwise.
RemoveTriple(*Triple)
// Given an opaque token, returns the triple for that token from the store. // Given an opaque token, returns the triple for that token from the store.
Triple(Value) *Triple Triple(Value) *Triple
@ -73,11 +67,6 @@ type TripleStore interface {
// The last replicated transaction ID that this triplestore has verified. // The last replicated transaction ID that this triplestore has verified.
Horizon() int64 Horizon() int64
// Inform the triplestore of a new replication strategy. Happens at startup and,
// perhaps in the future, if replication changes mid-run. Writes without any replication
// strategy are not allowed.
SetReplication(Replication)
// Creates a fixed iterator which can compare Values // Creates a fixed iterator which can compare Values
FixedIterator() FixedIterator FixedIterator() FixedIterator

View file

@ -21,37 +21,31 @@ import (
) )
type Single struct { type Single struct {
lastID int64 nextID int64
ts graph.TripleStore ts graph.TripleStore
mut sync.Mutex mut sync.Mutex
} }
func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) {
rep := &Single{lastID: ts.Horizon(), ts: ts} rep := &Single{nextID: ts.Horizon(), ts: ts}
ts.SetReplication(rep) if rep.nextID == -1 {
rep.nextID = 1
}
return rep, nil return rep, nil
} }
func (s *Single) AcquireNextIds(size int64) (start int64, end int64) { func (s *Single) AcquireNextId() int64 {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() defer s.mut.Unlock()
start = s.lastID + 1 id := s.nextID
end = s.lastID + size s.nextID += 1
s.lastID += size return id
return
} }
func (s *Single) GetLastID() int64 { func AddTriple(*graph.Triple) error {
return s.lastID return nil
}
func (s *Single) Replicate([]*graph.Transaction) {
// Noop, single-machines don't replicate out anywhere.
}
func (s *Single) RequestTransactionRange(int64, int64) {
// Single machines also can't catch up.
} }
func init() { func init() {
graph.RegisterReplication("local", NewSingleReplication) graph.RegisterWriter("single", NewSingleReplication)
} }