From 426e0b62945f42e4dd5ce5bcdeb3138a645427de Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 22 Jul 2014 21:06:57 -0400 Subject: [PATCH 01/18] add replication interface --- graph/replication.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 graph/replication.go diff --git a/graph/replication.go b/graph/replication.go new file mode 100644 index 0000000..57a4b2e --- /dev/null +++ b/graph/replication.go @@ -0,0 +1,53 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +// Defines the interface for consistent replication of a graph instance. +// +// Separate from the backend, this dictates how individual triples get +// identified and replicated consistently across (potentially) multiple +// instances. The simplest case is to keep an append-only log of triple +// changes. + +type Procedure byte + +const ( + Add Procedure = iota + Delete +) + +type Transaction struct { + Id int64 + Triple *Triple + Action Procedure +} + +type Replication interface { + // Get a unique range of triple IDs from the Replication strategy. + // Returns the inclusive set of unique ids. + AcquireNextIds(size int64) (start int64, end int64) + + // Returns the highest current ID. + GetLastId() int64 + + // Sends the transactions to the replicas. + Replicate([]*Transaction) + + // Attempt to acquire the given range of triples from other replicated sources. + RequestTransactionRange(start int64, end int64) + + // Opens the replication interface. + Open(TripleStore, Options) +} From 768ca5c36f3956b8de43dda36f1b724b06f61320 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 22 Jul 2014 21:26:30 -0400 Subject: [PATCH 02/18] add replication registry and local replication --- graph/replication.go | 34 +++++++++++++++++++++++++++++++--- replication/local.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 replication/local.go diff --git a/graph/replication.go b/graph/replication.go index 57a4b2e..ab86ffe 100644 --- a/graph/replication.go +++ b/graph/replication.go @@ -21,6 +21,10 @@ package graph // instances. The simplest case is to keep an append-only log of triple // changes. +import ( + "errors" +) + type Procedure byte const ( @@ -47,7 +51,31 @@ type Replication interface { // Attempt to acquire the given range of triples from other replicated sources. RequestTransactionRange(start int64, end int64) - - // Opens the replication interface. - Open(TripleStore, Options) +} + +type NewReplicationFunc func(TripleStore, Options) (Replication, error) + +var replicationRegistry = make(map[string]NewReplicationFunc) + +func RegisterReplication(name string, newFunc NewReplicationFunc) { + if _, found := replicationRegistry[name]; found { + panic("already registered Replication " + name) + } + replicationRegistry[name] = newFunc +} + +func NewReplication(name string, ts TripleStore, opts Options) (Replication, error) { + newFunc, hasNew := replicationRegistry[name] + if !hasNew { + return nil, errors.New("replication: name '" + name + "' is not registered") + } + return newFunc(ts, opts) +} + +func ReplicationMethods() []string { + t := make([]string, 0, len(replicationRegistry)) + for n := range replicationRegistry { + t = append(t, n) + } + return t } diff --git a/replication/local.go b/replication/local.go new file mode 100644 index 0000000..0749d55 --- /dev/null +++ b/replication/local.go @@ -0,0 +1,52 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "github.com/google/cayley/graph" + "sync" +) + +type LocalReplication struct { + lastId int64 + ts graph.TripleStore + mut sync.Mutex +} + +func NewLocalReplication(ts graph.TripleStore, opts graph.Options) (*LocalReplication, error) { + rep := &LocalReplication{lastId: ts.Horizon(), ts: ts} + ts.SetReplication(rep) + return rep, nil +} + +func (l *LocalReplication) AcquireNextIds(size int64) (start int64, end int64) { + l.mut.Lock() + defer l.mut.Unlock() + start = l.lastId + 1 + end = l.lastId + size + l.lastId += size + return +} + +func (l *LocalReplication) GetLastId() int64 { + return l.lastId +} + +func (l *LocalReplication) Replicate([]*graph.Transaction) {} +func (l *LocalReplication) RequestTransactionRange(int64, int64) {} + +func init() { + graph.RegisterReplication("local", NewLocalReplication) +} From 929b4f539bd0c0423f5c77ab17a0ebe84ad10d31 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 24 Jul 2014 16:59:38 -0400 Subject: [PATCH 03/18] update the triplestore interface and local replication --- graph/triplestore.go | 8 ++++++++ replication/local.go | 13 +++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/graph/triplestore.go b/graph/triplestore.go index a640b1d..e72c09d 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -70,6 +70,14 @@ type TripleStore interface { // Returns the number of triples currently stored. Size() int64 + // The last replicated transaction ID that this triplestore has verified. + Horizon() int64 + + // Inform the triplestore of a new replication strategy. Happens at startup and, + // perhaps in the future, if replication changes mid-run. Writes without any replication + // strategy are not allowed. + SetReplication(Replication) + // Creates a fixed iterator which can compare Values FixedIterator() FixedIterator diff --git a/replication/local.go b/replication/local.go index 0749d55..6f7d684 100644 --- a/replication/local.go +++ b/replication/local.go @@ -15,8 +15,9 @@ package local import ( - "github.com/google/cayley/graph" "sync" + + "github.com/google/cayley/graph" ) type LocalReplication struct { @@ -25,7 +26,7 @@ type LocalReplication struct { mut sync.Mutex } -func NewLocalReplication(ts graph.TripleStore, opts graph.Options) (*LocalReplication, error) { +func NewLocalReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { rep := &LocalReplication{lastId: ts.Horizon(), ts: ts} ts.SetReplication(rep) return rep, nil @@ -44,8 +45,12 @@ func (l *LocalReplication) GetLastId() int64 { return l.lastId } -func (l *LocalReplication) Replicate([]*graph.Transaction) {} -func (l *LocalReplication) RequestTransactionRange(int64, int64) {} +func (l *LocalReplication) Replicate([]*graph.Transaction) { + // Noop, single-machines don't replicate out anywhere. +} +func (l *LocalReplication) RequestTransactionRange(int64, int64) { + // Single machines also can't catch up. +} func init() { graph.RegisterReplication("local", NewLocalReplication) From 9793096b9a21b2cbb36d6f2e3b1f8eb4a5e2c3cc Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 24 Jul 2014 17:59:39 -0400 Subject: [PATCH 04/18] lint --- graph/replication.go | 5 +++-- replication/local.go | 32 ++++++++++++++++---------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/graph/replication.go b/graph/replication.go index ab86ffe..cfefc8e 100644 --- a/graph/replication.go +++ b/graph/replication.go @@ -27,13 +27,14 @@ import ( type Procedure byte +// The different types of actions a transaction can do. const ( Add Procedure = iota Delete ) type Transaction struct { - Id int64 + ID int64 Triple *Triple Action Procedure } @@ -44,7 +45,7 @@ type Replication interface { AcquireNextIds(size int64) (start int64, end int64) // Returns the highest current ID. - GetLastId() int64 + GetLastID() int64 // Sends the transactions to the replicas. Replicate([]*Transaction) diff --git a/replication/local.go b/replication/local.go index 6f7d684..ff84fe8 100644 --- a/replication/local.go +++ b/replication/local.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package local +package replication import ( "sync" @@ -20,38 +20,38 @@ import ( "github.com/google/cayley/graph" ) -type LocalReplication struct { - lastId int64 +type Single struct { + lastID int64 ts graph.TripleStore mut sync.Mutex } -func NewLocalReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { - rep := &LocalReplication{lastId: ts.Horizon(), ts: ts} +func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { + rep := &Single{lastID: ts.Horizon(), ts: ts} ts.SetReplication(rep) return rep, nil } -func (l *LocalReplication) AcquireNextIds(size int64) (start int64, end int64) { - l.mut.Lock() - defer l.mut.Unlock() - start = l.lastId + 1 - end = l.lastId + size - l.lastId += size +func (s *Single) AcquireNextIds(size int64) (start int64, end int64) { + s.mut.Lock() + defer s.mut.Unlock() + start = s.lastID + 1 + end = s.lastID + size + s.lastID += size return } -func (l *LocalReplication) GetLastId() int64 { - return l.lastId +func (s *Single) GetLastID() int64 { + return s.lastID } -func (l *LocalReplication) Replicate([]*graph.Transaction) { +func (s *Single) Replicate([]*graph.Transaction) { // Noop, single-machines don't replicate out anywhere. } -func (l *LocalReplication) RequestTransactionRange(int64, int64) { +func (s *Single) RequestTransactionRange(int64, int64) { // Single machines also can't catch up. } func init() { - graph.RegisterReplication("local", NewLocalReplication) + graph.RegisterReplication("local", NewSingleReplication) } From 7a8d4194bd79ae2cf8a3d54b288fc7d9aecf6e83 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 27 Jul 2014 15:26:57 -0400 Subject: [PATCH 05/18] wip --- graph/replication.go | 49 ++++++++++++++++++++++++------------------------- graph/triplestore.go | 17 +++-------------- replication/local.go | 32 +++++++++++++------------------- 3 files changed, 40 insertions(+), 58 deletions(-) diff --git a/graph/replication.go b/graph/replication.go index cfefc8e..d890219 100644 --- a/graph/replication.go +++ b/graph/replication.go @@ -23,6 +23,7 @@ package graph import ( "errors" + "time" ) type Procedure byte @@ -34,48 +35,46 @@ const ( ) type Transaction struct { - ID int64 - Triple *Triple - Action Procedure + ID int64 + Triple *Triple + Action Procedure + Timestamp *time.Time } -type Replication interface { - // Get a unique range of triple IDs from the Replication strategy. - // Returns the inclusive set of unique ids. - AcquireNextIds(size int64) (start int64, end int64) +type TripleWriter interface { + // Add a triple to the store. + AddTriple(*Triple) error - // Returns the highest current ID. - GetLastID() int64 + // Add a set of triples to the store, atomically if possible. + AddTripleSet([]*Triple) error - // Sends the transactions to the replicas. - Replicate([]*Transaction) - - // Attempt to acquire the given range of triples from other replicated sources. - RequestTransactionRange(start int64, end int64) + // Removes a triple matching the given one from the database, + // if it exists. Does nothing otherwise. + RemoveTriple(*Triple) error } -type NewReplicationFunc func(TripleStore, Options) (Replication, error) +type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error) -var replicationRegistry = make(map[string]NewReplicationFunc) +var writerRegistry = make(map[string]NewTripleWriterFunc) -func RegisterReplication(name string, newFunc NewReplicationFunc) { - if _, found := replicationRegistry[name]; found { - panic("already registered Replication " + name) +func RegisterWriter(name string, newFunc NewTripleWriterFunc) { + if _, found := writerRegistry[name]; found { + panic("already registered TripleWriter " + name) } - replicationRegistry[name] = newFunc + writerRegistry[name] = newFunc } -func NewReplication(name string, ts TripleStore, opts Options) (Replication, error) { - newFunc, hasNew := replicationRegistry[name] +func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) { + newFunc, hasNew := writerRegistry[name] if !hasNew { return nil, errors.New("replication: name '" + name + "' is not registered") } return newFunc(ts, opts) } -func ReplicationMethods() []string { - t := make([]string, 0, len(replicationRegistry)) - for n := range replicationRegistry { +func WriterMethods() []string { + t := make([]string, 0, len(writerRegistry)) + for n := range writerRegistry { t = append(t, n) } return t diff --git a/graph/triplestore.go b/graph/triplestore.go index e72c09d..9927f4c 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -37,15 +37,9 @@ import ( type Value interface{} type TripleStore interface { - // Add a triple to the store. - AddTriple(*Triple) - - // Add a set of triples to the store, atomically if possible. - AddTripleSet([]*Triple) - - // Removes a triple matching the given one from the database, - // if it exists. Does nothing otherwise. - RemoveTriple(*Triple) + // The only way in is through building a transaction, which + // is done by a replication strategy. + ApplyTransactions([]*Transaction) // Given an opaque token, returns the triple for that token from the store. Triple(Value) *Triple @@ -73,11 +67,6 @@ type TripleStore interface { // The last replicated transaction ID that this triplestore has verified. Horizon() int64 - // Inform the triplestore of a new replication strategy. Happens at startup and, - // perhaps in the future, if replication changes mid-run. Writes without any replication - // strategy are not allowed. - SetReplication(Replication) - // Creates a fixed iterator which can compare Values FixedIterator() FixedIterator diff --git a/replication/local.go b/replication/local.go index ff84fe8..5e3f60b 100644 --- a/replication/local.go +++ b/replication/local.go @@ -21,37 +21,31 @@ import ( ) type Single struct { - lastID int64 + nextID int64 ts graph.TripleStore mut sync.Mutex } -func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.Replication, error) { - rep := &Single{lastID: ts.Horizon(), ts: ts} - ts.SetReplication(rep) +func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) { + rep := &Single{nextID: ts.Horizon(), ts: ts} + if rep.nextID == -1 { + rep.nextID = 1 + } return rep, nil } -func (s *Single) AcquireNextIds(size int64) (start int64, end int64) { +func (s *Single) AcquireNextId() int64 { s.mut.Lock() defer s.mut.Unlock() - start = s.lastID + 1 - end = s.lastID + size - s.lastID += size - return + id := s.nextID + s.nextID += 1 + return id } -func (s *Single) GetLastID() int64 { - return s.lastID -} - -func (s *Single) Replicate([]*graph.Transaction) { - // Noop, single-machines don't replicate out anywhere. -} -func (s *Single) RequestTransactionRange(int64, int64) { - // Single machines also can't catch up. +func AddTriple(*graph.Triple) error { + return nil } func init() { - graph.RegisterReplication("local", NewSingleReplication) + graph.RegisterWriter("single", NewSingleReplication) } From e13e65d09bf8eecdb405368668607c82d8e298cd Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 27 Jul 2014 15:39:45 -0400 Subject: [PATCH 06/18] single writer --- graph/replication.go | 2 +- graph/triplestore.go | 2 +- replication/local.go | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/graph/replication.go b/graph/replication.go index d890219..65e560e 100644 --- a/graph/replication.go +++ b/graph/replication.go @@ -38,7 +38,7 @@ type Transaction struct { ID int64 Triple *Triple Action Procedure - Timestamp *time.Time + Timestamp time.Time } type TripleWriter interface { diff --git a/graph/triplestore.go b/graph/triplestore.go index 9927f4c..360dbc9 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -39,7 +39,7 @@ type Value interface{} type TripleStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyTransactions([]*Transaction) + ApplyTransactions([]*Transaction) error // Given an opaque token, returns the triple for that token from the store. Triple(Value) *Triple diff --git a/replication/local.go b/replication/local.go index 5e3f60b..01eb644 100644 --- a/replication/local.go +++ b/replication/local.go @@ -16,6 +16,7 @@ package replication import ( "sync" + "time" "github.com/google/cayley/graph" ) @@ -42,10 +43,42 @@ func (s *Single) AcquireNextId() int64 { return id } -func AddTriple(*graph.Triple) error { +func (s *Single) AddTriple(t *graph.Triple) error { + trans := make([]*graph.Transaction, 1) + trans[0] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Add, + Timestamp: time.Now(), + } + return s.ts.ApplyTransactions(trans) +} + +func (s *Single) AddTripleSet(set []*graph.Triple) error { + trans := make([]*graph.Transaction, len(set)) + for i, t := range set { + trans[i] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Add, + Timestamp: time.Now(), + } + } + s.ts.ApplyTransactions(trans) return nil } +func (s *Single) RemoveTriple(t *graph.Triple) error { + trans := make([]*graph.Transaction, 1) + trans[0] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Delete, + Timestamp: time.Now(), + } + return s.ts.ApplyTransactions(trans) +} + func init() { graph.RegisterWriter("single", NewSingleReplication) } From 1b24d66d8a4c3a2ba0f43a4c3af9931b168138b1 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 27 Jul 2014 15:41:52 -0400 Subject: [PATCH 07/18] rename --- graph/replication.go | 81 ------------------------------------------------- graph/triplewriter.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++++ replication/local.go | 84 --------------------------------------------------- writer/single.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 165 deletions(-) delete mode 100644 graph/replication.go create mode 100644 graph/triplewriter.go delete mode 100644 replication/local.go create mode 100644 writer/single.go diff --git a/graph/replication.go b/graph/replication.go deleted file mode 100644 index 65e560e..0000000 --- a/graph/replication.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package graph - -// Defines the interface for consistent replication of a graph instance. -// -// Separate from the backend, this dictates how individual triples get -// identified and replicated consistently across (potentially) multiple -// instances. The simplest case is to keep an append-only log of triple -// changes. - -import ( - "errors" - "time" -) - -type Procedure byte - -// The different types of actions a transaction can do. -const ( - Add Procedure = iota - Delete -) - -type Transaction struct { - ID int64 - Triple *Triple - Action Procedure - Timestamp time.Time -} - -type TripleWriter interface { - // Add a triple to the store. - AddTriple(*Triple) error - - // Add a set of triples to the store, atomically if possible. - AddTripleSet([]*Triple) error - - // Removes a triple matching the given one from the database, - // if it exists. Does nothing otherwise. - RemoveTriple(*Triple) error -} - -type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error) - -var writerRegistry = make(map[string]NewTripleWriterFunc) - -func RegisterWriter(name string, newFunc NewTripleWriterFunc) { - if _, found := writerRegistry[name]; found { - panic("already registered TripleWriter " + name) - } - writerRegistry[name] = newFunc -} - -func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) { - newFunc, hasNew := writerRegistry[name] - if !hasNew { - return nil, errors.New("replication: name '" + name + "' is not registered") - } - return newFunc(ts, opts) -} - -func WriterMethods() []string { - t := make([]string, 0, len(writerRegistry)) - for n := range writerRegistry { - t = append(t, n) - } - return t -} diff --git a/graph/triplewriter.go b/graph/triplewriter.go new file mode 100644 index 0000000..65e560e --- /dev/null +++ b/graph/triplewriter.go @@ -0,0 +1,81 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +// Defines the interface for consistent replication of a graph instance. +// +// Separate from the backend, this dictates how individual triples get +// identified and replicated consistently across (potentially) multiple +// instances. The simplest case is to keep an append-only log of triple +// changes. + +import ( + "errors" + "time" +) + +type Procedure byte + +// The different types of actions a transaction can do. +const ( + Add Procedure = iota + Delete +) + +type Transaction struct { + ID int64 + Triple *Triple + Action Procedure + Timestamp time.Time +} + +type TripleWriter interface { + // Add a triple to the store. + AddTriple(*Triple) error + + // Add a set of triples to the store, atomically if possible. + AddTripleSet([]*Triple) error + + // Removes a triple matching the given one from the database, + // if it exists. Does nothing otherwise. + RemoveTriple(*Triple) error +} + +type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error) + +var writerRegistry = make(map[string]NewTripleWriterFunc) + +func RegisterWriter(name string, newFunc NewTripleWriterFunc) { + if _, found := writerRegistry[name]; found { + panic("already registered TripleWriter " + name) + } + writerRegistry[name] = newFunc +} + +func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) { + newFunc, hasNew := writerRegistry[name] + if !hasNew { + return nil, errors.New("replication: name '" + name + "' is not registered") + } + return newFunc(ts, opts) +} + +func WriterMethods() []string { + t := make([]string, 0, len(writerRegistry)) + for n := range writerRegistry { + t = append(t, n) + } + return t +} diff --git a/replication/local.go b/replication/local.go deleted file mode 100644 index 01eb644..0000000 --- a/replication/local.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "sync" - "time" - - "github.com/google/cayley/graph" -) - -type Single struct { - nextID int64 - ts graph.TripleStore - mut sync.Mutex -} - -func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) { - rep := &Single{nextID: ts.Horizon(), ts: ts} - if rep.nextID == -1 { - rep.nextID = 1 - } - return rep, nil -} - -func (s *Single) AcquireNextId() int64 { - s.mut.Lock() - defer s.mut.Unlock() - id := s.nextID - s.nextID += 1 - return id -} - -func (s *Single) AddTriple(t *graph.Triple) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Triple: t, - Action: graph.Add, - Timestamp: time.Now(), - } - return s.ts.ApplyTransactions(trans) -} - -func (s *Single) AddTripleSet(set []*graph.Triple) error { - trans := make([]*graph.Transaction, len(set)) - for i, t := range set { - trans[i] = &graph.Transaction{ - ID: s.AcquireNextId(), - Triple: t, - Action: graph.Add, - Timestamp: time.Now(), - } - } - s.ts.ApplyTransactions(trans) - return nil -} - -func (s *Single) RemoveTriple(t *graph.Triple) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Triple: t, - Action: graph.Delete, - Timestamp: time.Now(), - } - return s.ts.ApplyTransactions(trans) -} - -func init() { - graph.RegisterWriter("single", NewSingleReplication) -} diff --git a/writer/single.go b/writer/single.go new file mode 100644 index 0000000..6687055 --- /dev/null +++ b/writer/single.go @@ -0,0 +1,84 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package writer + +import ( + "sync" + "time" + + "github.com/google/cayley/graph" +) + +type Single struct { + nextID int64 + ts graph.TripleStore + mut sync.Mutex +} + +func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) { + rep := &Single{nextID: ts.Horizon(), ts: ts} + if rep.nextID == -1 { + rep.nextID = 1 + } + return rep, nil +} + +func (s *Single) AcquireNextId() int64 { + s.mut.Lock() + defer s.mut.Unlock() + id := s.nextID + s.nextID += 1 + return id +} + +func (s *Single) AddTriple(t *graph.Triple) error { + trans := make([]*graph.Transaction, 1) + trans[0] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Add, + Timestamp: time.Now(), + } + return s.ts.ApplyTransactions(trans) +} + +func (s *Single) AddTripleSet(set []*graph.Triple) error { + trans := make([]*graph.Transaction, len(set)) + for i, t := range set { + trans[i] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Add, + Timestamp: time.Now(), + } + } + s.ts.ApplyTransactions(trans) + return nil +} + +func (s *Single) RemoveTriple(t *graph.Triple) error { + trans := make([]*graph.Transaction, 1) + trans[0] = &graph.Transaction{ + ID: s.AcquireNextId(), + Triple: t, + Action: graph.Delete, + Timestamp: time.Now(), + } + return s.ts.ApplyTransactions(trans) +} + +func init() { + graph.RegisterWriter("single", NewSingleReplication) +} From 81b3bf98816c2390da897bb0b60252b53f7f14f0 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 4 Aug 2014 00:44:25 -0400 Subject: [PATCH 08/18] rename to quads --- graph/quadwriter.go | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++ graph/triplestore.go | 2 +- graph/triplewriter.go | 81 ------------------------------------------------- writer/single.go | 13 ++++---- 4 files changed, 91 insertions(+), 88 deletions(-) create mode 100644 graph/quadwriter.go delete mode 100644 graph/triplewriter.go diff --git a/graph/quadwriter.go b/graph/quadwriter.go new file mode 100644 index 0000000..bae0ced --- /dev/null +++ b/graph/quadwriter.go @@ -0,0 +1,83 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +// Defines the interface for consistent replication of a graph instance. +// +// Separate from the backend, this dictates how individual triples get +// identified and replicated consistently across (potentially) multiple +// instances. The simplest case is to keep an append-only log of triple +// changes. + +import ( + "errors" + "time" + + "github.com/google/cayley/quad" +) + +type Procedure byte + +// The different types of actions a transaction can do. +const ( + Add Procedure = iota + Delete +) + +type Delta struct { + ID int64 + Quad *quad.Quad + Action Procedure + Timestamp time.Time +} + +type QuadWriter interface { + // Add a triple to the store. + AddQuad(*quad.Quad) error + + // Add a set of triples to the store, atomically if possible. + AddQuadSet([]*quad.Quad) error + + // Removes a triple matching the given one from the database, + // if it exists. Does nothing otherwise. + RemoveQuad(*quad.Quad) error +} + +type NewQuadWriterFunc func(TripleStore, Options) (QuadWriter, error) + +var writerRegistry = make(map[string]NewQuadWriterFunc) + +func RegisterWriter(name string, newFunc NewQuadWriterFunc) { + if _, found := writerRegistry[name]; found { + panic("already registered TripleWriter " + name) + } + writerRegistry[name] = newFunc +} + +func NewQuadWriter(name string, ts TripleStore, opts Options) (QuadWriter, error) { + newFunc, hasNew := writerRegistry[name] + if !hasNew { + return nil, errors.New("replication: name '" + name + "' is not registered") + } + return newFunc(ts, opts) +} + +func WriterMethods() []string { + t := make([]string, 0, len(writerRegistry)) + for n := range writerRegistry { + t = append(t, n) + } + return t +} diff --git a/graph/triplestore.go b/graph/triplestore.go index ba766fa..00b1a47 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -41,7 +41,7 @@ type Value interface{} type TripleStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyTransactions([]*Transaction) error + ApplyDeltas([]*Delta) error // Given an opaque token, returns the triple for that token from the store. Quad(Value) *quad.Quad diff --git a/graph/triplewriter.go b/graph/triplewriter.go deleted file mode 100644 index 65e560e..0000000 --- a/graph/triplewriter.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package graph - -// Defines the interface for consistent replication of a graph instance. -// -// Separate from the backend, this dictates how individual triples get -// identified and replicated consistently across (potentially) multiple -// instances. The simplest case is to keep an append-only log of triple -// changes. - -import ( - "errors" - "time" -) - -type Procedure byte - -// The different types of actions a transaction can do. -const ( - Add Procedure = iota - Delete -) - -type Transaction struct { - ID int64 - Triple *Triple - Action Procedure - Timestamp time.Time -} - -type TripleWriter interface { - // Add a triple to the store. - AddTriple(*Triple) error - - // Add a set of triples to the store, atomically if possible. - AddTripleSet([]*Triple) error - - // Removes a triple matching the given one from the database, - // if it exists. Does nothing otherwise. - RemoveTriple(*Triple) error -} - -type NewTripleWriterFunc func(TripleStore, Options) (TripleWriter, error) - -var writerRegistry = make(map[string]NewTripleWriterFunc) - -func RegisterWriter(name string, newFunc NewTripleWriterFunc) { - if _, found := writerRegistry[name]; found { - panic("already registered TripleWriter " + name) - } - writerRegistry[name] = newFunc -} - -func NewTripleWriter(name string, ts TripleStore, opts Options) (TripleWriter, error) { - newFunc, hasNew := writerRegistry[name] - if !hasNew { - return nil, errors.New("replication: name '" + name + "' is not registered") - } - return newFunc(ts, opts) -} - -func WriterMethods() []string { - t := make([]string, 0, len(writerRegistry)) - for n := range writerRegistry { - t = append(t, n) - } - return t -} diff --git a/writer/single.go b/writer/single.go index 6687055..d797af1 100644 --- a/writer/single.go +++ b/writer/single.go @@ -19,6 +19,7 @@ import ( "time" "github.com/google/cayley/graph" + "github.com/google/cayley/quad" ) type Single struct { @@ -27,7 +28,7 @@ type Single struct { mut sync.Mutex } -func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.TripleWriter, error) { +func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadWriter, error) { rep := &Single{nextID: ts.Horizon(), ts: ts} if rep.nextID == -1 { rep.nextID = 1 @@ -43,23 +44,23 @@ func (s *Single) AcquireNextId() int64 { return id } -func (s *Single) AddTriple(t *graph.Triple) error { +func (s *Single) AddQuad(t *quad.Quad) error { trans := make([]*graph.Transaction, 1) trans[0] = &graph.Transaction{ ID: s.AcquireNextId(), - Triple: t, + Quad: t, Action: graph.Add, Timestamp: time.Now(), } return s.ts.ApplyTransactions(trans) } -func (s *Single) AddTripleSet(set []*graph.Triple) error { +func (s *Single) AddQuadSet(set []*quad.Quad) error { trans := make([]*graph.Transaction, len(set)) for i, t := range set { trans[i] = &graph.Transaction{ ID: s.AcquireNextId(), - Triple: t, + Quad: t, Action: graph.Add, Timestamp: time.Now(), } @@ -68,7 +69,7 @@ func (s *Single) AddTripleSet(set []*graph.Triple) error { return nil } -func (s *Single) RemoveTriple(t *graph.Triple) error { +func (s *Single) RemoveQuad(t *graph.Quad) error { trans := make([]*graph.Transaction, 1) trans[0] = &graph.Transaction{ ID: s.AcquireNextId(), From dcb495d14505e2ac1ec42138fdccd86a20462924 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 4 Aug 2014 01:56:49 -0400 Subject: [PATCH 09/18] Make Memstore work with the QuadWriter --- graph/memstore/all_iterator.go | 33 +++++-- graph/memstore/iterator.go | 17 +++- graph/memstore/triplestore.go | 188 ++++++++++++++++++------------------- graph/memstore/triplestore_test.go | 19 ++-- graph/quadwriter.go | 11 ++- writer/single.go | 40 ++++---- 6 files changed, 167 insertions(+), 141 deletions(-) diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index 658a4a1..8e173a5 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -24,8 +24,11 @@ type AllIterator struct { ts *TripleStore } -func NewMemstoreAllIterator(ts *TripleStore) *AllIterator { - var out AllIterator +type NodesAllIterator AllIterator +type QuadsAllIterator AllIterator + +func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator { + var out NodesAllIterator out.Int64 = *iterator.NewInt64(1, ts.idCounter-1) out.ts = ts return &out @@ -36,15 +39,33 @@ func (it *AllIterator) SubIterators() []graph.Iterator { return nil } -func (it *AllIterator) Next() (graph.Value, bool) { - next, out := it.Int64.Next() +func (nit *NodesAllIterator) Next() (graph.Value, bool) { + next, out := nit.Int64.Next() if !out { return next, out } i64 := next.(int64) - _, ok := it.ts.revIdMap[i64] + _, ok := nit.ts.revIdMap[i64] if !ok { - return it.Next() + return nit.Next() + } + return next, out +} + +func NewMemstoreQuadsAllIterator(ts *TripleStore) *QuadsAllIterator { + var out QuadsAllIterator + out.Int64 = *iterator.NewInt64(1, ts.quadIdCounter-1) + out.ts = ts + return &out +} + +func (qit *QuadsAllIterator) Next() (graph.Value, bool) { + next, out := qit.Int64.Next() + if out { + i64 := next.(int64) + if qit.ts.log[i64].DeletedBy != 0 || qit.ts.log[i64].Action == graph.Delete { + return qit.Next() + } } return next, out } diff --git a/graph/memstore/iterator.go b/graph/memstore/iterator.go index 8a7e1ef..3ab9d34 100644 --- a/graph/memstore/iterator.go +++ b/graph/memstore/iterator.go @@ -27,6 +27,7 @@ import ( type Iterator struct { uid uint64 + ts *TripleStore tags graph.Tagger tree *llrb.LLRB data string @@ -54,9 +55,10 @@ func IterateOne(tree *llrb.LLRB, last Int64) Int64 { return next } -func NewLlrbIterator(tree *llrb.LLRB, data string) *Iterator { +func NewLlrbIterator(tree *llrb.LLRB, data string, ts *TripleStore) *Iterator { return &Iterator{ uid: iterator.NextUID(), + ts: ts, tree: tree, iterLast: Int64(-1), data: data, @@ -86,19 +88,26 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { } func (it *Iterator) Clone() graph.Iterator { - m := NewLlrbIterator(it.tree, it.data) + m := NewLlrbIterator(it.tree, it.data, it.ts) m.tags.CopyFrom(it) return m } func (it *Iterator) Close() {} +func (it *Iterator) checkValid(index int64) bool { + return it.ts.log[index].DeletedBy == 0 +} + func (it *Iterator) Next() (graph.Value, bool) { graph.NextLogIn(it) - if it.tree.Max() == nil || it.result == int64(it.tree.Max().(Int64)) { + if it.tree.Max() == nil || it.iterLast == it.tree.Max().(Int64) { return graph.NextLogOut(it, nil, false) } it.iterLast = IterateOne(it.tree, it.iterLast) + if !it.checkValid(int64(it.iterLast)) { + return it.Next() + } it.result = int64(it.iterLast) return graph.NextLogOut(it, it.result, true) } @@ -126,7 +135,7 @@ func (it *Iterator) Size() (int64, bool) { func (it *Iterator) Contains(v graph.Value) bool { graph.ContainsLogIn(it, v) - if it.tree.Has(Int64(v.(int64))) { + if it.tree.Has(Int64(v.(int64))) && it.checkValid(v.(int64)) { it.result = v return graph.ContainsLogOut(it, v, true) } diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go index 23eb11a..75152df 100644 --- a/graph/memstore/triplestore.go +++ b/graph/memstore/triplestore.go @@ -31,58 +31,63 @@ func init() { }, nil) } -type TripleDirectionIndex struct { +type QuadDirectionIndex struct { subject map[int64]*llrb.LLRB predicate map[int64]*llrb.LLRB object map[int64]*llrb.LLRB label map[int64]*llrb.LLRB } -func NewTripleDirectionIndex() *TripleDirectionIndex { - var tdi TripleDirectionIndex - tdi.subject = make(map[int64]*llrb.LLRB) - tdi.predicate = make(map[int64]*llrb.LLRB) - tdi.object = make(map[int64]*llrb.LLRB) - tdi.label = make(map[int64]*llrb.LLRB) - return &tdi +func NewQuadDirectionIndex() *QuadDirectionIndex { + var qdi QuadDirectionIndex + qdi.subject = make(map[int64]*llrb.LLRB) + qdi.predicate = make(map[int64]*llrb.LLRB) + qdi.object = make(map[int64]*llrb.LLRB) + qdi.label = make(map[int64]*llrb.LLRB) + return &qdi } -func (tdi *TripleDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { +func (qdi *QuadDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { switch d { case quad.Subject: - return tdi.subject + return qdi.subject case quad.Object: - return tdi.object + return qdi.object case quad.Predicate: - return tdi.predicate + return qdi.predicate case quad.Label: - return tdi.label + return qdi.label } panic("illegal direction") } -func (tdi *TripleDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { - directionIndex := tdi.GetForDir(d) +func (qdi *QuadDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { + directionIndex := qdi.GetForDir(d) if _, ok := directionIndex[id]; !ok { directionIndex[id] = llrb.New() } return directionIndex[id] } -func (tdi *TripleDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { - directionIndex := tdi.GetForDir(d) +func (qdi *QuadDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { + directionIndex := qdi.GetForDir(d) tree, exists := directionIndex[id] return tree, exists } +type LogEntry struct { + graph.Delta + DeletedBy int64 +} + type TripleStore struct { - idCounter int64 - tripleIdCounter int64 - idMap map[string]int64 - revIdMap map[int64]string - triples []quad.Quad - size int64 - index TripleDirectionIndex + idCounter int64 + quadIdCounter int64 + idMap map[string]int64 + revIdMap map[int64]string + log []LogEntry + size int64 + index QuadDirectionIndex // vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree } @@ -90,24 +95,32 @@ func newTripleStore() *TripleStore { var ts TripleStore ts.idMap = make(map[string]int64) ts.revIdMap = make(map[int64]string) - ts.triples = make([]quad.Quad, 1, 200) + ts.log = make([]LogEntry, 1, 200) - // Sentinel null triple so triple indices start at 1 - ts.triples[0] = quad.Quad{} - ts.size = 1 - ts.index = *NewTripleDirectionIndex() + // Sentinel null entry so indices start at 1 + ts.log[0] = LogEntry{} + ts.index = *NewQuadDirectionIndex() ts.idCounter = 1 - ts.tripleIdCounter = 1 + ts.quadIdCounter = 1 return &ts } -func (ts *TripleStore) AddTripleSet(triples []*quad.Quad) { - for _, t := range triples { - ts.AddTriple(t) +func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { + for _, d := range deltas { + var err error + if d.Action == graph.Add { + err = ts.AddDelta(d) + } else { + err = ts.RemoveDelta(d) + } + if err != nil { + return err + } } + return nil } -func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) { +func (ts *TripleStore) quadExists(t *quad.Quad) (bool, int64) { smallest := -1 var smallest_tree *llrb.LLRB for d := quad.Subject; d <= quad.Label; d++ { @@ -130,33 +143,34 @@ func (ts *TripleStore) tripleExists(t *quad.Quad) (bool, int64) { smallest_tree = index } } - it := NewLlrbIterator(smallest_tree, "") + it := NewLlrbIterator(smallest_tree, "", ts) for { val, ok := it.Next() if !ok { break } - if t.Equals(&ts.triples[val.(int64)]) { - return true, val.(int64) + ival := val.(int64) + if t.Equals(&ts.log[ival].Quad) { + return true, ival } } return false, 0 } -func (ts *TripleStore) AddTriple(t *quad.Quad) { - if exists, _ := ts.tripleExists(t); exists { - return +func (ts *TripleStore) AddDelta(d *graph.Delta) error { + if exists, _ := ts.quadExists(&d.Quad); exists { + return graph.ErrQuadExists } - var tripleID int64 - ts.triples = append(ts.triples, *t) - tripleID = ts.tripleIdCounter + var quadID int64 + quadID = ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: *d}) ts.size++ - ts.tripleIdCounter++ + ts.quadIdCounter++ - for d := quad.Subject; d <= quad.Label; d++ { - sid := t.Get(d) - if d == quad.Label && sid == "" { + for dir := quad.Subject; dir <= quad.Label; dir++ { + sid := d.Quad.Get(dir) + if dir == quad.Label && sid == "" { continue } if _, ok := ts.idMap[sid]; !ok { @@ -166,87 +180,63 @@ func (ts *TripleStore) AddTriple(t *quad.Quad) { } } - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { + for dir := quad.Subject; dir <= quad.Label; dir++ { + if dir == quad.Label && d.Quad.Get(dir) == "" { continue } - id := ts.idMap[t.Get(d)] - tree := ts.index.GetOrCreate(d, id) - tree.ReplaceOrInsert(Int64(tripleID)) + id := ts.idMap[d.Quad.Get(dir)] + tree := ts.index.GetOrCreate(dir, id) + tree.ReplaceOrInsert(Int64(quadID)) } // TODO(barakmich): Add VIP indexing + return nil } -func (ts *TripleStore) RemoveTriple(t *quad.Quad) { - var tripleID int64 +func (ts *TripleStore) RemoveDelta(d *graph.Delta) error { + var prevQuadID int64 var exists bool - tripleID = 0 - if exists, tripleID = ts.tripleExists(t); !exists { - return + prevQuadID = 0 + if exists, prevQuadID = ts.quadExists(&d.Quad); !exists { + return graph.ErrQuadNotExist } - ts.triples[tripleID] = quad.Quad{} + var quadID int64 + quadID = ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: *d}) + ts.log[prevQuadID].DeletedBy = quadID ts.size-- - - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - id := ts.idMap[t.Get(d)] - tree := ts.index.GetOrCreate(d, id) - tree.Delete(Int64(tripleID)) - } - - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - id, ok := ts.idMap[t.Get(d)] - if !ok { - continue - } - stillExists := false - for d := quad.Subject; d <= quad.Label; d++ { - if d == quad.Label && t.Get(d) == "" { - continue - } - nodeTree := ts.index.GetOrCreate(d, id) - if nodeTree.Len() != 0 { - stillExists = true - break - } - } - if !stillExists { - delete(ts.idMap, t.Get(d)) - delete(ts.revIdMap, id) - } - } + ts.quadIdCounter++ + return nil } func (ts *TripleStore) Quad(index graph.Value) *quad.Quad { - return &ts.triples[index.(int64)] + return &ts.log[index.(int64)].Quad } func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator { index, ok := ts.index.Get(d, value.(int64)) data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) if ok { - return NewLlrbIterator(index, data) + return NewLlrbIterator(index, data, ts) } return &iterator.Null{} } +func (ts *TripleStore) Horizon() int64 { + return ts.log[len(ts.log)-1].ID +} + func (ts *TripleStore) Size() int64 { - return ts.size - 1 // Don't count the sentinel + return ts.size } func (ts *TripleStore) DebugPrint() { - for i, t := range ts.triples { + for i, l := range ts.log { if i == 0 { continue } - glog.V(2).Infof("%d: %s", i, t) + glog.V(2).Infof("%d: %#v", i, l) } } @@ -259,7 +249,7 @@ func (ts *TripleStore) NameOf(id graph.Value) string { } func (ts *TripleStore) TriplesAllIterator() graph.Iterator { - return iterator.NewInt64(0, ts.Size()) + return NewMemstoreQuadsAllIterator(ts) } func (ts *TripleStore) FixedIterator() graph.FixedIterator { @@ -272,6 +262,6 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph. } func (ts *TripleStore) NodesAllIterator() graph.Iterator { - return NewMemstoreAllIterator(ts) + return NewMemstoreNodesAllIterator(ts) } func (ts *TripleStore) Close() {} diff --git a/graph/memstore/triplestore_test.go b/graph/memstore/triplestore_test.go index 44c43b4..c711267 100644 --- a/graph/memstore/triplestore_test.go +++ b/graph/memstore/triplestore_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/quad" + "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -51,13 +52,14 @@ var simpleGraph = []*quad.Quad{ {"G", "status", "cool", "status_graph"}, } -func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) { +func makeTestStore(data []*quad.Quad) (*TripleStore, graph.QuadWriter, []pair) { seen := make(map[string]struct{}) ts := newTripleStore() var ( val int64 ind []pair ) + writer, _ := writer.NewSingleReplication(ts, nil) for _, t := range data { for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { if _, ok := seen[qp]; !ok && qp != "" { @@ -66,9 +68,10 @@ func makeTestStore(data []*quad.Quad) (*TripleStore, []pair) { seen[qp] = struct{}{} } } - ts.AddTriple(t) + + writer.AddQuad(t) } - return ts, ind + return ts, writer, ind } type pair struct { @@ -77,7 +80,7 @@ type pair struct { } func TestMemstore(t *testing.T) { - ts, index := makeTestStore(simpleGraph) + ts, _, index := makeTestStore(simpleGraph) if size := ts.Size(); size != int64(len(simpleGraph)) { t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph)) } @@ -95,7 +98,7 @@ func TestMemstore(t *testing.T) { } func TestIteratorsAndNextResultOrderA(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, _, _ := makeTestStore(simpleGraph) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("C")) @@ -145,7 +148,7 @@ func TestIteratorsAndNextResultOrderA(t *testing.T) { } func TestLinksToOptimization(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, _, _ := makeTestStore(simpleGraph) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("cool")) @@ -173,9 +176,9 @@ func TestLinksToOptimization(t *testing.T) { } func TestRemoveTriple(t *testing.T) { - ts, _ := makeTestStore(simpleGraph) + ts, w, _ := makeTestStore(simpleGraph) - ts.RemoveTriple(&quad.Quad{"E", "follows", "F", ""}) + w.RemoveQuad(&quad.Quad{"E", "follows", "F", ""}) fixed := ts.FixedIterator() fixed.Add(ts.ValueOf("E")) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index bae0ced..782a2e6 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -38,19 +38,22 @@ const ( type Delta struct { ID int64 - Quad *quad.Quad + Quad quad.Quad Action Procedure Timestamp time.Time } +var ErrQuadExists = errors.New("Quad exists") +var ErrQuadNotExist = errors.New("Quad doesn't exist") + type QuadWriter interface { - // Add a triple to the store. + // Add a quad to the store. AddQuad(*quad.Quad) error - // Add a set of triples to the store, atomically if possible. + // Add a set of quads to the store, atomically if possible. AddQuadSet([]*quad.Quad) error - // Removes a triple matching the given one from the database, + // Removes a quad matching the given one from the database, // if it exists. Does nothing otherwise. RemoveQuad(*quad.Quad) error } diff --git a/writer/single.go b/writer/single.go index d797af1..642b7ee 100644 --- a/writer/single.go +++ b/writer/single.go @@ -36,48 +36,48 @@ func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadW return rep, nil } -func (s *Single) AcquireNextId() int64 { +func (s *Single) AcquireNextID() int64 { s.mut.Lock() defer s.mut.Unlock() id := s.nextID - s.nextID += 1 + s.nextID++ return id } -func (s *Single) AddQuad(t *quad.Quad) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Quad: t, +func (s *Single) AddQuad(q *quad.Quad) error { + deltas := make([]*graph.Delta, 1) + deltas[0] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Add, Timestamp: time.Now(), } - return s.ts.ApplyTransactions(trans) + return s.ts.ApplyDeltas(deltas) } func (s *Single) AddQuadSet(set []*quad.Quad) error { - trans := make([]*graph.Transaction, len(set)) - for i, t := range set { - trans[i] = &graph.Transaction{ - ID: s.AcquireNextId(), - Quad: t, + deltas := make([]*graph.Delta, len(set)) + for i, q := range set { + deltas[i] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Add, Timestamp: time.Now(), } } - s.ts.ApplyTransactions(trans) + s.ts.ApplyDeltas(deltas) return nil } -func (s *Single) RemoveQuad(t *graph.Quad) error { - trans := make([]*graph.Transaction, 1) - trans[0] = &graph.Transaction{ - ID: s.AcquireNextId(), - Triple: t, +func (s *Single) RemoveQuad(q *quad.Quad) error { + deltas := make([]*graph.Delta, 1) + deltas[0] = &graph.Delta{ + ID: s.AcquireNextID(), + Quad: *q, Action: graph.Delete, Timestamp: time.Now(), } - return s.ts.ApplyTransactions(trans) + return s.ts.ApplyDeltas(deltas) } func init() { From d4e5eead32f8674b96a5fe77b98ac7655e6975a2 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 5 Aug 2014 14:17:38 -0400 Subject: [PATCH 10/18] convert leveldb to log-structure --- graph/leveldb/iterator.go | 40 ++++--- graph/leveldb/leveldb_test.go | 20 +++- graph/leveldb/triplestore.go | 255 +++++++++++++++++++++++------------------- writer/single.go | 2 +- 4 files changed, 177 insertions(+), 140 deletions(-) diff --git a/graph/leveldb/iterator.go b/graph/leveldb/iterator.go index b434dfd..2a82765 100644 --- a/graph/leveldb/iterator.go +++ b/graph/leveldb/iterator.go @@ -16,9 +16,11 @@ package leveldb import ( "bytes" + "encoding/json" "fmt" "strings" + "github.com/barakmich/glog" ldbit "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -41,7 +43,7 @@ type Iterator struct { result graph.Value } -func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator { +func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator { vb := value.([]byte) p := make([]byte, 0, 2+qs.hasher.Size()) p = append(p, []byte(prefix)...) @@ -65,10 +67,10 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS ok := it.iter.Seek(it.nextPrefix) if !ok { - // FIXME(kortschak) What are the semantics here? Is this iterator usable? - // If not, we should return nil *Iterator and an error. it.open = false it.iter.Release() + glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) + return &iterator.Null{} } return &it @@ -106,7 +108,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { func (it *Iterator) Clone() graph.Iterator { out := NewIterator(it.originalPrefix, it.dir, it.checkId, it.qs) - out.tags.CopyFrom(it) + out.Tagger().CopyFrom(it) return out } @@ -117,6 +119,12 @@ func (it *Iterator) Close() { } } +func (it *Iterator) isLiveValue(val []byte) bool { + var entry IndexEntry + json.Unmarshal(val, &entry) + return len(entry.History)%2 != 0 +} + func (it *Iterator) Next() (graph.Value, bool) { if it.iter == nil { it.result = nil @@ -132,6 +140,9 @@ func (it *Iterator) Next() (graph.Value, bool) { return nil, false } if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) { + if !it.isLiveValue(it.iter.Value()) { + return it.Next() + } out := make([]byte, len(it.iter.Key())) copy(out, it.iter.Key()) it.result = out @@ -173,7 +184,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return 2*qs.hasher.Size() + 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("po")) { @@ -185,7 +196,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return qs.hasher.Size() + 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("os")) { @@ -197,7 +208,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Object: return 2 case quad.Label: - return -1 + return 3*qs.hasher.Size() + 2 } } if bytes.Equal(prefix, []byte("cp")) { @@ -221,16 +232,13 @@ func (it *Iterator) Contains(v graph.Value) bool { return false } offset := PositionOf(val[0:2], it.dir, it.qs) - if offset != -1 { - if bytes.HasPrefix(val[offset:], it.checkId[1:]) { - return true - } - } else { - nameForDir := it.qs.Quad(v).Get(it.dir) - hashForDir := it.qs.ValueOf(nameForDir).([]byte) - if bytes.Equal(hashForDir, it.checkId) { - return true + if bytes.HasPrefix(val[offset:], it.checkId[1:]) { + data, err := it.qs.db.Get(val, it.ro) + if err != nil { + glog.Error("Couldn't get data for key ", val, " in iterator ", it.UID(), " failing Contains.") + return false } + return it.isLiveValue(data) } return false } diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go index 0e6772a..f1c9112 100644 --- a/graph/leveldb/leveldb_test.go +++ b/graph/leveldb/leveldb_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/quad" + "github.com/google/cayley/writer" ) func makeTripleSet() []*quad.Quad { @@ -143,7 +144,8 @@ func TestLoadDatabase(t *testing.T) { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTriple(&quad.Quad{"Something", "points_to", "Something Else", "context"}) + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuad(&quad.Quad{"Something", "points_to", "Something Else", "context"}) for _, pq := range []string{"Something", "points_to", "Something Else", "context"} { if got := qs.NameOf(qs.ValueOf(pq)); got != pq { t.Errorf("Failed to roundtrip %q, got:%q expect:%q", pq, got, pq) @@ -162,13 +164,14 @@ func TestLoadDatabase(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } + w, _ = writer.NewSingleReplication(qs, nil) ts2, didConvert := qs.(*TripleStore) if !didConvert { t.Errorf("Could not convert from generic to LevelDB TripleStore") } - qs.AddTripleSet(makeTripleSet()) + w.AddQuadSet(makeTripleSet()) if s := qs.Size(); s != 11 { t.Errorf("Unexpected triplestore size, got:%d expect:11", s) } @@ -176,7 +179,7 @@ func TestLoadDatabase(t *testing.T) { t.Errorf("Unexpected triplestore size, got:%d expect:5", s) } - qs.RemoveTriple(&quad.Quad{"A", "follows", "B", ""}) + w.RemoveQuad(&quad.Quad{"A", "follows", "B", ""}) if s := qs.Size(); s != 10 { t.Errorf("Unexpected triplestore size after RemoveTriple, got:%d expect:10", s) } @@ -204,7 +207,9 @@ func TestIterator(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTripleSet(makeTripleSet()) + + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) var it graph.Iterator it = qs.NodesAllIterator() @@ -299,7 +304,8 @@ func TestSetIterator(t *testing.T) { } defer qs.Close() - qs.AddTripleSet(makeTripleSet()) + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) expect := []*quad.Quad{ {"C", "follows", "B", ""}, @@ -411,7 +417,9 @@ func TestOptimize(t *testing.T) { if qs == nil || err != nil { t.Error("Failed to create leveldb TripleStore.") } - qs.AddTripleSet(makeTripleSet()) + + w, _ := writer.NewSingleReplication(qs, nil) + w.AddQuadSet(makeTripleSet()) // With an linksto-fixed pair fixed := qs.FixedIterator() diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go index 8092d96..3a4d16e 100644 --- a/graph/leveldb/triplestore.go +++ b/graph/leveldb/triplestore.go @@ -19,6 +19,7 @@ import ( "crypto/sha1" "encoding/binary" "encoding/json" + "errors" "fmt" "hash" @@ -48,6 +49,7 @@ type TripleStore struct { path string open bool size int64 + horizon int64 hasher hash.Hash writeopts *opt.WriteOptions readopts *opt.ReadOptions @@ -72,6 +74,7 @@ func createNewLevelDB(path string, _ graph.Options) error { func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) { var qs TripleStore + var err error qs.path = path cache_size := DefaultCacheSize if val, ok := options.IntKey("cache_size_mb"); ok { @@ -94,11 +97,15 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro qs.readopts = &opt.ReadOptions{} db, err := leveldb.OpenFile(qs.path, qs.dbOpts) if err != nil { - panic("Error, couldn't open! " + err.Error()) + glog.Errorln("Error, couldn't open! ", err) + return nil, err } qs.db = db glog.Infoln(qs.GetStats()) - qs.getSize() + err = qs.getMetadata() + if err != nil { + return nil, err + } return &qs, nil } @@ -116,24 +123,25 @@ func (qs *TripleStore) Size() int64 { return qs.size } -func (qs *TripleStore) createKeyFor(d [3]quad.Direction, triple *quad.Quad) []byte { - key := make([]byte, 0, 2+(qs.hasher.Size()*3)) +func (qs *TripleStore) Horizon() int64 { + return qs.horizon +} + +func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { + key := make([]byte, 0, 19) + key = append(key, 'd') + key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) + return key +} + +func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple *quad.Quad) []byte { + key := make([]byte, 0, 2+(qs.hasher.Size()*4)) // TODO(kortschak) Remove dependence on String() method. key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) - return key -} - -func (qs *TripleStore) createProvKeyFor(d [3]quad.Direction, triple *quad.Quad) []byte { - key := make([]byte, 0, 2+(qs.hasher.Size()*4)) - // TODO(kortschak) Remove dependence on String() method. - key = append(key, []byte{quad.Label.Prefix(), d[0].Prefix()}...) - key = append(key, qs.convertStringToByteHash(triple.Get(quad.Label))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) return key } @@ -144,76 +152,98 @@ func (qs *TripleStore) createValueKeyFor(s string) []byte { return key } -func (qs *TripleStore) AddTriple(t *quad.Quad) { - batch := &leveldb.Batch{} - qs.buildWrite(batch, t) - err := qs.db.Write(batch, qs.writeopts) - if err != nil { - glog.Errorf("Couldn't write to DB for triple %s.", t) - return - } - qs.size++ +type IndexEntry struct { + *quad.Quad + History []int64 } // Short hand for direction permutations. var ( - spo = [3]quad.Direction{quad.Subject, quad.Predicate, quad.Object} - osp = [3]quad.Direction{quad.Object, quad.Subject, quad.Predicate} - pos = [3]quad.Direction{quad.Predicate, quad.Object, quad.Subject} - pso = [3]quad.Direction{quad.Predicate, quad.Subject, quad.Object} + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *TripleStore) RemoveTriple(t *quad.Quad) { - _, err := qs.db.Get(qs.createKeyFor(spo, t), qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Error("Couldn't access DB to confirm deletion") - return - } - if err == leveldb.ErrNotFound { - // No such triple in the database, forget about it. - return - } +func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { batch := &leveldb.Batch{} - batch.Delete(qs.createKeyFor(spo, t)) - batch.Delete(qs.createKeyFor(osp, t)) - batch.Delete(qs.createKeyFor(pos, t)) - qs.UpdateValueKeyBy(t.Get(quad.Subject), -1, batch) - qs.UpdateValueKeyBy(t.Get(quad.Predicate), -1, batch) - qs.UpdateValueKeyBy(t.Get(quad.Object), -1, batch) - if t.Get(quad.Label) != "" { - batch.Delete(qs.createProvKeyFor(pso, t)) - qs.UpdateValueKeyBy(t.Get(quad.Label), -1, batch) + resizeMap := make(map[string]int64) + size_change := int64(0) + for _, d := range deltas { + bytes, err := json.Marshal(d) + if err != nil { + return err + } + batch.Put(qs.createDeltaKeyFor(d), bytes) + err = qs.buildQuadWrite(batch, &d.Quad, d.ID, d.Action == graph.Add) + if err != nil { + return err + } + delta := int64(1) + if d.Action == graph.Delete { + delta = int64(-1) + } + resizeMap[d.Quad.Subject] += delta + resizeMap[d.Quad.Predicate] += delta + resizeMap[d.Quad.Object] += delta + if d.Quad.Label != "" { + resizeMap[d.Quad.Label] += delta + } + size_change += delta + qs.horizon = d.ID } - err = qs.db.Write(batch, nil) + for k, v := range resizeMap { + if v != 0 { + err := qs.UpdateValueKeyBy(k, v, batch) + if err != nil { + return err + } + } + } + err := qs.db.Write(batch, qs.writeopts) if err != nil { - glog.Errorf("Couldn't delete triple %s.", t) - return + glog.Error("Couldn't write to DB for tripleset.") + return err } - qs.size-- + qs.size += size_change + return nil } -func (qs *TripleStore) buildTripleWrite(batch *leveldb.Batch, t *quad.Quad) { - bytes, err := json.Marshal(*t) - if err != nil { - glog.Errorf("Couldn't write to buffer for triple %s: %s", t, err) - return +func (qs *TripleStore) buildQuadWrite(batch *leveldb.Batch, q *quad.Quad, id int64, isAdd bool) error { + var entry IndexEntry + data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Error("Couldn't access DB to prepare index: ", err) + return err } - batch.Put(qs.createKeyFor(spo, t), bytes) - batch.Put(qs.createKeyFor(osp, t), bytes) - batch.Put(qs.createKeyFor(pos, t), bytes) - if t.Get(quad.Label) != "" { - batch.Put(qs.createProvKeyFor(pso, t), bytes) + if err == nil { + // We got something. + err = json.Unmarshal(data, &entry) + if err != nil { + return err + } + } else { + entry.Quad = q } -} + entry.History = append(entry.History, id) -func (qs *TripleStore) buildWrite(batch *leveldb.Batch, t *quad.Quad) { - qs.buildTripleWrite(batch, t) - qs.UpdateValueKeyBy(t.Get(quad.Subject), 1, nil) - qs.UpdateValueKeyBy(t.Get(quad.Predicate), 1, nil) - qs.UpdateValueKeyBy(t.Get(quad.Object), 1, nil) - if t.Get(quad.Label) != "" { - qs.UpdateValueKeyBy(t.Get(quad.Label), 1, nil) + if isAdd && len(entry.History)%2 == 0 { + glog.Error("Entry History is out of sync for", entry) + return errors.New("Odd index history") } + + bytes, err := json.Marshal(entry) + if err != nil { + glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) + return err + } + batch.Put(qs.createKeyFor(spo, q), bytes) + batch.Put(qs.createKeyFor(osp, q), bytes) + batch.Put(qs.createKeyFor(pos, q), bytes) + if q.Get(quad.Label) != "" { + batch.Put(qs.createKeyFor(cps, q), bytes) + } + return nil } type ValueData struct { @@ -221,15 +251,15 @@ type ValueData struct { Size int64 } -func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb.Batch) { - value := &ValueData{name, int64(amount)} +func (qs *TripleStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error { + value := &ValueData{name, amount} key := qs.createValueKeyFor(name) b, err := qs.db.Get(key, qs.readopts) // Error getting the node from the database. if err != nil && err != leveldb.ErrNotFound { glog.Errorf("Error reading Value %s from the DB.", name) - return + return err } // Node exists in the database -- unmarshal and update. @@ -237,58 +267,28 @@ func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb. err = json.Unmarshal(b, value) if err != nil { glog.Errorf("Error: couldn't reconstruct value: %v", err) - return + return err } - value.Size += int64(amount) + value.Size += amount } // Are we deleting something? - if amount < 0 { - if value.Size <= 0 { - if batch == nil { - qs.db.Delete(key, qs.writeopts) - } else { - batch.Delete(key) - } - return - } + if value.Size <= 0 { + value.Size = 0 } // Repackage and rewrite. bytes, err := json.Marshal(&value) if err != nil { glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) - return + return err } if batch == nil { qs.db.Put(key, bytes, qs.writeopts) } else { batch.Put(key, bytes) } -} - -func (qs *TripleStore) AddTripleSet(t_s []*quad.Quad) { - batch := &leveldb.Batch{} - newTs := len(t_s) - resizeMap := make(map[string]int) - for _, t := range t_s { - qs.buildTripleWrite(batch, t) - resizeMap[t.Subject]++ - resizeMap[t.Predicate]++ - resizeMap[t.Object]++ - if t.Label != "" { - resizeMap[t.Label]++ - } - } - for k, v := range resizeMap { - qs.UpdateValueKeyBy(k, v, batch) - } - err := qs.db.Write(batch, qs.writeopts) - if err != nil { - glog.Error("Couldn't write to DB for tripleset.") - return - } - qs.size += int64(newTs) + return nil } func (qs *TripleStore) Close() { @@ -302,6 +302,16 @@ func (qs *TripleStore) Close() { } else { glog.Errorf("Couldn't convert size before closing!") } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts) + if werr != nil { + glog.Error("Couldn't write horizon before closing!") + } + } else { + glog.Errorf("Couldn't convert horizon before closing!") + } qs.db.Close() qs.open = false } @@ -372,23 +382,34 @@ func (qs *TripleStore) SizeOf(k graph.Value) int64 { return int64(qs.valueData(k.([]byte)).Size) } -func (qs *TripleStore) getSize() { - var size int64 - b, err := qs.db.Get([]byte("__size"), qs.readopts) +func (qs *TripleStore) getInt64ForKey(key string, empty int64) (int64, error) { + var out int64 + b, err := qs.db.Get([]byte(key), qs.readopts) if err != nil && err != leveldb.ErrNotFound { - panic("Couldn't read size " + err.Error()) + glog.Errorln("Couldn't read " + key + ": " + err.Error()) + return 0, err } if err == leveldb.ErrNotFound { // Must be a new database. Cool - qs.size = 0 - return + return empty, nil } buf := bytes.NewBuffer(b) - err = binary.Read(buf, binary.LittleEndian, &size) + err = binary.Read(buf, binary.LittleEndian, &out) if err != nil { - glog.Errorln("Error: couldn't parse size") + glog.Errorln("Error: couldn't parse", key) + return 0, err } - qs.size = size + return out, nil +} + +func (qs *TripleStore) getMetadata() error { + var err error + qs.size, err = qs.getInt64ForKey("__size", 0) + if err != nil { + return err + } + qs.horizon, err = qs.getInt64ForKey("__horizon", 0) + return err } func (qs *TripleStore) SizeOfPrefix(pre []byte) (int64, error) { diff --git a/writer/single.go b/writer/single.go index 642b7ee..a455e02 100644 --- a/writer/single.go +++ b/writer/single.go @@ -30,7 +30,7 @@ type Single struct { func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadWriter, error) { rep := &Single{nextID: ts.Horizon(), ts: ts} - if rep.nextID == -1 { + if rep.nextID <= 0 { rep.nextID = 1 } return rep, nil From c3bd1644a1cad594cffe37f532e9156b0061d8d8 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 5 Aug 2014 16:28:47 -0400 Subject: [PATCH 11/18] speedup and cleanup --- graph/leveldb/iterator.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/graph/leveldb/iterator.go b/graph/leveldb/iterator.go index 2a82765..30b11b0 100644 --- a/graph/leveldb/iterator.go +++ b/graph/leveldb/iterator.go @@ -233,12 +233,16 @@ func (it *Iterator) Contains(v graph.Value) bool { } offset := PositionOf(val[0:2], it.dir, it.qs) if bytes.HasPrefix(val[offset:], it.checkId[1:]) { - data, err := it.qs.db.Get(val, it.ro) - if err != nil { - glog.Error("Couldn't get data for key ", val, " in iterator ", it.UID(), " failing Contains.") - return false - } - return it.isLiveValue(data) + // You may ask, why don't we check to see if it's a valid (not deleted) triple + // again? + // + // We've already done that -- in order to get the graph.Value token in the + // first place, we had to have done the check already; it came from a Next(). + // + // However, if it ever starts coming from somewhere else, it'll be more + // efficient to change the interface of the graph.Value for LevelDB to a + // struct with a flag for isValid, to save another random read. + return true } return false } @@ -249,7 +253,15 @@ func (it *Iterator) Size() (int64, bool) { func (it *Iterator) DebugString(indent int) string { size, _ := it.Size() - return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)", strings.Repeat(" ", indent), it.Type(), it.UID(), it.tags.Tags(), it.dir, size, it.qs.NameOf(it.checkId)) + return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)", + strings.Repeat(" ", indent), + it.Type(), + it.UID(), + it.tags.Tags(), + it.dir, + size, + it.qs.NameOf(it.checkId), + ) } var levelDBType graph.Type From 6d4738cf0ce78c8bf62d1e73289077366d0b5268 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 6 Aug 2014 16:24:31 -0400 Subject: [PATCH 12/18] convert to using real quads --- graph/quadwriter.go | 6 +++--- writer/single.go | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 782a2e6..5cfd171 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -48,14 +48,14 @@ var ErrQuadNotExist = errors.New("Quad doesn't exist") type QuadWriter interface { // Add a quad to the store. - AddQuad(*quad.Quad) error + AddQuad(quad.Quad) error // Add a set of quads to the store, atomically if possible. - AddQuadSet([]*quad.Quad) error + AddQuadSet([]quad.Quad) error // Removes a quad matching the given one from the database, // if it exists. Does nothing otherwise. - RemoveQuad(*quad.Quad) error + RemoveQuad(quad.Quad) error } type NewQuadWriterFunc func(TripleStore, Options) (QuadWriter, error) diff --git a/writer/single.go b/writer/single.go index a455e02..51641ca 100644 --- a/writer/single.go +++ b/writer/single.go @@ -44,23 +44,23 @@ func (s *Single) AcquireNextID() int64 { return id } -func (s *Single) AddQuad(q *quad.Quad) error { +func (s *Single) AddQuad(q quad.Quad) error { deltas := make([]*graph.Delta, 1) deltas[0] = &graph.Delta{ ID: s.AcquireNextID(), - Quad: *q, + Quad: q, Action: graph.Add, Timestamp: time.Now(), } return s.ts.ApplyDeltas(deltas) } -func (s *Single) AddQuadSet(set []*quad.Quad) error { +func (s *Single) AddQuadSet(set []quad.Quad) error { deltas := make([]*graph.Delta, len(set)) for i, q := range set { deltas[i] = &graph.Delta{ ID: s.AcquireNextID(), - Quad: *q, + Quad: q, Action: graph.Add, Timestamp: time.Now(), } @@ -69,11 +69,11 @@ func (s *Single) AddQuadSet(set []*quad.Quad) error { return nil } -func (s *Single) RemoveQuad(q *quad.Quad) error { +func (s *Single) RemoveQuad(q quad.Quad) error { deltas := make([]*graph.Delta, 1) deltas[0] = &graph.Delta{ ID: s.AcquireNextID(), - Quad: *q, + Quad: q, Action: graph.Delete, Timestamp: time.Now(), } From 8821c1968df8564e76290e708e8dd48f135b522a Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 18:17:38 -0400 Subject: [PATCH 13/18] add config options and graph.Handle --- config/config.go | 91 ++++++++++++++++++++++++++++++----------------------- db/db.go | 26 +++++++++++++-- graph/quadwriter.go | 5 +++ 3 files changed, 81 insertions(+), 41 deletions(-) diff --git a/config/config.go b/config/config.go index d8cb4d8..ff06e7c 100644 --- a/config/config.go +++ b/config/config.go @@ -26,25 +26,29 @@ import ( ) type Config struct { - DatabaseType string - DatabasePath string - DatabaseOptions map[string]interface{} - ListenHost string - ListenPort string - ReadOnly bool - Timeout time.Duration - LoadSize int + DatabaseType string + DatabasePath string + DatabaseOptions map[string]interface{} + ReplicationType string + ReplicationOptions map[string]interface{} + ListenHost string + ListenPort string + ReadOnly bool + Timeout time.Duration + LoadSize int } type config struct { - DatabaseType string `json:"database"` - DatabasePath string `json:"db_path"` - DatabaseOptions map[string]interface{} `json:"db_options"` - ListenHost string `json:"listen_host"` - ListenPort string `json:"listen_port"` - ReadOnly bool `json:"read_only"` - Timeout duration `json:"timeout"` - LoadSize int `json:"load_size"` + DatabaseType string `json:"database"` + DatabasePath string `json:"db_path"` + DatabaseOptions map[string]interface{} `json:"db_options"` + ReplicationType string `json:"replication"` + ReplicationOptions map[string]interface{} `json:"replication_options"` + ListenHost string `json:"listen_host"` + ListenPort string `json:"listen_port"` + ReadOnly bool `json:"read_only"` + Timeout duration `json:"timeout"` + LoadSize int `json:"load_size"` } func (c *Config) UnmarshalJSON(data []byte) error { @@ -54,28 +58,32 @@ func (c *Config) UnmarshalJSON(data []byte) error { return err } *c = Config{ - DatabaseType: t.DatabaseType, - DatabasePath: t.DatabasePath, - DatabaseOptions: t.DatabaseOptions, - ListenHost: t.ListenHost, - ListenPort: t.ListenPort, - ReadOnly: t.ReadOnly, - Timeout: time.Duration(t.Timeout), - LoadSize: t.LoadSize, + DatabaseType: t.DatabaseType, + DatabasePath: t.DatabasePath, + DatabaseOptions: t.DatabaseOptions, + ReplicationType: t.ReplicationType, + ReplicationOptions: t.ReplicationOptions, + ListenHost: t.ListenHost, + ListenPort: t.ListenPort, + ReadOnly: t.ReadOnly, + Timeout: time.Duration(t.Timeout), + LoadSize: t.LoadSize, } return nil } func (c *Config) MarshalJSON() ([]byte, error) { return json.Marshal(config{ - DatabaseType: c.DatabaseType, - DatabasePath: c.DatabasePath, - DatabaseOptions: c.DatabaseOptions, - ListenHost: c.ListenHost, - ListenPort: c.ListenPort, - ReadOnly: c.ReadOnly, - Timeout: duration(c.Timeout), - LoadSize: c.LoadSize, + DatabaseType: c.DatabaseType, + DatabasePath: c.DatabasePath, + DatabaseOptions: c.DatabaseOptions, + ReplicationType: c.ReplicationType, + ReplicationOptions: c.ReplicationOptions, + ListenHost: c.ListenHost, + ListenPort: c.ListenPort, + ReadOnly: c.ReadOnly, + Timeout: duration(c.Timeout), + LoadSize: c.LoadSize, }) } @@ -115,13 +123,14 @@ func (d *duration) MarshalJSON() ([]byte, error) { } var ( - databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") - databaseBackend = flag.String("db", "memstore", "Database Backend.") - host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") - loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") - port = flag.String("port", "64210", "Port to listen on.") - readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.") - timeout = flag.Duration("timeout", 30*time.Second, "Elapsed time until an individual query times out.") + databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") + databaseBackend = flag.String("db", "memstore", "Database Backend.") + replicationBackend = flag.String("replication", "single", "Replication method.") + host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") + loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") + port = flag.String("port", "64210", "Port to listen on.") + readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.") + timeout = flag.Duration("timeout", 30*time.Second, "Elapsed time until an individual query times out.") ) func ParseConfigFromFile(filename string) *Config { @@ -175,6 +184,10 @@ func ParseConfigFromFlagsAndFile(fileFlag string) *Config { config.DatabaseType = *databaseBackend } + if config.ReplicationType == "" { + config.ReplicationType = *replicationBackend + } + if config.ListenHost == "" { config.ListenHost = *host } diff --git a/db/db.go b/db/db.go index 8ea30db..10f6898 100644 --- a/db/db.go +++ b/db/db.go @@ -36,8 +36,20 @@ func Init(cfg *config.Config) error { return graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) } -func Open(cfg *config.Config) (graph.TripleStore, error) { - glog.Infof("Opening database %q at %s", cfg.DatabaseType, cfg.DatabasePath) +func Open(cfg *config.Config) (*graph.Handle, error) { + qs, err := OpenQuadStore(cfg) + if err != nil { + return nil, err + } + qw, err := OpenQuadWriter(qs, cfg) + if err != nil { + return nil, err + } + return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil +} + +func OpenQuadStore(cfg *config.Config) (graph.TripleStore, error) { + glog.Infof("Opening quad store %q at %s", cfg.DatabaseType, cfg.DatabasePath) ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) if err != nil { return nil, err @@ -46,6 +58,16 @@ func Open(cfg *config.Config) (graph.TripleStore, error) { return ts, nil } +func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter, error) { + glog.Infof("Opening replication method %q", cfg.ReplicationType) + w, err := graph.NewQuadWriter(cfg.ReplicationType, qs, cfg.ReplicationOptions) + if err != nil { + return nil, err + } + + return w, nil +} + func Load(ts graph.TripleStore, cfg *config.Config, dec quad.Unmarshaler) error { bulker, canBulk := ts.(graph.BulkLoader) if canBulk { diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 5cfd171..bc34555 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -43,6 +43,11 @@ type Delta struct { Timestamp time.Time } +type Handle struct { + QuadStore TripleStore + QuadWriter QuadWriter +} + var ErrQuadExists = errors.New("Quad exists") var ErrQuadNotExist = errors.New("Quad doesn't exist") From ff148f58f898e0d20f651323979456bef2073ae2 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 19:35:26 -0400 Subject: [PATCH 14/18] first swing at mongo indexing (iterator todo) --- graph/mongo/triplestore.go | 262 +++++++++++++++++++-------------------------- writer/single.go | 8 +- 2 files changed, 117 insertions(+), 153 deletions(-) diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 364d195..79fe38d 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -18,7 +18,6 @@ import ( "crypto/sha1" "encoding/hex" "hash" - "io" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" @@ -33,9 +32,6 @@ func init() { graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph) } -// Guarantee we satisfy graph.Bulkloader. -var _ graph.BulkLoader = (*TripleStore)(nil) - const DefaultDBName = "cayley" type TripleStore struct { @@ -63,13 +59,21 @@ func createNewMongoGraph(addr string, options graph.Options) error { Background: true, Sparse: true, } - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Pred"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Obj"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Label"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) + logOpts := mgo.Index{ + Key: []string{"LogID"}, + Unique: true, + DropDups: false, + Background: true, + Sparse: true, + } + db.C("log").EnsureIndex(logOpts) return nil } @@ -113,114 +117,121 @@ type MongoNode struct { Size int "Size" } -func (qs *TripleStore) updateNodeBy(node_name string, inc int) { - var size MongoNode - node := qs.ValueOf(node_name) - err := qs.db.C("nodes").FindId(node).One(&size) - if err != nil { - if err.Error() == "not found" { - // Not found. Okay. - size.Id = node.(string) - size.Name = node_name - size.Size = inc - } else { - glog.Errorf("Error: %v", err) - return - } - } else { - size.Id = node.(string) - size.Name = node_name - size.Size += inc - } - - // Removing something... - if inc < 0 { - if size.Size <= 0 { - err := qs.db.C("nodes").RemoveId(node) - if err != nil { - glog.Errorf("Error: %v while removing node %s", err, node_name) - return - } - } - } - - _, err2 := qs.db.C("nodes").UpsertId(node, size) - if err2 != nil { - glog.Errorf("Error: %v", err) - } +type MongoLogEntry struct { + LogID int64 "LogID" + Action string "Action" + Key string "Key" + Timestamp int64 } -func (qs *TripleStore) writeTriple(t quad.Quad) bool { +func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { + node := qs.ValueOf(node_name) + doc := bson.M{ + "_id": node.(string), + "Name": node_name, + } + upsert := bson.M{ + "$setOnInsert": doc, + "$inc": bson.M{ + "Size": inc, + }, + } + + _, err := qs.db.C("nodes").UpsertId(node, upsert) + if err != nil { + glog.Errorf("Error updating node: %v", err) + } + return err +} + +func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error { + var setname string + if proc == graph.Add { + setname = "Added" + } else if proc == graph.Delete { + setname = "Deleted" + } tripledoc := bson.M{ - "_id": qs.getIdForTriple(t), "Subject": t.Subject, "Predicate": t.Predicate, "Object": t.Object, "Label": t.Label, } - err := qs.db.C("triples").Insert(tripledoc) + upsert := bson.M{ + "$setOnInsert": tripledoc, + "$push": bson.M{ + setname: id, + }, + } + _, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert) if err != nil { - // Among the reasons I hate MongoDB. "Errors don't happen! Right guys?" - if err.(*mgo.LastError).Code == 11000 { - return false - } glog.Errorf("Error: %v", err) - return false } - return true + return err } -func (qs *TripleStore) AddTriple(t quad.Quad) { - _ = qs.writeTriple(t) - qs.updateNodeBy(t.Subject, 1) - qs.updateNodeBy(t.Predicate, 1) - qs.updateNodeBy(t.Object, 1) - if t.Label != "" { - qs.updateNodeBy(t.Label, 1) +func (qs *TripleStore) updateLog(d *graph.Delta) error { + var action string + if d.Action == graph.Add { + action = "Add" + } else { + action = "Delete" } + entry := MongoLogEntry{ + LogID: d.ID, + Action: action, + Key: qs.getIdForTriple(d.Quad), + Timestamp: d.Timestamp.UnixNano(), + } + err := qs.db.C("log").Insert(entry) + if err != nil { + glog.Errorf("Error updating log: %v", err) + } + return err } -func (qs *TripleStore) AddTripleSet(in []quad.Quad) { +func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) - for _, t := range in { - wrote := qs.writeTriple(t) - if wrote { - ids[t.Subject]++ - ids[t.Object]++ - ids[t.Predicate]++ - if t.Label != "" { - ids[t.Label]++ - } + for _, d := range in { + err := qs.updateLog(d) + if err != nil { + return err + } + } + for _, d := range in { + err := qs.updateTriple(d.Quad, d.ID, d.Action) + if err != nil { + return err + } + var countdelta int + if d.Action == graph.Add { + countdelta = 1 + } else { + countdelta = -1 + } + ids[d.Quad.Subject] += countdelta + ids[d.Quad.Object] += countdelta + ids[d.Quad.Predicate] += countdelta + if d.Quad.Label != "" { + ids[d.Quad.Label] += countdelta } } for k, v := range ids { - qs.updateNodeBy(k, v) + err := qs.updateNodeBy(k, v) + if err != nil { + return err + } } qs.session.SetSafe(&mgo.Safe{}) -} - -func (qs *TripleStore) RemoveTriple(t quad.Quad) { - err := qs.db.C("triples").RemoveId(qs.getIdForTriple(t)) - if err == mgo.ErrNotFound { - return - } else if err != nil { - glog.Errorf("Error: %v while removing triple %v", err, t) - return - } - qs.updateNodeBy(t.Subject, -1) - qs.updateNodeBy(t.Predicate, -1) - qs.updateNodeBy(t.Object, -1) - if t.Label != "" { - qs.updateNodeBy(t.Label, -1) - } + return nil } func (qs *TripleStore) Quad(val graph.Value) quad.Quad { var bsonDoc bson.M - err := qs.db.C("triples").FindId(val.(string)).One(&bsonDoc) + err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc) if err != nil { - glog.Errorf("Error: Couldn't retrieve triple %s %v", val, err) + glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) } return quad.Quad{ bsonDoc["Subject"].(string), @@ -231,7 +242,7 @@ func (qs *TripleStore) Quad(val graph.Value) quad.Quad { } func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { - return NewIterator(qs, "triples", d, val) + return NewIterator(qs, "quads", d, val) } func (qs *TripleStore) NodesAllIterator() graph.Iterator { @@ -239,7 +250,7 @@ func (qs *TripleStore) NodesAllIterator() graph.Iterator { } func (qs *TripleStore) TriplesAllIterator() graph.Iterator { - return NewAllIterator(qs, "triples") + return NewAllIterator(qs, "quads") } func (qs *TripleStore) ValueOf(s string) graph.Value { @@ -261,7 +272,8 @@ func (qs *TripleStore) NameOf(v graph.Value) string { } func (qs *TripleStore) Size() int64 { - count, err := qs.db.C("triples").Count() + // TODO(barakmich): Make size real; store it in the log, and retrieve it. + count, err := qs.db.C("quads").Count() if err != nil { glog.Errorf("Error: %v", err) return 0 @@ -269,6 +281,15 @@ func (qs *TripleStore) Size() int64 { return int64(count) } +func (qs *TripleStore) Horizon() int64 { + var log MongoLogEntry + err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) + if err != nil { + glog.Errorf("Could not get Horizon from Mongo: %v", err) + } + return log.LogID +} + func compareStrings(a, b graph.Value) bool { return a.(string) == b.(string) } @@ -298,61 +319,4 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V return val } -func (qs *TripleStore) BulkLoad(dec quad.Unmarshaler) error { - if qs.Size() != 0 { - return graph.ErrCannotBulkLoad - } - - qs.session.SetSafe(nil) - for { - q, err := dec.Unmarshal() - if err != nil { - if err != io.EOF { - return err - } - break - } - qs.writeTriple(q) - } - - outputTo := bson.M{"replace": "nodes", "sharded": true} - glog.Infoln("Mapreducing") - job := mgo.MapReduce{ - Map: `function() { - var len = this["_id"].length - var s_key = this["_id"].slice(0, len / 4) - var p_key = this["_id"].slice(len / 4, 2 * len / 4) - var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4) - var c_key = this["_id"].slice(3 * len / 4) - emit(s_key, {"_id": s_key, "Name" : this.Subject, "Size" : 1}) - emit(p_key, {"_id": p_key, "Name" : this.Predicate, "Size" : 1}) - emit(o_key, {"_id": o_key, "Name" : this.Object, "Size" : 1}) - if (this.Label != "") { - emit(c_key, {"_id": c_key, "Name" : this.Label, "Size" : 1}) - } - } - `, - Reduce: ` - function(key, value_list) { - out = {"_id": key, "Name": value_list[0].Name} - count = 0 - for (var i = 0; i < value_list.length; i++) { - count = count + value_list[i].Size - - } - out["Size"] = count - return out - } - `, - Out: outputTo, - } - qs.db.C("triples").Find(nil).MapReduce(&job, nil) - glog.Infoln("Fixing") - qs.db.Run(bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) { - db.nodes.update({"_id": result._id}, result.value) - }) }`}, {"args", bson.D{}}}, nil) - - qs.session.SetSafe(&mgo.Safe{}) - - return nil -} +// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it. diff --git a/writer/single.go b/writer/single.go index 51641ca..0dc4882 100644 --- a/writer/single.go +++ b/writer/single.go @@ -22,6 +22,10 @@ import ( "github.com/google/cayley/quad" ) +func init() { + graph.RegisterWriter("single", NewSingleReplication) +} + type Single struct { nextID int64 ts graph.TripleStore @@ -79,7 +83,3 @@ func (s *Single) RemoveQuad(q quad.Quad) error { } return s.ts.ApplyDeltas(deltas) } - -func init() { - graph.RegisterWriter("single", NewSingleReplication) -} From 6d22037602699c3cc2ca054688cd6938608426c4 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 19:41:22 -0400 Subject: [PATCH 15/18] add iterator check for mongo --- graph/mongo/iterator.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/graph/mongo/iterator.go b/graph/mongo/iterator.go index 9e37089..8485ca9 100644 --- a/graph/mongo/iterator.go +++ b/graph/mongo/iterator.go @@ -140,10 +140,9 @@ func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Next() bool { var result struct { - Id string "_id" - //Sub string "Sub" - //Pred string "Pred" - //Obj string "Obj" + Id string "_id" + Added []int64 + Deleted []int64 } found := it.iter.Next(&result) if !found { @@ -153,6 +152,9 @@ func (it *Iterator) Next() bool { } return false } + if it.collection == "quads" && len(result.Added) <= len(result.Deleted) { + return it.Next() + } it.result = result.Id return true } From 3770190db53df94bca9df164732bc05fad26dcbf Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 20:10:00 -0400 Subject: [PATCH 16/18] test clean --- cayley.go | 39 +++++++++++++++++++++------------------ cayley_test.go | 17 +++++++++-------- db/db.go | 18 +++--------------- db/repl.go | 24 ++++++++++++------------ graph/iterator/mock_ts_test.go | 6 +++--- graph/leveldb/leveldb_test.go | 2 +- graph/quadwriter.go | 8 ++++++++ http/http.go | 10 +++++----- http/query.go | 13 ++++--------- http/write.go | 8 ++++---- query/gremlin/gremlin_test.go | 4 +++- query/mql/mql_test.go | 4 +++- query/sexp/parser_test.go | 17 +++++++++++------ writer/single.go | 5 +++++ 14 files changed, 92 insertions(+), 83 deletions(-) diff --git a/cayley.go b/cayley.go index 7e58749..0516da5 100644 --- a/cayley.go +++ b/cayley.go @@ -44,6 +44,9 @@ import ( _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" + + // Load writer registry + _ "github.com/google/cayley/writer" ) var ( @@ -105,8 +108,8 @@ func main() { } var ( - ts graph.TripleStore - err error + handle *graph.Handle + err error ) switch cmd { case "version": @@ -123,60 +126,60 @@ func main() { break } if *tripleFile != "" { - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } - err = load(ts, cfg, *tripleFile, *tripleType) + err = load(handle.QuadWriter, cfg, *tripleFile, *tripleType) if err != nil { break } - ts.Close() + handle.Close() } case "load": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } - err = load(ts, cfg, *tripleFile, *tripleType) + err = load(handle.QuadWriter, cfg, *tripleFile, *tripleType) if err != nil { break } - ts.Close() + handle.Close() case "repl": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", *tripleType) + err = load(handle.QuadWriter, cfg, "", *tripleType) if err != nil { break } } - err = db.Repl(ts, *queryLanguage, cfg) + err = db.Repl(handle, *queryLanguage, cfg) - ts.Close() + handle.Close() case "http": - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { break } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", *tripleType) + err = load(handle.QuadWriter, cfg, "", *tripleType) if err != nil { break } } - http.Serve(ts, cfg) + http.Serve(handle, cfg) - ts.Close() + handle.Close() default: fmt.Println("No command", cmd) @@ -187,7 +190,7 @@ func main() { } } -func load(ts graph.TripleStore, cfg *config.Config, path, typ string) error { +func load(qw graph.QuadWriter, cfg *config.Config, path, typ string) error { var r io.Reader if path == "" { @@ -230,7 +233,7 @@ func load(ts graph.TripleStore, cfg *config.Config, path, typ string) error { return fmt.Errorf("unknown quad format %q", typ) } - return db.Load(ts, cfg, dec) + return db.Load(qw, cfg, dec) } const ( diff --git a/cayley_test.go b/cayley_test.go index d108f85..b0839e7 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -298,24 +298,25 @@ var m2_actors = movie2.Save("name","movie2").Follow(filmToActor) var ( once sync.Once cfg = &config.Config{ - DatabasePath: "30kmoviedata.nq.gz", - DatabaseType: "memstore", - Timeout: 300 * time.Second, + DatabasePath: "30kmoviedata.nq.gz", + DatabaseType: "memstore", + ReplicationType: "single", + Timeout: 300 * time.Second, } - ts graph.TripleStore + handle *graph.Handle ) func prepare(t testing.TB) { var err error once.Do(func() { - ts, err = db.Open(cfg) + handle, err = db.Open(cfg) if err != nil { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) } if !graph.IsPersistent(cfg.DatabaseType) { - err = load(ts, cfg, "", "cquad") + err = load(handle.QuadWriter, cfg, "", "cquad") if err != nil { t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) } @@ -329,7 +330,7 @@ func TestQueries(t *testing.T) { if testing.Short() && test.long { continue } - ses := gremlin.NewSession(ts, cfg.Timeout, true) + ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true) _, err := ses.InputParses(test.query) if err != nil { t.Fatalf("Failed to parse benchmark gremlin %s: %v", test.message, err) @@ -374,7 +375,7 @@ func runBench(n int, b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { c := make(chan interface{}, 5) - ses := gremlin.NewSession(ts, cfg.Timeout, true) + ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true) // Do the parsing we know works. ses.InputParses(benchmarkQueries[n].query) b.StartTimer() diff --git a/db/db.go b/db/db.go index 10f6898..e5370ee 100644 --- a/db/db.go +++ b/db/db.go @@ -68,19 +68,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter, return w, nil } -func Load(ts graph.TripleStore, cfg *config.Config, dec quad.Unmarshaler) error { - bulker, canBulk := ts.(graph.BulkLoader) - if canBulk { - switch err := bulker.BulkLoad(dec); err { - case nil: - return nil - case graph.ErrCannotBulkLoad: - // Try individual loading. - default: - return err - } - } - +func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { block := make([]quad.Quad, 0, cfg.LoadSize) for { t, err := dec.Unmarshal() @@ -92,11 +80,11 @@ func Load(ts graph.TripleStore, cfg *config.Config, dec quad.Unmarshaler) error } block = append(block, t) if len(block) == cap(block) { - ts.AddTripleSet(block) + qw.AddQuadSet(block) block = block[:0] } } - ts.AddTripleSet(block) + qw.AddQuadSet(block) return nil } diff --git a/db/repl.go b/db/repl.go index 11ecfea..ac36ea4 100644 --- a/db/repl.go +++ b/db/repl.go @@ -70,17 +70,17 @@ const ( history = ".cayley_history" ) -func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error { +func Repl(h *graph.Handle, queryLanguage string, cfg *config.Config) error { var ses query.Session switch queryLanguage { case "sexp": - ses = sexp.NewSession(ts) + ses = sexp.NewSession(h.QuadStore) case "mql": - ses = mql.NewSession(ts) + ses = mql.NewSession(h.QuadStore) case "gremlin": fallthrough default: - ses = gremlin.NewSession(ts, cfg.Timeout, true) + ses = gremlin.NewSession(h.QuadStore, cfg.Timeout, true) } term, err := terminal(history) @@ -124,25 +124,25 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error continue case strings.HasPrefix(line, ":a"): - triple, err := cquads.Parse(line[3:]) - if !triple.IsValid() { + quad, err := cquads.Parse(line[3:]) + if !quad.IsValid() { if err != nil { - fmt.Printf("not a valid triple: %v\n", err) + fmt.Printf("not a valid quad: %v\n", err) } continue } - ts.AddTriple(triple) + h.QuadWriter.AddQuad(quad) continue case strings.HasPrefix(line, ":d"): - triple, err := cquads.Parse(line[3:]) - if !triple.IsValid() { + quad, err := cquads.Parse(line[3:]) + if !quad.IsValid() { if err != nil { - fmt.Printf("not a valid triple: %v\n", err) + fmt.Printf("not a valid quad: %v\n", err) } continue } - ts.RemoveTriple(triple) + h.QuadWriter.RemoveQuad(quad) continue } } diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 08483c7..4202ffa 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -36,9 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) AddTriple(quad.Quad) {} - -func (qs *store) AddTripleSet([]quad.Quad) {} +func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } @@ -60,6 +58,8 @@ func (qs *store) NameOf(v graph.Value) string { func (qs *store) Size() int64 { return 0 } +func (qs *store) Horizon() int64 { return 0 } + func (qs *store) DebugPrint() {} func (qs *store) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go index 3c3a057..45a7404 100644 --- a/graph/leveldb/leveldb_test.go +++ b/graph/leveldb/leveldb_test.go @@ -137,7 +137,7 @@ func TestLoadDatabase(t *testing.T) { } w, _ := writer.NewSingleReplication(qs, nil) - qs.AddQuad(quad.Quad{"Something", "points_to", "Something Else", "context"}) + w.AddQuad(quad.Quad{"Something", "points_to", "Something Else", "context"}) for _, pq := range []string{"Something", "points_to", "Something Else", "context"} { if got := qs.NameOf(qs.ValueOf(pq)); got != pq { t.Errorf("Failed to roundtrip %q, got:%q expect:%q", pq, got, pq) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index bc34555..dddc19a 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -48,6 +48,11 @@ type Handle struct { QuadWriter QuadWriter } +func (h *Handle) Close() { + h.QuadStore.Close() + h.QuadWriter.Close() +} + var ErrQuadExists = errors.New("Quad exists") var ErrQuadNotExist = errors.New("Quad doesn't exist") @@ -61,6 +66,9 @@ type QuadWriter interface { // Removes a quad matching the given one from the database, // if it exists. Does nothing otherwise. RemoveQuad(quad.Quad) error + + // Cleans up replication and closes the writing aspect of the database. + Close() error } type NewQuadWriterFunc func(TripleStore, Options) (QuadWriter, error) diff --git a/http/http.go b/http/http.go index 70321dd..d5f9351 100644 --- a/http/http.go +++ b/http/http.go @@ -107,7 +107,7 @@ func (h *TemplateRequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques type Api struct { config *config.Config - ts graph.TripleStore + handle *graph.Handle } func (api *Api) ApiV1(r *httprouter.Router) { @@ -119,7 +119,7 @@ func (api *Api) ApiV1(r *httprouter.Router) { r.POST("/api/v1/delete", LogRequest(api.ServeV1Delete)) } -func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { +func SetupRoutes(handle *graph.Handle, cfg *config.Config) { r := httprouter.New() assets := findAssetsPath() if glog.V(2) { @@ -129,7 +129,7 @@ func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { templates.ParseGlob(fmt.Sprint(assets, "/templates/*.html")) root := &TemplateRequestHandler{templates: templates} docs := &DocRequestHandler{assets: assets} - api := &Api{config: cfg, ts: ts} + api := &Api{config: cfg, handle: handle} api.ApiV1(r) //m.Use(martini.Static("static", martini.StaticOptions{Prefix: "/static", SkipLogging: true})) @@ -141,8 +141,8 @@ func SetupRoutes(ts graph.TripleStore, cfg *config.Config) { http.Handle("/", r) } -func Serve(ts graph.TripleStore, cfg *config.Config) { - SetupRoutes(ts, cfg) +func Serve(handle *graph.Handle, cfg *config.Config) { + SetupRoutes(handle, cfg) glog.Infof("Cayley now listening on %s:%s\n", cfg.ListenHost, cfg.ListenPort) fmt.Printf("Cayley now listening on %s:%s\n", cfg.ListenHost, cfg.ListenPort) err := http.ListenAndServe(fmt.Sprintf("%s:%s", cfg.ListenHost, cfg.ListenPort), nil) diff --git a/http/query.go b/http/query.go index f4f5a34..9d6460a 100644 --- a/http/query.go +++ b/http/query.go @@ -71,9 +71,9 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.Timeout, false) + ses = gremlin.NewSession(api.handle.QuadStore, api.config.Timeout, false) case "mql": - ses = mql.NewSession(api.ts) + ses = mql.NewSession(api.handle.QuadStore) default: return FormatJson400(w, "Need a query language.") } @@ -110,18 +110,15 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http ses = nil return FormatJsonError(w, 500, "Incomplete data?") } - http.Error(w, "", http.StatusNotFound) - ses = nil - return http.StatusNotFound } func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params httprouter.Params) int { var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.Timeout, false) + ses = gremlin.NewSession(api.handle.QuadStore, api.config.Timeout, false) case "mql": - ses = mql.NewSession(api.ts) + ses = mql.NewSession(api.handle.QuadStore) default: return FormatJson400(w, "Need a query language.") } @@ -146,6 +143,4 @@ func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params http default: return FormatJsonError(w, 500, "Incomplete data?") } - http.Error(w, "", http.StatusNotFound) - return http.StatusNotFound } diff --git a/http/write.go b/http/write.go index 54e5537..018b339 100644 --- a/http/write.go +++ b/http/write.go @@ -55,7 +55,7 @@ func (api *Api) ServeV1Write(w http.ResponseWriter, r *http.Request, _ httproute if terr != nil { return FormatJson400(w, terr) } - api.ts.AddTripleSet(tripleList) + api.handle.QuadWriter.AddQuadSet(tripleList) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", len(tripleList)) return 200 } @@ -97,11 +97,11 @@ func (api *Api) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params block = append(block, t) n++ if len(block) == cap(block) { - api.ts.AddTripleSet(block) + api.handle.QuadWriter.AddQuadSet(block) block = block[:0] } } - api.ts.AddTripleSet(block) + api.handle.QuadWriter.AddQuadSet(block) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", n) @@ -122,7 +122,7 @@ func (api *Api) ServeV1Delete(w http.ResponseWriter, r *http.Request, params htt } count := 0 for _, triple := range tripleList { - api.ts.RemoveTriple(triple) + api.handle.QuadWriter.RemoveQuad(triple) count++ } fmt.Fprintf(w, "{\"result\": \"Successfully deleted %d triples.\"}", count) diff --git a/query/gremlin/gremlin_test.go b/query/gremlin/gremlin_test.go index a638eb7..566305a 100644 --- a/query/gremlin/gremlin_test.go +++ b/query/gremlin/gremlin_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/cayley/quad" _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -54,8 +55,9 @@ var simpleGraph = []quad.Quad{ func makeTestSession(data []quad.Quad) *Session { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, t := range data { - ts.AddTriple(t) + w.AddQuad(t) } return NewSession(ts, -1, false) } diff --git a/query/mql/mql_test.go b/query/mql/mql_test.go index 619ae54..d7e8ef4 100644 --- a/query/mql/mql_test.go +++ b/query/mql/mql_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/cayley/graph" _ "github.com/google/cayley/graph/memstore" "github.com/google/cayley/quad" + _ "github.com/google/cayley/writer" ) // This is a simple test graph. @@ -53,8 +54,9 @@ var simpleGraph = []quad.Quad{ func makeTestSession(data []quad.Quad) *Session { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, t := range data { - ts.AddTriple(t) + w.AddQuad(t) } return NewSession(ts) } diff --git a/query/sexp/parser_test.go b/query/sexp/parser_test.go index d4d16af..2026a2f 100644 --- a/query/sexp/parser_test.go +++ b/query/sexp/parser_test.go @@ -21,6 +21,7 @@ import ( "github.com/google/cayley/quad" _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/writer" ) func TestBadParse(t *testing.T) { @@ -55,13 +56,14 @@ var testQueries = []struct { func TestMemstoreBackedSexp(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) it := BuildIteratorTreeForQuery(ts, "()") if it.Type() != graph.Null { t.Errorf(`Incorrect type for empty query, got:%q expect: "null"`, it.Type()) } for _, test := range testQueries { if test.add.IsValid() { - ts.AddTriple(test.add) + w.AddQuad(test.add) } it := BuildIteratorTreeForQuery(ts, test.query) if it.Type() != test.typ { @@ -79,8 +81,9 @@ func TestMemstoreBackedSexp(t *testing.T) { func TestTreeConstraintParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) - ts.AddTriple(quad.Quad{"i", "like", "food", ""}) - ts.AddTriple(quad.Quad{"food", "is", "good", ""}) + w, _ := graph.NewQuadWriter("single", ts, nil) + w.AddQuad(quad.Quad{"i", "like", "food", ""}) + w.AddQuad(quad.Quad{"food", "is", "good", ""}) query := "(\"i\"\n" + "(:like\n" + "($a (:is :good))))" @@ -99,8 +102,9 @@ func TestTreeConstraintParse(t *testing.T) { func TestTreeConstraintTagParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) - ts.AddTriple(quad.Quad{"i", "like", "food", ""}) - ts.AddTriple(quad.Quad{"food", "is", "good", ""}) + w, _ := graph.NewQuadWriter("single", ts, nil) + w.AddQuad(quad.Quad{"i", "like", "food", ""}) + w.AddQuad(quad.Quad{"food", "is", "good", ""}) query := "(\"i\"\n" + "(:like\n" + "($a (:is :good))))" @@ -118,12 +122,13 @@ func TestTreeConstraintTagParse(t *testing.T) { func TestMultipleConstraintParse(t *testing.T) { ts, _ := graph.NewTripleStore("memstore", "", nil) + w, _ := graph.NewQuadWriter("single", ts, nil) for _, tv := range []quad.Quad{ {"i", "like", "food", ""}, {"i", "like", "beer", ""}, {"you", "like", "beer", ""}, } { - ts.AddTriple(tv) + w.AddQuad(tv) } query := `( $a diff --git a/writer/single.go b/writer/single.go index 0dc4882..794f1a8 100644 --- a/writer/single.go +++ b/writer/single.go @@ -83,3 +83,8 @@ func (s *Single) RemoveQuad(q quad.Quad) error { } return s.ts.ApplyDeltas(deltas) } + +func (s *Single) Close() error { + // Nothing to clean up locally. + return nil +} From 48711af1d9b42ec0ff480dcb6da9cdd6d0a1ef7e Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 21:05:39 -0400 Subject: [PATCH 17/18] Mongo log works (and bug fixed) --- graph/mongo/iterator.go | 6 +++--- graph/mongo/triplestore.go | 39 +++++++++++++++++++++++++++++++++++++++ writer/single.go | 5 +++-- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/graph/mongo/iterator.go b/graph/mongo/iterator.go index 8485ca9..1495eca 100644 --- a/graph/mongo/iterator.go +++ b/graph/mongo/iterator.go @@ -140,9 +140,9 @@ func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Next() bool { var result struct { - Id string "_id" - Added []int64 - Deleted []int64 + Id string "_id" + Added []int64 "Added" + Deleted []int64 "Deleted" } found := it.iter.Next(&result) if !found { diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 79fe38d..7165900 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -170,6 +170,25 @@ func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) return err } +func (qs *TripleStore) checkValid(key string) bool { + var indexEntry struct { + Added []int64 "Added" + Deleted []int64 "Deleted" + } + err := qs.db.C("quads").FindId(key).One(&indexEntry) + if err == mgo.ErrNotFound { + return false + } + if err != nil { + glog.Errorln("Other error checking valid quad: %s %v.", key, err) + return false + } + if len(indexEntry.Added) <= len(indexEntry.Deleted) { + return false + } + return true +} + func (qs *TripleStore) updateLog(d *graph.Delta) error { var action string if d.Action == graph.Add { @@ -193,6 +212,23 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) + // Pre-check the existence condition. + for _, d := range in { + key := qs.getIdForTriple(d.Quad) + switch d.Action { + case graph.Add: + if qs.checkValid(key) { + return graph.ErrQuadExists + } + case graph.Delete: + if !qs.checkValid(key) { + return graph.ErrQuadNotExist + } + } + } + if glog.V(2) { + glog.Infoln("Existence verified. Proceeding.") + } for _, d := range in { err := qs.updateLog(d) if err != nil { @@ -285,6 +321,9 @@ func (qs *TripleStore) Horizon() int64 { var log MongoLogEntry err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) if err != nil { + if err == mgo.ErrNotFound { + return 0 + } glog.Errorf("Could not get Horizon from Mongo: %v", err) } return log.LogID diff --git a/writer/single.go b/writer/single.go index 794f1a8..4a3a787 100644 --- a/writer/single.go +++ b/writer/single.go @@ -33,8 +33,9 @@ type Single struct { } func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadWriter, error) { - rep := &Single{nextID: ts.Horizon(), ts: ts} - if rep.nextID <= 0 { + horizon := ts.Horizon() + rep := &Single{nextID: horizon + 1, ts: ts} + if horizon <= 0 { rep.nextID = 1 } return rep, nil From 9ce35aeb459ebd657f0771509a4902ec18996fdb Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sun, 10 Aug 2014 21:34:22 -0400 Subject: [PATCH 18/18] add removal test --- cayley.go | 22 ++++++++++++++++++++++ cayley_test.go | 36 +++++++++++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/cayley.go b/cayley.go index 0516da5..7bc90f1 100644 --- a/cayley.go +++ b/cayley.go @@ -191,6 +191,28 @@ func main() { } func load(qw graph.QuadWriter, cfg *config.Config, path, typ string) error { + return decompressAndLoad(qw, cfg, path, typ, db.Load) +} + +func removeAll(qw graph.QuadWriter, cfg *config.Config, path, typ string) error { + return decompressAndLoad(qw, cfg, path, typ, remove) +} + +func remove(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { + for { + t, err := dec.Unmarshal() + if err != nil { + if err == io.EOF { + break + } + return err + } + qw.RemoveQuad(t) + } + return nil +} + +func decompressAndLoad(qw graph.QuadWriter, cfg *config.Config, path, typ string, loadFn func(graph.QuadWriter, *config.Config, quad.Unmarshaler) error) error { var r io.Reader if path == "" { diff --git a/cayley_test.go b/cayley_test.go index b0839e7..056f55e 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -296,8 +296,9 @@ var m2_actors = movie2.Save("name","movie2").Follow(filmToActor) ` var ( - once sync.Once - cfg = &config.Config{ + create sync.Once + deleteAndRecreate sync.Once + cfg = &config.Config{ DatabasePath: "30kmoviedata.nq.gz", DatabaseType: "memstore", ReplicationType: "single", @@ -309,7 +310,7 @@ var ( func prepare(t testing.TB) { var err error - once.Do(func() { + create.Do(func() { handle, err = db.Open(cfg) if err != nil { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) @@ -324,8 +325,37 @@ func prepare(t testing.TB) { }) } +func deletePrepare(t testing.TB) { + var err error + deleteAndRecreate.Do(func() { + prepare(t) + if !graph.IsPersistent(cfg.DatabaseType) { + err = removeAll(handle.QuadWriter, cfg, "", "cquad") + if err != nil { + t.Fatalf("Failed to remove %q: %v", cfg.DatabasePath, err) + } + err = load(handle.QuadWriter, cfg, "", "cquad") + if err != nil { + t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) + } + } + }) +} + func TestQueries(t *testing.T) { prepare(t) + checkQueries(t) +} + +func TestDeletedAndRecreatedQueries(t *testing.T) { + if testing.Short() { + t.Skip() + } + deletePrepare(t) + checkQueries(t) +} + +func checkQueries(t *testing.T) { for _, test := range benchmarkQueries { if testing.Short() && test.long { continue