Merge branch 'log_database' into b

Conflicts:
	graph/memstore/iterator.go
	graph/memstore/triplestore.go
This commit is contained in:
kortschak 2014-08-13 17:15:11 +09:30
commit 4a92ae9758
24 changed files with 863 additions and 531 deletions

View file

@ -36,9 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value {
return nil
}
func (qs *store) AddTriple(quad.Quad) {}
func (qs *store) AddTripleSet([]quad.Quad) {}
func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil }
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
@ -60,6 +58,8 @@ func (qs *store) NameOf(v graph.Value) string {
func (qs *store) Size() int64 { return 0 }
func (qs *store) Horizon() int64 { return 0 }
func (qs *store) DebugPrint() {}
func (qs *store) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {

View file

@ -16,9 +16,11 @@ package leveldb
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"github.com/barakmich/glog"
ldbit "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@ -41,7 +43,7 @@ type Iterator struct {
result graph.Value
}
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator {
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator {
vb := value.(Token)
p := make([]byte, 0, 2+qs.hasher.Size())
p = append(p, []byte(prefix)...)
@ -65,10 +67,10 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS
ok := it.iter.Seek(it.nextPrefix)
if !ok {
// FIXME(kortschak) What are the semantics here? Is this iterator usable?
// If not, we should return nil *Iterator and an error.
it.open = false
it.iter.Release()
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
return &iterator.Null{}
}
return &it
@ -106,7 +108,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
func (it *Iterator) Clone() graph.Iterator {
out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
out.tags.CopyFrom(it)
out.Tagger().CopyFrom(it)
return out
}
@ -117,6 +119,12 @@ func (it *Iterator) Close() {
}
}
func (it *Iterator) isLiveValue(val []byte) bool {
var entry IndexEntry
json.Unmarshal(val, &entry)
return len(entry.History)%2 != 0
}
func (it *Iterator) Next() bool {
if it.iter == nil {
it.result = nil
@ -132,6 +140,9 @@ func (it *Iterator) Next() bool {
return false
}
if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) {
if !it.isLiveValue(it.iter.Value()) {
return it.Next()
}
out := make([]byte, len(it.iter.Key()))
copy(out, it.iter.Key())
it.result = Token(out)
@ -173,7 +184,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return 2*qs.hasher.Size() + 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("po")) {
@ -185,7 +196,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return qs.hasher.Size() + 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("os")) {
@ -197,7 +208,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("cp")) {
@ -221,16 +232,17 @@ func (it *Iterator) Contains(v graph.Value) bool {
return false
}
offset := PositionOf(val[0:2], it.dir, it.qs)
if offset != -1 {
if bytes.HasPrefix(val[offset:], it.checkId[1:]) {
return true
}
} else {
nameForDir := it.qs.Quad(v).Get(it.dir)
hashForDir := it.qs.ValueOf(nameForDir).(Token)
if bytes.Equal(hashForDir, it.checkId) {
return true
}
if bytes.HasPrefix(val[offset:], it.checkId[1:]) {
// You may ask, why don't we check to see if it's a valid (not deleted) triple
// again?
//
// We've already done that -- in order to get the graph.Value token in the
// first place, we had to have done the check already; it came from a Next().
//
// However, if it ever starts coming from somewhere else, it'll be more
// efficient to change the interface of the graph.Value for LevelDB to a
// struct with a flag for isValid, to save another random read.
return true
}
return false
}

View file

@ -24,6 +24,7 @@ import (
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
"github.com/google/cayley/writer"
)
func makeTripleSet() []quad.Quad {
@ -135,7 +136,8 @@ func TestLoadDatabase(t *testing.T) {
t.Error("Failed to create leveldb TripleStore.")
}
qs.AddTriple(quad.Quad{"Something", "points_to", "Something Else", "context"})
w, _ := writer.NewSingleReplication(qs, nil)
w.AddQuad(quad.Quad{"Something", "points_to", "Something Else", "context"})
for _, pq := range []string{"Something", "points_to", "Something Else", "context"} {
if got := qs.NameOf(qs.ValueOf(pq)); got != pq {
t.Errorf("Failed to roundtrip %q, got:%q expect:%q", pq, got, pq)
@ -154,13 +156,14 @@ func TestLoadDatabase(t *testing.T) {
if qs == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
w, _ = writer.NewSingleReplication(qs, nil)
ts2, didConvert := qs.(*TripleStore)
if !didConvert {
t.Errorf("Could not convert from generic to LevelDB TripleStore")
}
qs.AddTripleSet(makeTripleSet())
w.AddQuadSet(makeTripleSet())
if s := qs.Size(); s != 11 {
t.Errorf("Unexpected triplestore size, got:%d expect:11", s)
}
@ -168,7 +171,7 @@ func TestLoadDatabase(t *testing.T) {
t.Errorf("Unexpected triplestore size, got:%d expect:5", s)
}
qs.RemoveTriple(quad.Quad{"A", "follows", "B", ""})
w.RemoveQuad(quad.Quad{"A", "follows", "B", ""})
if s := qs.Size(); s != 10 {
t.Errorf("Unexpected triplestore size after RemoveTriple, got:%d expect:10", s)
}
@ -196,7 +199,9 @@ func TestIterator(t *testing.T) {
if qs == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
qs.AddTripleSet(makeTripleSet())
w, _ := writer.NewSingleReplication(qs, nil)
w.AddQuadSet(makeTripleSet())
var it graph.Iterator
it = qs.NodesAllIterator()
@ -291,7 +296,8 @@ func TestSetIterator(t *testing.T) {
}
defer qs.Close()
qs.AddTripleSet(makeTripleSet())
w, _ := writer.NewSingleReplication(qs, nil)
w.AddQuadSet(makeTripleSet())
expect := []quad.Quad{
{"C", "follows", "B", ""},
@ -403,7 +409,9 @@ func TestOptimize(t *testing.T) {
if qs == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
qs.AddTripleSet(makeTripleSet())
w, _ := writer.NewSingleReplication(qs, nil)
w.AddQuadSet(makeTripleSet())
// With an linksto-fixed pair
fixed := qs.FixedIterator()

View file

@ -19,6 +19,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash"
@ -54,6 +55,7 @@ type TripleStore struct {
path string
open bool
size int64
horizon int64
hasher hash.Hash
writeopts *opt.WriteOptions
readopts *opt.ReadOptions
@ -78,6 +80,7 @@ func createNewLevelDB(path string, _ graph.Options) error {
func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) {
var qs TripleStore
var err error
qs.path = path
cache_size := DefaultCacheSize
if val, ok := options.IntKey("cache_size_mb"); ok {
@ -100,11 +103,15 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro
qs.readopts = &opt.ReadOptions{}
db, err := leveldb.OpenFile(qs.path, qs.dbOpts)
if err != nil {
panic("Error, couldn't open! " + err.Error())
glog.Errorln("Error, couldn't open! ", err)
return nil, err
}
qs.db = db
glog.Infoln(qs.GetStats())
qs.getSize()
err = qs.getMetadata()
if err != nil {
return nil, err
}
return &qs, nil
}
@ -122,24 +129,25 @@ func (qs *TripleStore) Size() int64 {
return qs.size
}
func (qs *TripleStore) createKeyFor(d [3]quad.Direction, triple quad.Quad) []byte {
key := make([]byte, 0, 2+(qs.hasher.Size()*3))
func (qs *TripleStore) Horizon() int64 {
return qs.horizon
}
func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte {
key := make([]byte, 0, 19)
key = append(key, 'd')
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
return key
}
func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte {
key := make([]byte, 0, 2+(qs.hasher.Size()*4))
// TODO(kortschak) Remove dependence on String() method.
key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
return key
}
func (qs *TripleStore) createProvKeyFor(d [3]quad.Direction, triple quad.Quad) []byte {
key := make([]byte, 0, 2+(qs.hasher.Size()*4))
// TODO(kortschak) Remove dependence on String() method.
key = append(key, []byte{quad.Label.Prefix(), d[0].Prefix()}...)
key = append(key, qs.convertStringToByteHash(triple.Get(quad.Label))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...)
return key
}
@ -150,76 +158,98 @@ func (qs *TripleStore) createValueKeyFor(s string) []byte {
return key
}
func (qs *TripleStore) AddTriple(t quad.Quad) {
batch := &leveldb.Batch{}
qs.buildWrite(batch, t)
err := qs.db.Write(batch, qs.writeopts)
if err != nil {
glog.Errorf("Couldn't write to DB for triple %s.", t)
return
}
qs.size++
type IndexEntry struct {
quad.Quad
History []int64
}
// Short hand for direction permutations.
var (
spo = [3]quad.Direction{quad.Subject, quad.Predicate, quad.Object}
osp = [3]quad.Direction{quad.Object, quad.Subject, quad.Predicate}
pos = [3]quad.Direction{quad.Predicate, quad.Object, quad.Subject}
pso = [3]quad.Direction{quad.Predicate, quad.Subject, quad.Object}
spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}
pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
)
func (qs *TripleStore) RemoveTriple(t quad.Quad) {
_, err := qs.db.Get(qs.createKeyFor(spo, t), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Error("Couldn't access DB to confirm deletion")
return
}
if err == leveldb.ErrNotFound {
// No such triple in the database, forget about it.
return
}
func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
batch := &leveldb.Batch{}
batch.Delete(qs.createKeyFor(spo, t))
batch.Delete(qs.createKeyFor(osp, t))
batch.Delete(qs.createKeyFor(pos, t))
qs.UpdateValueKeyBy(t.Get(quad.Subject), -1, batch)
qs.UpdateValueKeyBy(t.Get(quad.Predicate), -1, batch)
qs.UpdateValueKeyBy(t.Get(quad.Object), -1, batch)
if t.Get(quad.Label) != "" {
batch.Delete(qs.createProvKeyFor(pso, t))
qs.UpdateValueKeyBy(t.Get(quad.Label), -1, batch)
resizeMap := make(map[string]int64)
size_change := int64(0)
for _, d := range deltas {
bytes, err := json.Marshal(d)
if err != nil {
return err
}
batch.Put(qs.createDeltaKeyFor(d), bytes)
err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add)
if err != nil {
return err
}
delta := int64(1)
if d.Action == graph.Delete {
delta = int64(-1)
}
resizeMap[d.Quad.Subject] += delta
resizeMap[d.Quad.Predicate] += delta
resizeMap[d.Quad.Object] += delta
if d.Quad.Label != "" {
resizeMap[d.Quad.Label] += delta
}
size_change += delta
qs.horizon = d.ID
}
err = qs.db.Write(batch, nil)
for k, v := range resizeMap {
if v != 0 {
err := qs.UpdateValueKeyBy(k, v, batch)
if err != nil {
return err
}
}
}
err := qs.db.Write(batch, qs.writeopts)
if err != nil {
glog.Errorf("Couldn't delete triple %s.", t)
return
glog.Error("Couldn't write to DB for tripleset.")
return err
}
qs.size--
qs.size += size_change
return nil
}
func (qs *TripleStore) buildTripleWrite(batch *leveldb.Batch, t quad.Quad) {
bytes, err := json.Marshal(t)
if err != nil {
glog.Errorf("Couldn't write to buffer for triple %s: %s", t, err)
return
func (qs *TripleStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, isAdd bool) error {
var entry IndexEntry
data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Error("Couldn't access DB to prepare index: ", err)
return err
}
batch.Put(qs.createKeyFor(spo, t), bytes)
batch.Put(qs.createKeyFor(osp, t), bytes)
batch.Put(qs.createKeyFor(pos, t), bytes)
if t.Get(quad.Label) != "" {
batch.Put(qs.createProvKeyFor(pso, t), bytes)
if err == nil {
// We got something.
err = json.Unmarshal(data, &entry)
if err != nil {
return err
}
} else {
entry.Quad = q
}
}
entry.History = append(entry.History, id)
func (qs *TripleStore) buildWrite(batch *leveldb.Batch, t quad.Quad) {
qs.buildTripleWrite(batch, t)
qs.UpdateValueKeyBy(t.Get(quad.Subject), 1, nil)
qs.UpdateValueKeyBy(t.Get(quad.Predicate), 1, nil)
qs.UpdateValueKeyBy(t.Get(quad.Object), 1, nil)
if t.Get(quad.Label) != "" {
qs.UpdateValueKeyBy(t.Get(quad.Label), 1, nil)
if isAdd && len(entry.History)%2 == 0 {
glog.Error("Entry History is out of sync for", entry)
return errors.New("Odd index history")
}
bytes, err := json.Marshal(entry)
if err != nil {
glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err)
return err
}
batch.Put(qs.createKeyFor(spo, q), bytes)
batch.Put(qs.createKeyFor(osp, q), bytes)
batch.Put(qs.createKeyFor(pos, q), bytes)
if q.Get(quad.Label) != "" {
batch.Put(qs.createKeyFor(cps, q), bytes)
}
return nil
}
type ValueData struct {
@ -227,15 +257,15 @@ type ValueData struct {
Size int64
}
func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb.Batch) {
value := &ValueData{name, int64(amount)}
func (qs *TripleStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error {
value := &ValueData{name, amount}
key := qs.createValueKeyFor(name)
b, err := qs.db.Get(key, qs.readopts)
// Error getting the node from the database.
if err != nil && err != leveldb.ErrNotFound {
glog.Errorf("Error reading Value %s from the DB.", name)
return
return err
}
// Node exists in the database -- unmarshal and update.
@ -243,58 +273,28 @@ func (qs *TripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb.
err = json.Unmarshal(b, value)
if err != nil {
glog.Errorf("Error: couldn't reconstruct value: %v", err)
return
return err
}
value.Size += int64(amount)
value.Size += amount
}
// Are we deleting something?
if amount < 0 {
if value.Size <= 0 {
if batch == nil {
qs.db.Delete(key, qs.writeopts)
} else {
batch.Delete(key)
}
return
}
if value.Size <= 0 {
value.Size = 0
}
// Repackage and rewrite.
bytes, err := json.Marshal(&value)
if err != nil {
glog.Errorf("Couldn't write to buffer for value %s: %s", name, err)
return
return err
}
if batch == nil {
qs.db.Put(key, bytes, qs.writeopts)
} else {
batch.Put(key, bytes)
}
}
func (qs *TripleStore) AddTripleSet(t_s []quad.Quad) {
batch := &leveldb.Batch{}
newTs := len(t_s)
resizeMap := make(map[string]int)
for _, t := range t_s {
qs.buildTripleWrite(batch, t)
resizeMap[t.Subject]++
resizeMap[t.Predicate]++
resizeMap[t.Object]++
if t.Label != "" {
resizeMap[t.Label]++
}
}
for k, v := range resizeMap {
qs.UpdateValueKeyBy(k, v, batch)
}
err := qs.db.Write(batch, qs.writeopts)
if err != nil {
glog.Error("Couldn't write to DB for tripleset.")
return
}
qs.size += int64(newTs)
return nil
}
func (qs *TripleStore) Close() {
@ -308,6 +308,16 @@ func (qs *TripleStore) Close() {
} else {
glog.Errorf("Couldn't convert size before closing!")
}
buf.Reset()
err = binary.Write(buf, binary.LittleEndian, qs.horizon)
if err == nil {
werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts)
if werr != nil {
glog.Error("Couldn't write horizon before closing!")
}
} else {
glog.Errorf("Couldn't convert horizon before closing!")
}
qs.db.Close()
qs.open = false
}
@ -378,23 +388,34 @@ func (qs *TripleStore) SizeOf(k graph.Value) int64 {
return int64(qs.valueData(k.(Token)).Size)
}
func (qs *TripleStore) getSize() {
var size int64
b, err := qs.db.Get([]byte("__size"), qs.readopts)
func (qs *TripleStore) getInt64ForKey(key string, empty int64) (int64, error) {
var out int64
b, err := qs.db.Get([]byte(key), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
panic("Couldn't read size " + err.Error())
glog.Errorln("Couldn't read " + key + ": " + err.Error())
return 0, err
}
if err == leveldb.ErrNotFound {
// Must be a new database. Cool
qs.size = 0
return
return empty, nil
}
buf := bytes.NewBuffer(b)
err = binary.Read(buf, binary.LittleEndian, &size)
err = binary.Read(buf, binary.LittleEndian, &out)
if err != nil {
glog.Errorln("Error: couldn't parse size")
glog.Errorln("Error: couldn't parse", key)
return 0, err
}
qs.size = size
return out, nil
}
func (qs *TripleStore) getMetadata() error {
var err error
qs.size, err = qs.getInt64ForKey("__size", 0)
if err != nil {
return err
}
qs.horizon, err = qs.getInt64ForKey("__horizon", 0)
return err
}
func (qs *TripleStore) SizeOfPrefix(pre []byte) (int64, error) {

View file

@ -24,25 +24,46 @@ type AllIterator struct {
ts *TripleStore
}
func NewMemstoreAllIterator(ts *TripleStore) *AllIterator {
var out AllIterator
type NodesAllIterator AllIterator
type QuadsAllIterator AllIterator
func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator {
var out NodesAllIterator
out.Int64 = *iterator.NewInt64(1, ts.idCounter-1)
out.ts = ts
return &out
}
// No subiterators.
func (it *AllIterator) SubIterators() []graph.Iterator {
func (nit *NodesAllIterator) SubIterators() []graph.Iterator {
return nil
}
func (it *AllIterator) Next() bool {
if !it.Int64.Next() {
func (nit *NodesAllIterator) Next() bool {
if !nit.Int64.Next() {
return false
}
_, ok := it.ts.revIdMap[it.Int64.Result().(int64)]
_, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)]
if !ok {
return it.Next()
return nit.Next()
}
return true
}
func NewMemstoreQuadsAllIterator(ts *TripleStore) *QuadsAllIterator {
var out QuadsAllIterator
out.Int64 = *iterator.NewInt64(1, ts.quadIdCounter-1)
out.ts = ts
return &out
}
func (qit *QuadsAllIterator) Next() bool {
out := qit.Int64.Next()
if out {
i64 := qit.Int64.Result().(int64)
if qit.ts.log[i64].DeletedBy != 0 || qit.ts.log[i64].Action == graph.Delete {
return qit.Next()
}
}
return out
}

View file

@ -26,6 +26,7 @@ import (
type Iterator struct {
uid uint64
ts *TripleStore
tags graph.Tagger
tree *b.Tree
iter *b.Enumerator
@ -37,13 +38,14 @@ func cmp(a, b int64) int {
return int(a - b)
}
func NewIterator(tree *b.Tree, data string) *Iterator {
func NewIterator(tree *b.Tree, data string, ts *TripleStore) *Iterator {
iter, err := tree.SeekFirst()
if err != nil {
iter = nil
}
return &Iterator{
uid: iterator.NextUID(),
ts: ts,
tree: tree,
iter: iter,
data: data,
@ -94,6 +96,7 @@ func (it *Iterator) Clone() graph.Iterator {
m := &Iterator{
uid: iterator.NextUID(),
ts: it.ts,
tree: it.tree,
iter: iter,
data: it.data,
@ -103,11 +106,10 @@ func (it *Iterator) Clone() graph.Iterator {
return m
}
func (it *Iterator) Close() {
if it.iter != nil {
it.iter.Close()
it.iter = nil
}
func (it *Iterator) Close() {}
func (it *Iterator) checkValid(index int64) bool {
return it.ts.log[index].DeletedBy == 0
}
func (it *Iterator) Next() bool {
@ -120,8 +122,10 @@ func (it *Iterator) Next() bool {
if err != nil {
return graph.NextLogOut(it, nil, false)
}
if !it.checkValid(result) {
return it.Next()
}
it.result = result
return graph.NextLogOut(it, it.result, true)
}

View file

@ -31,12 +31,12 @@ func init() {
}, nil)
}
type TripleDirectionIndex struct {
type QuadDirectionIndex struct {
index [4]map[int64]*b.Tree
}
func NewTripleDirectionIndex() TripleDirectionIndex {
return TripleDirectionIndex{[...]map[int64]*b.Tree{
func NewQuadDirectionIndex() QuadDirectionIndex {
return QuadDirectionIndex{[...]map[int64]*b.Tree{
quad.Subject - 1: make(map[int64]*b.Tree),
quad.Predicate - 1: make(map[int64]*b.Tree),
quad.Object - 1: make(map[int64]*b.Tree),
@ -44,34 +44,39 @@ func NewTripleDirectionIndex() TripleDirectionIndex {
}}
}
func (tdi TripleDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree {
func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree {
if d < quad.Subject || d > quad.Label {
panic("illegal direction")
}
tree, ok := tdi.index[d-1][id]
tree, ok := qdi.index[d-1][id]
if !ok {
tree = b.TreeNew(cmp)
tdi.index[d-1][id] = tree
qdi.index[d-1][id] = tree
}
return tree
}
func (tdi TripleDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) {
func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) {
if d < quad.Subject || d > quad.Label {
panic("illegal direction")
}
tree, ok := tdi.index[d-1][id]
tree, ok := qdi.index[d-1][id]
return tree, ok
}
type LogEntry struct {
graph.Delta
DeletedBy int64
}
type TripleStore struct {
idCounter int64
tripleIdCounter int64
idMap map[string]int64
revIdMap map[int64]string
triples []quad.Quad
size int64
index TripleDirectionIndex
idCounter int64
quadIdCounter int64
idMap map[string]int64
revIdMap map[int64]string
log []LogEntry
size int64
index QuadDirectionIndex
// vip_index map[string]map[int64]map[string]map[int64]*b.Tree
}
@ -80,20 +85,28 @@ func newTripleStore() *TripleStore {
idMap: make(map[string]int64),
revIdMap: make(map[int64]string),
// Sentinel null triple so triple indices start at 1
triples: make([]quad.Quad, 1, 200),
// Sentinel null entry so indices start at 1
log: make([]LogEntry, 1, 200),
size: 1,
index: NewTripleDirectionIndex(),
idCounter: 1,
tripleIdCounter: 1,
index: NewQuadDirectionIndex(),
idCounter: 1,
quadIdCounter: 1,
}
}
func (ts *TripleStore) AddTripleSet(triples []quad.Quad) {
for _, t := range triples {
ts.AddTriple(t)
func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
for _, d := range deltas {
var err error
if d.Action == graph.Add {
err = ts.AddDelta(d)
} else {
err = ts.RemoveDelta(d)
}
if err != nil {
return err
}
}
return nil
}
const maxInt = int(^uint(0) >> 1)
@ -120,29 +133,29 @@ func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) {
min, tree = l, index
}
}
it := NewIterator(tree, "")
it := NewIterator(tree, "", ts)
for it.Next() {
val := it.Result()
if t == ts.triples[val.(int64)] {
if t == ts.log[val.(int64)].Quad {
return val.(int64), true
}
}
return 0, false
}
func (ts *TripleStore) AddTriple(t quad.Quad) {
if _, exists := ts.indexOf(t); exists {
return
func (ts *TripleStore) AddDelta(d *graph.Delta) error {
if _, exists := ts.indexOf(d.Quad); exists {
return graph.ErrQuadExists
}
ts.triples = append(ts.triples, t)
tid := ts.tripleIdCounter
qid := ts.quadIdCounter
ts.log = append(ts.log, LogEntry{Delta: *d})
ts.size++
ts.tripleIdCounter++
ts.quadIdCounter++
for d := quad.Subject; d <= quad.Label; d++ {
sid := t.Get(d)
if d == quad.Label && sid == "" {
for dir := quad.Subject; dir <= quad.Label; dir++ {
sid := d.Quad.Get(dir)
if dir == quad.Label && sid == "" {
continue
}
if _, ok := ts.idMap[sid]; !ok {
@ -152,85 +165,60 @@ func (ts *TripleStore) AddTriple(t quad.Quad) {
}
}
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
for dir := quad.Subject; dir <= quad.Label; dir++ {
if dir == quad.Label && d.Quad.Get(dir) == "" {
continue
}
id := ts.idMap[t.Get(d)]
tree := ts.index.Tree(d, id)
tree.Set(tid, struct{}{})
id := ts.idMap[d.Quad.Get(dir)]
tree := ts.index.Tree(dir, id)
tree.Set(qid, struct{}{})
}
// TODO(barakmich): Add VIP indexing
return nil
}
func (ts *TripleStore) RemoveTriple(t quad.Quad) {
tid, ok := ts.indexOf(t)
if !ok {
return
func (ts *TripleStore) RemoveDelta(d *graph.Delta) error {
prevQuadID, exists := ts.indexOf(d.Quad)
if !exists {
return graph.ErrQuadNotExist
}
ts.triples[tid] = quad.Quad{}
quadID := ts.quadIdCounter
ts.log = append(ts.log, LogEntry{Delta: *d})
ts.log[prevQuadID].DeletedBy = quadID
ts.size--
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
continue
}
id := ts.idMap[t.Get(d)]
tree := ts.index.Tree(d, id)
tree.Delete(tid)
}
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
continue
}
id, ok := ts.idMap[t.Get(d)]
if !ok {
continue
}
stillExists := false
for d := quad.Subject; d <= quad.Label; d++ {
if d == quad.Label && t.Get(d) == "" {
continue
}
nodeTree := ts.index.Tree(d, id)
if nodeTree.Len() != 0 {
stillExists = true
break
}
}
if !stillExists {
delete(ts.idMap, t.Get(d))
delete(ts.revIdMap, id)
}
}
ts.quadIdCounter++
return nil
}
func (ts *TripleStore) Quad(index graph.Value) quad.Quad {
return ts.triples[index.(int64)]
return ts.log[index.(int64)].Quad
}
func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator {
index, ok := ts.index.Get(d, value.(int64))
data := fmt.Sprintf("dir:%s val:%d", d, value.(int64))
if ok {
return NewIterator(index, data)
return NewIterator(index, data, ts)
}
return &iterator.Null{}
}
func (ts *TripleStore) Horizon() int64 {
return ts.log[len(ts.log)-1].ID
}
func (ts *TripleStore) Size() int64 {
return ts.size - 1 // Don't count the sentinel
return ts.size
}
func (ts *TripleStore) DebugPrint() {
for i, t := range ts.triples {
for i, l := range ts.log {
if i == 0 {
continue
}
glog.V(2).Infof("%d: %s", i, t)
glog.V(2).Infof("%d: %#v", i, l)
}
}
@ -243,7 +231,7 @@ func (ts *TripleStore) NameOf(id graph.Value) string {
}
func (ts *TripleStore) TriplesAllIterator() graph.Iterator {
return iterator.NewInt64(0, ts.Size())
return NewMemstoreQuadsAllIterator(ts)
}
func (ts *TripleStore) FixedIterator() graph.FixedIterator {
@ -256,7 +244,7 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.
}
func (ts *TripleStore) NodesAllIterator() graph.Iterator {
return NewMemstoreAllIterator(ts)
return NewMemstoreNodesAllIterator(ts)
}
func (ts *TripleStore) Close() {}

View file

@ -22,6 +22,7 @@ import (
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
"github.com/google/cayley/writer"
)
// This is a simple test graph.
@ -51,13 +52,14 @@ var simpleGraph = []quad.Quad{
{"G", "status", "cool", "status_graph"},
}
func makeTestStore(data []quad.Quad) (*TripleStore, []pair) {
func makeTestStore(data []quad.Quad) (*TripleStore, graph.QuadWriter, []pair) {
seen := make(map[string]struct{})
ts := newTripleStore()
var (
val int64
ind []pair
)
writer, _ := writer.NewSingleReplication(ts, nil)
for _, t := range data {
for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} {
if _, ok := seen[qp]; !ok && qp != "" {
@ -66,9 +68,10 @@ func makeTestStore(data []quad.Quad) (*TripleStore, []pair) {
seen[qp] = struct{}{}
}
}
ts.AddTriple(t)
writer.AddQuad(t)
}
return ts, ind
return ts, writer, ind
}
type pair struct {
@ -77,7 +80,7 @@ type pair struct {
}
func TestMemstore(t *testing.T) {
ts, index := makeTestStore(simpleGraph)
ts, _, index := makeTestStore(simpleGraph)
if size := ts.Size(); size != int64(len(simpleGraph)) {
t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph))
}
@ -95,7 +98,7 @@ func TestMemstore(t *testing.T) {
}
func TestIteratorsAndNextResultOrderA(t *testing.T) {
ts, _ := makeTestStore(simpleGraph)
ts, _, _ := makeTestStore(simpleGraph)
fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("C"))
@ -144,7 +147,7 @@ func TestIteratorsAndNextResultOrderA(t *testing.T) {
}
func TestLinksToOptimization(t *testing.T) {
ts, _ := makeTestStore(simpleGraph)
ts, _, _ := makeTestStore(simpleGraph)
fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("cool"))
@ -172,9 +175,9 @@ func TestLinksToOptimization(t *testing.T) {
}
func TestRemoveTriple(t *testing.T) {
ts, _ := makeTestStore(simpleGraph)
ts, w, _ := makeTestStore(simpleGraph)
ts.RemoveTriple(quad.Quad{"E", "follows", "F", ""})
w.RemoveQuad(quad.Quad{"E", "follows", "F", ""})
fixed := ts.FixedIterator()
fixed.Add(ts.ValueOf("E"))

View file

@ -140,10 +140,9 @@ func (it *Iterator) Clone() graph.Iterator {
func (it *Iterator) Next() bool {
var result struct {
Id string "_id"
//Sub string "Sub"
//Pred string "Pred"
//Obj string "Obj"
Id string "_id"
Added []int64 "Added"
Deleted []int64 "Deleted"
}
found := it.iter.Next(&result)
if !found {
@ -153,6 +152,9 @@ func (it *Iterator) Next() bool {
}
return false
}
if it.collection == "quads" && len(result.Added) <= len(result.Deleted) {
return it.Next()
}
it.result = result.Id
return true
}

View file

@ -18,7 +18,6 @@ import (
"crypto/sha1"
"encoding/hex"
"hash"
"io"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
@ -33,9 +32,6 @@ func init() {
graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph)
}
// Guarantee we satisfy graph.Bulkloader.
var _ graph.BulkLoader = (*TripleStore)(nil)
const DefaultDBName = "cayley"
type TripleStore struct {
@ -63,13 +59,21 @@ func createNewMongoGraph(addr string, options graph.Options) error {
Background: true,
Sparse: true,
}
db.C("triples").EnsureIndex(indexOpts)
db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Pred"}
db.C("triples").EnsureIndex(indexOpts)
db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Obj"}
db.C("triples").EnsureIndex(indexOpts)
db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Label"}
db.C("triples").EnsureIndex(indexOpts)
db.C("quads").EnsureIndex(indexOpts)
logOpts := mgo.Index{
Key: []string{"LogID"},
Unique: true,
DropDups: false,
Background: true,
Sparse: true,
}
db.C("log").EnsureIndex(logOpts)
return nil
}
@ -113,114 +117,157 @@ type MongoNode struct {
Size int "Size"
}
func (qs *TripleStore) updateNodeBy(node_name string, inc int) {
var size MongoNode
node := qs.ValueOf(node_name)
err := qs.db.C("nodes").FindId(node).One(&size)
if err != nil {
if err.Error() == "not found" {
// Not found. Okay.
size.Id = node.(string)
size.Name = node_name
size.Size = inc
} else {
glog.Errorf("Error: %v", err)
return
}
} else {
size.Id = node.(string)
size.Name = node_name
size.Size += inc
}
// Removing something...
if inc < 0 {
if size.Size <= 0 {
err := qs.db.C("nodes").RemoveId(node)
if err != nil {
glog.Errorf("Error: %v while removing node %s", err, node_name)
return
}
}
}
_, err2 := qs.db.C("nodes").UpsertId(node, size)
if err2 != nil {
glog.Errorf("Error: %v", err)
}
type MongoLogEntry struct {
LogID int64 "LogID"
Action string "Action"
Key string "Key"
Timestamp int64
}
func (qs *TripleStore) writeTriple(t quad.Quad) bool {
func (qs *TripleStore) updateNodeBy(node_name string, inc int) error {
node := qs.ValueOf(node_name)
doc := bson.M{
"_id": node.(string),
"Name": node_name,
}
upsert := bson.M{
"$setOnInsert": doc,
"$inc": bson.M{
"Size": inc,
},
}
_, err := qs.db.C("nodes").UpsertId(node, upsert)
if err != nil {
glog.Errorf("Error updating node: %v", err)
}
return err
}
func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error {
var setname string
if proc == graph.Add {
setname = "Added"
} else if proc == graph.Delete {
setname = "Deleted"
}
tripledoc := bson.M{
"_id": qs.getIdForTriple(t),
"Subject": t.Subject,
"Predicate": t.Predicate,
"Object": t.Object,
"Label": t.Label,
}
err := qs.db.C("triples").Insert(tripledoc)
upsert := bson.M{
"$setOnInsert": tripledoc,
"$push": bson.M{
setname: id,
},
}
_, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert)
if err != nil {
// Among the reasons I hate MongoDB. "Errors don't happen! Right guys?"
if err.(*mgo.LastError).Code == 11000 {
return false
}
glog.Errorf("Error: %v", err)
}
return err
}
func (qs *TripleStore) checkValid(key string) bool {
var indexEntry struct {
Added []int64 "Added"
Deleted []int64 "Deleted"
}
err := qs.db.C("quads").FindId(key).One(&indexEntry)
if err == mgo.ErrNotFound {
return false
}
if err != nil {
glog.Errorln("Other error checking valid quad: %s %v.", key, err)
return false
}
if len(indexEntry.Added) <= len(indexEntry.Deleted) {
return false
}
return true
}
func (qs *TripleStore) AddTriple(t quad.Quad) {
_ = qs.writeTriple(t)
qs.updateNodeBy(t.Subject, 1)
qs.updateNodeBy(t.Predicate, 1)
qs.updateNodeBy(t.Object, 1)
if t.Label != "" {
qs.updateNodeBy(t.Label, 1)
func (qs *TripleStore) updateLog(d *graph.Delta) error {
var action string
if d.Action == graph.Add {
action = "Add"
} else {
action = "Delete"
}
entry := MongoLogEntry{
LogID: d.ID,
Action: action,
Key: qs.getIdForTriple(d.Quad),
Timestamp: d.Timestamp.UnixNano(),
}
err := qs.db.C("log").Insert(entry)
if err != nil {
glog.Errorf("Error updating log: %v", err)
}
return err
}
func (qs *TripleStore) AddTripleSet(in []quad.Quad) {
func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
qs.session.SetSafe(nil)
ids := make(map[string]int)
for _, t := range in {
wrote := qs.writeTriple(t)
if wrote {
ids[t.Subject]++
ids[t.Object]++
ids[t.Predicate]++
if t.Label != "" {
ids[t.Label]++
// Pre-check the existence condition.
for _, d := range in {
key := qs.getIdForTriple(d.Quad)
switch d.Action {
case graph.Add:
if qs.checkValid(key) {
return graph.ErrQuadExists
}
case graph.Delete:
if !qs.checkValid(key) {
return graph.ErrQuadNotExist
}
}
}
if glog.V(2) {
glog.Infoln("Existence verified. Proceeding.")
}
for _, d := range in {
err := qs.updateLog(d)
if err != nil {
return err
}
}
for _, d := range in {
err := qs.updateTriple(d.Quad, d.ID, d.Action)
if err != nil {
return err
}
var countdelta int
if d.Action == graph.Add {
countdelta = 1
} else {
countdelta = -1
}
ids[d.Quad.Subject] += countdelta
ids[d.Quad.Object] += countdelta
ids[d.Quad.Predicate] += countdelta
if d.Quad.Label != "" {
ids[d.Quad.Label] += countdelta
}
}
for k, v := range ids {
qs.updateNodeBy(k, v)
err := qs.updateNodeBy(k, v)
if err != nil {
return err
}
}
qs.session.SetSafe(&mgo.Safe{})
}
func (qs *TripleStore) RemoveTriple(t quad.Quad) {
err := qs.db.C("triples").RemoveId(qs.getIdForTriple(t))
if err == mgo.ErrNotFound {
return
} else if err != nil {
glog.Errorf("Error: %v while removing triple %v", err, t)
return
}
qs.updateNodeBy(t.Subject, -1)
qs.updateNodeBy(t.Predicate, -1)
qs.updateNodeBy(t.Object, -1)
if t.Label != "" {
qs.updateNodeBy(t.Label, -1)
}
return nil
}
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
var bsonDoc bson.M
err := qs.db.C("triples").FindId(val.(string)).One(&bsonDoc)
err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc)
if err != nil {
glog.Errorf("Error: Couldn't retrieve triple %s %v", val, err)
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
}
return quad.Quad{
bsonDoc["Subject"].(string),
@ -231,7 +278,7 @@ func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
}
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
return NewIterator(qs, "triples", d, val)
return NewIterator(qs, "quads", d, val)
}
func (qs *TripleStore) NodesAllIterator() graph.Iterator {
@ -239,7 +286,7 @@ func (qs *TripleStore) NodesAllIterator() graph.Iterator {
}
func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
return NewAllIterator(qs, "triples")
return NewAllIterator(qs, "quads")
}
func (qs *TripleStore) ValueOf(s string) graph.Value {
@ -261,7 +308,8 @@ func (qs *TripleStore) NameOf(v graph.Value) string {
}
func (qs *TripleStore) Size() int64 {
count, err := qs.db.C("triples").Count()
// TODO(barakmich): Make size real; store it in the log, and retrieve it.
count, err := qs.db.C("quads").Count()
if err != nil {
glog.Errorf("Error: %v", err)
return 0
@ -269,6 +317,18 @@ func (qs *TripleStore) Size() int64 {
return int64(count)
}
func (qs *TripleStore) Horizon() int64 {
var log MongoLogEntry
err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log)
if err != nil {
if err == mgo.ErrNotFound {
return 0
}
glog.Errorf("Could not get Horizon from Mongo: %v", err)
}
return log.LogID
}
func compareStrings(a, b graph.Value) bool {
return a.(string) == b.(string)
}
@ -298,61 +358,4 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V
return val
}
func (qs *TripleStore) BulkLoad(dec quad.Unmarshaler) error {
if qs.Size() != 0 {
return graph.ErrCannotBulkLoad
}
qs.session.SetSafe(nil)
for {
q, err := dec.Unmarshal()
if err != nil {
if err != io.EOF {
return err
}
break
}
qs.writeTriple(q)
}
outputTo := bson.M{"replace": "nodes", "sharded": true}
glog.Infoln("Mapreducing")
job := mgo.MapReduce{
Map: `function() {
var len = this["_id"].length
var s_key = this["_id"].slice(0, len / 4)
var p_key = this["_id"].slice(len / 4, 2 * len / 4)
var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4)
var c_key = this["_id"].slice(3 * len / 4)
emit(s_key, {"_id": s_key, "Name" : this.Subject, "Size" : 1})
emit(p_key, {"_id": p_key, "Name" : this.Predicate, "Size" : 1})
emit(o_key, {"_id": o_key, "Name" : this.Object, "Size" : 1})
if (this.Label != "") {
emit(c_key, {"_id": c_key, "Name" : this.Label, "Size" : 1})
}
}
`,
Reduce: `
function(key, value_list) {
out = {"_id": key, "Name": value_list[0].Name}
count = 0
for (var i = 0; i < value_list.length; i++) {
count = count + value_list[i].Size
}
out["Size"] = count
return out
}
`,
Out: outputTo,
}
qs.db.C("triples").Find(nil).MapReduce(&job, nil)
glog.Infoln("Fixing")
qs.db.Run(bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) {
db.nodes.update({"_id": result._id}, result.value)
}) }`}, {"args", bson.D{}}}, nil)
qs.session.SetSafe(&mgo.Safe{})
return nil
}
// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it.

99
graph/quadwriter.go Normal file
View file

@ -0,0 +1,99 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graph
// Defines the interface for consistent replication of a graph instance.
//
// Separate from the backend, this dictates how individual triples get
// identified and replicated consistently across (potentially) multiple
// instances. The simplest case is to keep an append-only log of triple
// changes.
import (
"errors"
"time"
"github.com/google/cayley/quad"
)
type Procedure byte
// The different types of actions a transaction can do.
const (
Add Procedure = iota
Delete
)
type Delta struct {
ID int64
Quad quad.Quad
Action Procedure
Timestamp time.Time
}
type Handle struct {
QuadStore TripleStore
QuadWriter QuadWriter
}
func (h *Handle) Close() {
h.QuadStore.Close()
h.QuadWriter.Close()
}
var ErrQuadExists = errors.New("Quad exists")
var ErrQuadNotExist = errors.New("Quad doesn't exist")
type QuadWriter interface {
// Add a quad to the store.
AddQuad(quad.Quad) error
// Add a set of quads to the store, atomically if possible.
AddQuadSet([]quad.Quad) error
// Removes a quad matching the given one from the database,
// if it exists. Does nothing otherwise.
RemoveQuad(quad.Quad) error
// Cleans up replication and closes the writing aspect of the database.
Close() error
}
type NewQuadWriterFunc func(TripleStore, Options) (QuadWriter, error)
var writerRegistry = make(map[string]NewQuadWriterFunc)
func RegisterWriter(name string, newFunc NewQuadWriterFunc) {
if _, found := writerRegistry[name]; found {
panic("already registered TripleWriter " + name)
}
writerRegistry[name] = newFunc
}
func NewQuadWriter(name string, ts TripleStore, opts Options) (QuadWriter, error) {
newFunc, hasNew := writerRegistry[name]
if !hasNew {
return nil, errors.New("replication: name '" + name + "' is not registered")
}
return newFunc(ts, opts)
}
func WriterMethods() []string {
t := make([]string, 0, len(writerRegistry))
for n := range writerRegistry {
t = append(t, n)
}
return t
}

View file

@ -42,15 +42,9 @@ import (
type Value interface{}
type TripleStore interface {
// Add a triple to the store.
AddTriple(quad.Quad)
// Add a set of triples to the store, atomically if possible.
AddTripleSet([]quad.Quad)
// Removes a triple matching the given one from the database,
// if it exists. Does nothing otherwise.
RemoveTriple(quad.Quad)
// The only way in is through building a transaction, which
// is done by a replication strategy.
ApplyDeltas([]*Delta) error
// Given an opaque token, returns the triple for that token from the store.
Quad(Value) quad.Quad
@ -75,6 +69,9 @@ type TripleStore interface {
// Returns the number of triples currently stored.
Size() int64
// The last replicated transaction ID that this triplestore has verified.
Horizon() int64
// Creates a fixed iterator which can compare Values
FixedIterator() FixedIterator