diff --git a/graph/replication.go b/graph/replication.go index cfefc8e..d890219 100644 --- a/graph/replication.go +++ b/graph/replication.go @@ -23,6 +23,7 @@ package graph import ( "errors" + "time" ) type Procedure byte @@ -34,48 +35,46 @@ const ( ) type Transaction struct { - ID int64 - Triple *Triple - Action Procedure + ID int64 + Triple *Triple + Action Procedure + Timestamp *time.Time } -type Replication interface { - // Get a unique range of triple IDs from the Replication strategy. - // Returns the inclusive set of unique ids. - AcquireNextIds(size int64) (start int64, end int64) +type TripleWriter interface { + // Add a triple to the store. + AddTriple(*Triple) error - // Returns the highest current ID. - GetLastID() int64 + // Add a set of triples to the store, atomically if possible. + AddTripleSet([]*Triple) error - // Sends the transactions to the replicas. - Replicate([]*Transaction) - - // Attempt to acquire the given range of triples from other replicated sources. - RequestTransactionRange(start int64, end int64) + // Removes a triple matching the given one from the database, + // if it exists. Does nothing otherwise. + RemoveTriple(*Triple) error } -type NewReplicationFunc func(TripleStore, Options) (Replication, error) +type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error) -var replicationRegistry = make(map[string]NewReplicationFunc) +var writerRegistry = make(map[string]NewTripleWriterFunc) -func RegisterReplication(name string, newFunc NewReplicationFunc) { - if _, found := replicationRegistry[name]; found { - panic("already registered Replication " + name) +func RegisterWriter(name string, newFunc NewTripleWriterFunc) { + if _, found := writerRegistry[name]; found { + panic("already registered TripleWriter " + name) } - replicationRegistry[name] = newFunc + writerRegistry[name] = newFunc } -func NewReplication(name string, ts TripleStore, opts Options) (Replication, error) { - newFunc, hasNew := replicationRegistry[name] +func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) { + newFunc, hasNew := writerRegistry[name] if !hasNew { return nil, errors.New("replication: name '" + name + "' is not registered") } return newFunc(ts, opts) } -func ReplicationMethods() []string { - t := make([]string, 0, len(replicationRegistry)) - for n := range replicationRegistry { +func WriterMethods() []string { + t := make([]string, 0, len(writerRegistry)) + for n := range writerRegistry { t = append(t, n) } return t diff --git a/graph/triplestore.go b/graph/triplestore.go index e72c09d..9927f4c 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -37,15 +37,9 @@ import ( type Value interface{} type TripleStore interface { - // Add a triple to the store. - AddTriple(*Triple) - - // Add a set of triples to the store, atomically if possible. - AddTripleSet([]*Triple) - - // Removes a triple matching the given one from the database, - // if it exists. Does nothing otherwise. - RemoveTriple(*Triple) + // The only way in is through building a transaction, which + // is done by a replication strategy. + ApplyTransactions([]*Transaction) // Given an opaque token, returns the triple for that token from the store. Triple(Value) *Triple @@ -73,11 +67,6 @@ type TripleStore interface { // The last replicated transaction ID that this triplestore has verified. Horizon() int64 - // Inform the triplestore of a new replication strategy. Happens at startup and, - // perhaps in the future, if replication changes mid-run. Writes without any replication - // strategy are not allowed. - SetReplication(Replication) - // Creates a fixed iterator which can compare Values FixedIterator() FixedIterator diff --git a/replication/local.go b/replication/local.go index ff84fe8..5e3f60b 100644 --- a/replication/local.go +++ b/replication/local.go @@ -21,37 +21,31 @@ import ( ) type Single struct { - lastID int64 + nextID int64 ts graph.TripleStore mut sync.Mutex } -func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { - rep := &Single{lastID: ts.Horizon(), ts: ts} - ts.SetReplication(rep) +func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) { + rep := &Single{nextID: ts.Horizon(), ts: ts} + if rep.nextID == -1 { + rep.nextID = 1 + } return rep, nil } -func (s *Single) AcquireNextIds(size int64) (start int64, end int64) { +func (s *Single) AcquireNextId() int64 { s.mut.Lock() defer s.mut.Unlock() - start = s.lastID + 1 - end = s.lastID + size - s.lastID += size - return + id := s.nextID + s.nextID += 1 + return id } -func (s *Single) GetLastID() int64 { - return s.lastID -} - -func (s *Single) Replicate([]*graph.Transaction) { - // Noop, single-machines don't replicate out anywhere. -} -func (s *Single) RequestTransactionRange(int64, int64) { - // Single machines also can't catch up. +func AddTriple(*graph.Triple) error { + return nil } func init() { - graph.RegisterReplication("local", NewSingleReplication) + graph.RegisterWriter("single", NewSingleReplication) }