Merge branch 'master' into boltdb

This commit is contained in:
Barak Michener 2014-08-16 03:32:46 -04:00
commit d74cd3e93e
14 changed files with 1606 additions and 245 deletions

View file

@ -1,13 +1,14 @@
language: go language: go
go: go:
- 1.2
- 1.3 - 1.3
- 1.3.1
- tip - tip
install: install:
- go get github.com/badgerodon/peg - go get github.com/badgerodon/peg
- go get github.com/barakmich/glog - go get github.com/barakmich/glog
- go get github.com/cznic/mathutil
- go get github.com/julienschmidt/httprouter - go get github.com/julienschmidt/httprouter
- go get github.com/petar/GoLLRB/llrb - go get github.com/petar/GoLLRB/llrb
- go get github.com/peterh/liner - go get github.com/peterh/liner

View file

@ -36,7 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value {
return nil return nil
} }
func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil } func (qs *store) ApplyDeltas([]graph.Delta) error { return nil }
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }

View file

@ -43,9 +43,9 @@ type Iterator struct {
result graph.Value result graph.Value
} }
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator { func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator {
vb := value.(Token) vb := value.(Token)
p := make([]byte, 0, 2+qs.hasherSize) p := make([]byte, 0, 2+hashSize)
p = append(p, []byte(prefix)...) p = append(p, []byte(prefix)...)
p = append(p, []byte(vb[1:])...) p = append(p, []byte(vb[1:])...)
@ -70,7 +70,6 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS
it.open = false it.open = false
it.iter.Release() it.iter.Release()
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
return &iterator.Null{}
} }
return &it return &it
@ -108,7 +107,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Clone() graph.Iterator {
out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs) out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
out.Tagger().CopyFrom(it) out.tags.CopyFrom(it)
return out return out
} }
@ -180,45 +179,45 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Subject: case quad.Subject:
return 2 return 2
case quad.Predicate: case quad.Predicate:
return qs.hasherSize + 2 return hashSize + 2
case quad.Object: case quad.Object:
return 2*qs.hasherSize + 2 return 2*hashSize + 2
case quad.Label: case quad.Label:
return 3*qs.hasherSize + 2 return 3*hashSize + 2
} }
} }
if bytes.Equal(prefix, []byte("po")) { if bytes.Equal(prefix, []byte("po")) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return 2*qs.hasherSize + 2 return 2*hashSize + 2
case quad.Predicate: case quad.Predicate:
return 2 return 2
case quad.Object: case quad.Object:
return qs.hasherSize + 2 return hashSize + 2
case quad.Label: case quad.Label:
return 3*qs.hasherSize + 2 return hashSize + 2
} }
} }
if bytes.Equal(prefix, []byte("os")) { if bytes.Equal(prefix, []byte("os")) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return qs.hasherSize + 2 return hashSize + 2
case quad.Predicate: case quad.Predicate:
return 2*qs.hasherSize + 2 return 2*hashSize + 2
case quad.Object: case quad.Object:
return 2 return 2
case quad.Label: case quad.Label:
return 3*qs.hasherSize + 2 return 3*hashSize + 2
} }
} }
if bytes.Equal(prefix, []byte("cp")) { if bytes.Equal(prefix, []byte("cp")) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return 2*qs.hasherSize + 2 return 2*hashSize + 2
case quad.Predicate: case quad.Predicate:
return qs.hasherSize + 2 return hashSize + 2
case quad.Object: case quad.Object:
return 3*qs.hasherSize + 2 return 3*hashSize + 2
case quad.Label: case quad.Label:
return 2 return 2
} }

View file

@ -22,6 +22,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
"sync"
"github.com/barakmich/glog" "github.com/barakmich/glog"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
@ -43,6 +44,13 @@ const (
DefaultWriteBufferSize = 20 DefaultWriteBufferSize = 20
) )
var (
hashPool = sync.Pool{
New: func() interface{} { return sha1.New() },
}
hashSize = sha1.Size
)
type Token []byte type Token []byte
func (t Token) Key() interface{} { func (t Token) Key() interface{} {
@ -56,9 +64,6 @@ type TripleStore struct {
open bool open bool
size int64 size int64
horizon int64 horizon int64
hasher hash.Hash
hasherSize int
makeHasher func() hash.Hash
writeopts *opt.WriteOptions writeopts *opt.WriteOptions
readopts *opt.ReadOptions readopts *opt.ReadOptions
} }
@ -98,8 +103,6 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro
write_buffer_mb = val write_buffer_mb = val
} }
qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB
qs.hasherSize = sha1.Size
qs.makeHasher = sha1.New
qs.writeopts = &opt.WriteOptions{ qs.writeopts = &opt.WriteOptions{
Sync: false, Sync: false,
} }
@ -136,7 +139,7 @@ func (qs *TripleStore) Horizon() int64 {
return qs.horizon return qs.horizon
} }
func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte {
key := make([]byte, 0, 19) key := make([]byte, 0, 19)
key = append(key, 'd') key = append(key, 'd')
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
@ -144,22 +147,20 @@ func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte {
} }
func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte {
hasher := qs.makeHasher() key := make([]byte, 0, 2+(hashSize*3))
key := make([]byte, 0, 2+(qs.hasherSize*3))
// TODO(kortschak) Remove dependence on String() method. // TODO(kortschak) Remove dependence on String() method.
key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...)
return key return key
} }
func (qs *TripleStore) createValueKeyFor(s string) []byte { func (qs *TripleStore) createValueKeyFor(s string) []byte {
hasher := qs.makeHasher() key := make([]byte, 0, 1+hashSize)
key := make([]byte, 0, 1+qs.hasherSize)
key = append(key, []byte("z")...) key = append(key, []byte("z")...)
key = append(key, qs.convertStringToByteHash(s, hasher)...) key = append(key, qs.convertStringToByteHash(s)...)
return key return key
} }
@ -176,7 +177,7 @@ var (
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
) )
func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
size_change := int64(0) size_change := int64(0)
@ -346,11 +347,13 @@ func (qs *TripleStore) Quad(k graph.Value) quad.Quad {
return triple return triple
} }
func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { func (qs *TripleStore) convertStringToByteHash(s string) []byte {
hasher.Reset() h := hashPool.Get().(hash.Hash)
key := make([]byte, 0, qs.hasherSize) h.Reset()
hasher.Write([]byte(s)) defer hashPool.Put(h)
key = hasher.Sum(key) key := make([]byte, 0, hashSize)
h.Write([]byte(s))
key = h.Sum(key)
return key return key
} }
@ -467,7 +470,7 @@ func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.
v := val.(Token) v := val.(Token)
offset := PositionOf(v[0:2], d, qs) offset := PositionOf(v[0:2], d, qs)
if offset != -1 { if offset != -1 {
return Token(append([]byte("z"), v[offset:offset+qs.hasherSize]...)) return Token(append([]byte("z"), v[offset:offset+hashSize]...))
} else { } else {
return Token(qs.Quad(val).Get(d)) return Token(qs.Quad(val).Get(d))
} }

View file

@ -35,17 +35,17 @@ func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator {
} }
// No subiterators. // No subiterators.
func (nit *NodesAllIterator) SubIterators() []graph.Iterator { func (it *NodesAllIterator) SubIterators() []graph.Iterator {
return nil return nil
} }
func (nit *NodesAllIterator) Next() bool { func (it *NodesAllIterator) Next() bool {
if !nit.Int64.Next() { if !it.Int64.Next() {
return false return false
} }
_, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)] _, ok := it.ts.revIdMap[it.Int64.Result().(int64)]
if !ok { if !ok {
return nit.Next() return it.Next()
} }
return true return true
} }

972
graph/memstore/b/keys.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,396 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package b
import (
"math"
"runtime/debug"
"testing"
"github.com/cznic/mathutil"
)
// rng builds the mathutil.FC32 generator used by the randomized benchmarks.
// The generator is constructed over the range [math.MinInt32/4,
// math.MaxInt32/4] with the third NewFC32 argument set to false. A
// construction error is treated as fatal and raised as a panic.
func rng() *mathutil.FC32 {
	const (
		lo = math.MinInt32 / 4
		hi = math.MaxInt32 / 4
	)
	gen, err := mathutil.NewFC32(lo, hi, false)
	if err != nil {
		panic(err)
	}
	return gen
}
// cmp is the key comparator handed to TreeNew. It returns a negative value
// when a < b, zero when a == b and a positive value when a > b.
//
// The result is computed with explicit comparisons rather than int(a - b):
// the subtraction can overflow int64 when the keys are far apart, and the
// conversion to int truncates on platforms where int is 32 bits wide, either
// of which yields a result with the wrong sign (or a spurious zero).
func cmp(a, b int64) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}
// BenchmarkSetSeq1e3 runs benchmarkSetSeq with n = 1e3.
func BenchmarkSetSeq1e3(b *testing.B) {
benchmarkSetSeq(b, 1e3)
}
// BenchmarkSetSeq1e4 runs benchmarkSetSeq with n = 1e4.
func BenchmarkSetSeq1e4(b *testing.B) {
benchmarkSetSeq(b, 1e4)
}
// BenchmarkSetSeq1e5 runs benchmarkSetSeq with n = 1e5.
func BenchmarkSetSeq1e5(b *testing.B) {
benchmarkSetSeq(b, 1e5)
}
// BenchmarkSetSeq1e6 runs benchmarkSetSeq with n = 1e6.
func BenchmarkSetSeq1e6(b *testing.B) {
benchmarkSetSeq(b, 1e6)
}
// benchmarkSetSeq times inserting the keys 0..n-1, in ascending order, into
// a freshly created tree. Tree construction, the debug.FreeOSMemory call and
// Close all happen while the timer is stopped, so only the Set calls are
// measured.
func benchmarkSetSeq(b *testing.B, n int) {
	limit := int64(n)
	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		b.StopTimer()
		tree := TreeNew(cmp)
		debug.FreeOSMemory()
		b.StartTimer()

		for key := int64(0); key < limit; key++ {
			tree.Set(key, struct{}{})
		}

		b.StopTimer()
		tree.Close()
	}
	b.StopTimer()
}
// BenchmarkGetSeq1e3 runs benchmarkGetSeq with n = 1e3.
func BenchmarkGetSeq1e3(b *testing.B) {
benchmarkGetSeq(b, 1e3)
}
// BenchmarkGetSeq1e4 runs benchmarkGetSeq with n = 1e4.
func BenchmarkGetSeq1e4(b *testing.B) {
benchmarkGetSeq(b, 1e4)
}
// BenchmarkGetSeq1e5 runs benchmarkGetSeq with n = 1e5.
func BenchmarkGetSeq1e5(b *testing.B) {
benchmarkGetSeq(b, 1e5)
}
// BenchmarkGetSeq1e6 runs benchmarkGetSeq with n = 1e6.
func BenchmarkGetSeq1e6(b *testing.B) {
benchmarkGetSeq(b, 1e6)
}
// benchmarkGetSeq times looking up the keys 0..n-1, in ascending order, in a
// tree that already holds exactly those keys. The tree is populated before
// the timer is reset and closed after it is stopped, so each timed iteration
// consists of n Get calls.
func benchmarkGetSeq(b *testing.B, n int) {
	limit := int64(n)
	tree := TreeNew(cmp)
	for key := int64(0); key < limit; key++ {
		tree.Set(key, struct{}{})
	}
	debug.FreeOSMemory()

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		for key := int64(0); key < limit; key++ {
			tree.Get(key)
		}
	}
	b.StopTimer()
	tree.Close()
}
// BenchmarkSetRnd1e3 runs benchmarkSetRnd with n = 1e3.
func BenchmarkSetRnd1e3(b *testing.B) {
benchmarkSetRnd(b, 1e3)
}
// BenchmarkSetRnd1e4 runs benchmarkSetRnd with n = 1e4.
func BenchmarkSetRnd1e4(b *testing.B) {
benchmarkSetRnd(b, 1e4)
}
// BenchmarkSetRnd1e5 runs benchmarkSetRnd with n = 1e5.
func BenchmarkSetRnd1e5(b *testing.B) {
benchmarkSetRnd(b, 1e5)
}
// BenchmarkSetRnd1e6 runs benchmarkSetRnd with n = 1e6.
func BenchmarkSetRnd1e6(b *testing.B) {
benchmarkSetRnd(b, 1e6)
}
// benchmarkSetRnd times inserting n keys drawn from rng() into a freshly
// created tree. The key slice is generated once, before the timer is reset,
// and the same keys are replayed in the same order on every iteration. Tree
// construction, debug.FreeOSMemory and Close run with the timer stopped.
func benchmarkSetRnd(b *testing.B, n int) {
	gen := rng()
	keys := make([]int, n)
	for i := 0; i < len(keys); i++ {
		keys[i] = gen.Next()
	}

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		b.StopTimer()
		tree := TreeNew(cmp)
		debug.FreeOSMemory()
		b.StartTimer()

		for _, key := range keys {
			tree.Set(int64(key), struct{}{})
		}

		b.StopTimer()
		tree.Close()
	}
	b.StopTimer()
}
// BenchmarkGetRnd1e3 runs benchmarkGetRnd with n = 1e3.
func BenchmarkGetRnd1e3(b *testing.B) {
benchmarkGetRnd(b, 1e3)
}
// BenchmarkGetRnd1e4 runs benchmarkGetRnd with n = 1e4.
func BenchmarkGetRnd1e4(b *testing.B) {
benchmarkGetRnd(b, 1e4)
}
// BenchmarkGetRnd1e5 runs benchmarkGetRnd with n = 1e5.
func BenchmarkGetRnd1e5(b *testing.B) {
benchmarkGetRnd(b, 1e5)
}
// BenchmarkGetRnd1e6 runs benchmarkGetRnd with n = 1e6.
func BenchmarkGetRnd1e6(b *testing.B) {
benchmarkGetRnd(b, 1e6)
}
// benchmarkGetRnd times looking up n keys drawn from rng() in a tree that
// was populated with those same keys. Key generation and tree population
// happen before the timer is reset; each timed iteration performs one Get
// per key, in generation order. The tree is closed after the timer stops.
func benchmarkGetRnd(b *testing.B, n int) {
	tree := TreeNew(cmp)
	gen := rng()
	keys := make([]int64, n)
	for i := range keys {
		keys[i] = int64(gen.Next())
	}
	for i := 0; i < len(keys); i++ {
		tree.Set(keys[i], struct{}{})
	}
	debug.FreeOSMemory()

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		for _, key := range keys {
			tree.Get(key)
		}
	}
	b.StopTimer()
	tree.Close()
}
// BenchmarkDelSeq1e3 runs benchmarkDelSeq with n = 1e3.
func BenchmarkDelSeq1e3(b *testing.B) {
benchmarkDelSeq(b, 1e3)
}
// BenchmarkDelSeq1e4 runs benchmarkDelSeq with n = 1e4.
func BenchmarkDelSeq1e4(b *testing.B) {
benchmarkDelSeq(b, 1e4)
}
// BenchmarkDelSeq1e5 runs benchmarkDelSeq with n = 1e5.
func BenchmarkDelSeq1e5(b *testing.B) {
benchmarkDelSeq(b, 1e5)
}
// BenchmarkDelSeq1e6 runs benchmarkDelSeq with n = 1e6.
func BenchmarkDelSeq1e6(b *testing.B) {
benchmarkDelSeq(b, 1e6)
}
// benchmarkDelSeq times deleting the keys 0..n-1, in ascending order, from a
// tree that was just populated with exactly those keys. Building the tree
// and the debug.FreeOSMemory call run with the timer stopped, so only the
// Delete calls are measured.
func benchmarkDelSeq(b *testing.B, n int) {
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		r := TreeNew(cmp)
		for j := int64(0); j < int64(n); j++ {
			r.Set(j, struct{}{})
		}
		debug.FreeOSMemory()
		b.StartTimer()
		for j := int64(0); j < int64(n); j++ {
			r.Delete(j)
		}
		// Stop the clock and release the tree at the end of every
		// iteration, exactly as benchmarkDelRnd does; previously the tree
		// built for each iteration was never closed.
		b.StopTimer()
		r.Close()
	}
	b.StopTimer()
}
// BenchmarkDelRnd1e3 runs benchmarkDelRnd with n = 1e3.
func BenchmarkDelRnd1e3(b *testing.B) {
benchmarkDelRnd(b, 1e3)
}
// BenchmarkDelRnd1e4 runs benchmarkDelRnd with n = 1e4.
func BenchmarkDelRnd1e4(b *testing.B) {
benchmarkDelRnd(b, 1e4)
}
// BenchmarkDelRnd1e5 runs benchmarkDelRnd with n = 1e5.
func BenchmarkDelRnd1e5(b *testing.B) {
benchmarkDelRnd(b, 1e5)
}
// BenchmarkDelRnd1e6 runs benchmarkDelRnd with n = 1e6.
func BenchmarkDelRnd1e6(b *testing.B) {
benchmarkDelRnd(b, 1e6)
}
// benchmarkDelRnd times deleting n keys drawn from rng() from a tree that
// was just populated with those keys. The keys are generated once up front;
// on each iteration the tree is rebuilt and debug.FreeOSMemory is called
// with the timer stopped, the keys are deleted in generation order with the
// timer running, and the tree is closed with the timer stopped again.
func benchmarkDelRnd(b *testing.B, n int) {
	gen := rng()
	keys := make([]int64, n)
	for i := 0; i < len(keys); i++ {
		keys[i] = int64(gen.Next())
	}

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		b.StopTimer()
		tree := TreeNew(cmp)
		for _, key := range keys {
			tree.Set(key, struct{}{})
		}
		debug.FreeOSMemory()
		b.StartTimer()

		for _, key := range keys {
			tree.Delete(key)
		}

		b.StopTimer()
		tree.Close()
	}
	b.StopTimer()
}
// BenchmarkSeekSeq1e3 runs benchmarkSeekSeq with n = 1e3.
func BenchmarkSeekSeq1e3(b *testing.B) {
benchmarkSeekSeq(b, 1e3)
}
// BenchmarkSeekSeq1e4 runs benchmarkSeekSeq with n = 1e4.
func BenchmarkSeekSeq1e4(b *testing.B) {
benchmarkSeekSeq(b, 1e4)
}
// BenchmarkSeekSeq1e5 runs benchmarkSeekSeq with n = 1e5.
func BenchmarkSeekSeq1e5(b *testing.B) {
benchmarkSeekSeq(b, 1e5)
}
// BenchmarkSeekSeq1e6 runs benchmarkSeekSeq with n = 1e6.
func BenchmarkSeekSeq1e6(b *testing.B) {
benchmarkSeekSeq(b, 1e6)
}
// benchmarkSeekSeq times seeking to each of the keys 0..n-1, in ascending
// order, in a tree holding exactly those keys. Every enumerator returned by
// Seek is closed immediately, inside the timed region. The tree is rebuilt
// for every iteration with the timer stopped and closed afterwards, also
// with the timer stopped.
func benchmarkSeekSeq(b *testing.B, n int) {
	limit := int64(n)
	for run := 0; run < b.N; run++ {
		b.StopTimer()
		tree := TreeNew(cmp)
		for key := int64(0); key < limit; key++ {
			tree.Set(key, struct{}{})
		}
		debug.FreeOSMemory()
		b.StartTimer()

		for key := int64(0); key < limit; key++ {
			cursor, _ := tree.Seek(key)
			cursor.Close()
		}

		b.StopTimer()
		tree.Close()
	}
	b.StopTimer()
}
// BenchmarkSeekRnd1e3 runs benchmarkSeekRnd with n = 1e3.
func BenchmarkSeekRnd1e3(b *testing.B) {
benchmarkSeekRnd(b, 1e3)
}
// BenchmarkSeekRnd1e4 runs benchmarkSeekRnd with n = 1e4.
func BenchmarkSeekRnd1e4(b *testing.B) {
benchmarkSeekRnd(b, 1e4)
}
// BenchmarkSeekRnd1e5 runs benchmarkSeekRnd with n = 1e5.
func BenchmarkSeekRnd1e5(b *testing.B) {
benchmarkSeekRnd(b, 1e5)
}
// BenchmarkSeekRnd1e6 runs benchmarkSeekRnd with n = 1e6.
func BenchmarkSeekRnd1e6(b *testing.B) {
benchmarkSeekRnd(b, 1e6)
}
// benchmarkSeekRnd times seeking to n keys drawn from rng() in a tree that
// was populated with those same keys. The tree is built once, before the
// timer is reset; each timed iteration seeks to every key in generation
// order and closes the returned enumerator straight away. The tree is closed
// after the timer stops.
func benchmarkSeekRnd(b *testing.B, n int) {
	tree := TreeNew(cmp)
	gen := rng()
	keys := make([]int64, n)
	for i := 0; i < len(keys); i++ {
		keys[i] = int64(gen.Next())
	}
	for i := range keys {
		tree.Set(keys[i], struct{}{})
	}
	debug.FreeOSMemory()

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		for _, key := range keys {
			cursor, _ := tree.Seek(key)
			cursor.Close()
		}
	}
	b.StopTimer()
	tree.Close()
}
// BenchmarkNext1e3 runs benchmarkNext with n = 1e3.
func BenchmarkNext1e3(b *testing.B) {
benchmarkNext(b, 1e3)
}
// BenchmarkNext1e4 runs benchmarkNext with n = 1e4.
func BenchmarkNext1e4(b *testing.B) {
benchmarkNext(b, 1e4)
}
// BenchmarkNext1e5 runs benchmarkNext with n = 1e5.
func BenchmarkNext1e5(b *testing.B) {
benchmarkNext(b, 1e5)
}
// BenchmarkNext1e6 runs benchmarkNext with n = 1e6.
func BenchmarkNext1e6(b *testing.B) {
benchmarkNext(b, 1e6)
}
// benchmarkNext times a full forward traversal of a tree holding the keys
// 0..n-1. Each timed iteration obtains an enumerator with SeekFirst and
// calls Next until it reports an error, counting the successful steps; the
// benchmark fails if SeekFirst errors or if the count differs from n. The
// tree is populated before the timer is reset and closed after it stops.
func benchmarkNext(b *testing.B, n int) {
	tree := TreeNew(cmp)
	for key, limit := int64(0), int64(n); key < limit; key++ {
		tree.Set(key, struct{}{})
	}
	debug.FreeOSMemory()

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		cursor, err := tree.SeekFirst()
		if err != nil {
			b.Fatal(err)
		}

		visited := 0
		for _, _, err = cursor.Next(); err == nil; _, _, err = cursor.Next() {
			visited++
		}
		if visited != n {
			b.Fatal(visited)
		}
	}
	b.StopTimer()
	tree.Close()
}
// BenchmarkPrev1e3 runs benchmarkPrev with n = 1e3.
func BenchmarkPrev1e3(b *testing.B) {
benchmarkPrev(b, 1e3)
}
// BenchmarkPrev1e4 runs benchmarkPrev with n = 1e4.
func BenchmarkPrev1e4(b *testing.B) {
benchmarkPrev(b, 1e4)
}
// BenchmarkPrev1e5 runs benchmarkPrev with n = 1e5.
func BenchmarkPrev1e5(b *testing.B) {
benchmarkPrev(b, 1e5)
}
// BenchmarkPrev1e6 runs benchmarkPrev with n = 1e6.
func BenchmarkPrev1e6(b *testing.B) {
benchmarkPrev(b, 1e6)
}
// benchmarkPrev times a full backward traversal of a tree holding the keys
// 0..n-1. Each timed iteration obtains an enumerator with SeekLast and calls
// Prev until it reports an error, counting the successful steps; the
// benchmark fails if SeekLast errors or if the count differs from n. The
// tree is populated before the timer is reset.
func benchmarkPrev(b *testing.B, n int) {
	t := TreeNew(cmp)
	for i := int64(0); i < int64(n); i++ {
		t.Set(i, struct{}{})
	}
	debug.FreeOSMemory()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		en, err := t.SeekLast()
		if err != nil {
			b.Fatal(err)
		}
		m := 0
		for {
			if _, _, err = en.Prev(); err != nil {
				break
			}
			m++
		}
		if m != n {
			b.Fatal(m)
		}
	}
	// Stop the clock and release the tree, matching benchmarkNext; the
	// original returned without closing the tree it had built.
	b.StopTimer()
	t.Close()
}

View file

@ -19,48 +19,35 @@ import (
"math" "math"
"strings" "strings"
"github.com/petar/GoLLRB/llrb"
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
"github.com/google/cayley/graph/memstore/b"
) )
type Iterator struct { type Iterator struct {
uid uint64 uid uint64
ts *TripleStore ts *TripleStore
tags graph.Tagger tags graph.Tagger
tree *llrb.LLRB tree *b.Tree
iter *b.Enumerator
data string data string
isRunning bool
iterLast Int64
result graph.Value result graph.Value
} }
type Int64 int64 func cmp(a, b int64) int {
return int(a - b)
func (i Int64) Less(than llrb.Item) bool {
return i < than.(Int64)
} }
func IterateOne(tree *llrb.LLRB, last Int64) Int64 { func NewIterator(tree *b.Tree, data string, ts *TripleStore) *Iterator {
var next Int64 iter, err := tree.SeekFirst()
tree.AscendGreaterOrEqual(last, func(i llrb.Item) bool { if err != nil {
if i.(Int64) == last { iter = nil
return true
} else {
next = i.(Int64)
return false
} }
})
return next
}
func NewLlrbIterator(tree *llrb.LLRB, data string, ts *TripleStore) *Iterator {
return &Iterator{ return &Iterator{
uid: iterator.NextUID(), uid: iterator.NextUID(),
ts: ts, ts: ts,
tree: tree, tree: tree,
iterLast: Int64(-1), iter: iter,
data: data, data: data,
} }
} }
@ -70,7 +57,11 @@ func (it *Iterator) UID() uint64 {
} }
func (it *Iterator) Reset() { func (it *Iterator) Reset() {
it.iterLast = Int64(-1) var err error
it.iter, err = it.tree.SeekFirst()
if err != nil {
it.iter = nil
}
} }
func (it *Iterator) Tagger() *graph.Tagger { func (it *Iterator) Tagger() *graph.Tagger {
@ -88,8 +79,30 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
} }
func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Clone() graph.Iterator {
m := NewLlrbIterator(it.tree, it.data, it.ts) var iter *b.Enumerator
if it.result != nil {
var ok bool
iter, ok = it.tree.Seek(it.result.(int64))
if !ok {
panic("value unexpectedly missing")
}
} else {
var err error
iter, err = it.tree.SeekFirst()
if err != nil {
iter = nil
}
}
m := &Iterator{
uid: iterator.NextUID(),
ts: it.ts,
tree: it.tree,
iter: iter,
data: it.data,
}
m.tags.CopyFrom(it) m.tags.CopyFrom(it)
return m return m
} }
@ -101,14 +114,18 @@ func (it *Iterator) checkValid(index int64) bool {
func (it *Iterator) Next() bool { func (it *Iterator) Next() bool {
graph.NextLogIn(it) graph.NextLogIn(it)
if it.tree.Max() == nil || it.iterLast == it.tree.Max().(Int64) {
if it.iter == nil {
return graph.NextLogOut(it, nil, false) return graph.NextLogOut(it, nil, false)
} }
it.iterLast = IterateOne(it.tree, it.iterLast) result, _, err := it.iter.Next()
if !it.checkValid(int64(it.iterLast)) { if err != nil {
return graph.NextLogOut(it, nil, false)
}
if !it.checkValid(result) {
return it.Next() return it.Next()
} }
it.result = int64(it.iterLast) it.result = result
return graph.NextLogOut(it, it.result, true) return graph.NextLogOut(it, it.result, true)
} }
@ -135,7 +152,7 @@ func (it *Iterator) Size() (int64, bool) {
func (it *Iterator) Contains(v graph.Value) bool { func (it *Iterator) Contains(v graph.Value) bool {
graph.ContainsLogIn(it, v) graph.ContainsLogIn(it, v)
if it.tree.Has(Int64(v.(int64))) && it.checkValid(v.(int64)) { if _, ok := it.tree.Get(v.(int64)); ok {
it.result = v it.result = v
return graph.ContainsLogOut(it, v, true) return graph.ContainsLogOut(it, v, true)
} }
@ -150,7 +167,7 @@ func (it *Iterator) DebugString(indent int) string {
var memType graph.Type var memType graph.Type
func init() { func init() {
memType = graph.RegisterIterator("llrb") memType = graph.RegisterIterator("b+tree")
} }
func Type() graph.Type { return memType } func Type() graph.Type { return memType }

View file

@ -18,11 +18,11 @@ import (
"fmt" "fmt"
"github.com/barakmich/glog" "github.com/barakmich/glog"
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
"github.com/google/cayley/graph/memstore/b"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
"github.com/petar/GoLLRB/llrb"
) )
func init() { func init() {
@ -32,47 +32,36 @@ func init() {
} }
type QuadDirectionIndex struct { type QuadDirectionIndex struct {
subject map[int64]*llrb.LLRB index [4]map[int64]*b.Tree
predicate map[int64]*llrb.LLRB
object map[int64]*llrb.LLRB
label map[int64]*llrb.LLRB
} }
func NewQuadDirectionIndex() *QuadDirectionIndex { func NewQuadDirectionIndex() QuadDirectionIndex {
var qdi QuadDirectionIndex return QuadDirectionIndex{[...]map[int64]*b.Tree{
qdi.subject = make(map[int64]*llrb.LLRB) quad.Subject - 1: make(map[int64]*b.Tree),
qdi.predicate = make(map[int64]*llrb.LLRB) quad.Predicate - 1: make(map[int64]*b.Tree),
qdi.object = make(map[int64]*llrb.LLRB) quad.Object - 1: make(map[int64]*b.Tree),
qdi.label = make(map[int64]*llrb.LLRB) quad.Label - 1: make(map[int64]*b.Tree),
return &qdi }}
} }
func (qdi *QuadDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree {
switch d { if d < quad.Subject || d > quad.Label {
case quad.Subject:
return qdi.subject
case quad.Object:
return qdi.object
case quad.Predicate:
return qdi.predicate
case quad.Label:
return qdi.label
}
panic("illegal direction") panic("illegal direction")
}
func (qdi *QuadDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB {
directionIndex := qdi.GetForDir(d)
if _, ok := directionIndex[id]; !ok {
directionIndex[id] = llrb.New()
} }
return directionIndex[id] tree, ok := qdi.index[d-1][id]
if !ok {
tree = b.TreeNew(cmp)
qdi.index[d-1][id] = tree
}
return tree
} }
func (qdi *QuadDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) {
directionIndex := qdi.GetForDir(d) if d < quad.Subject || d > quad.Label {
tree, exists := directionIndex[id] panic("illegal direction")
return tree, exists }
tree, ok := qdi.index[d-1][id]
return tree, ok
} }
type LogEntry struct { type LogEntry struct {
@ -88,24 +77,24 @@ type TripleStore struct {
log []LogEntry log []LogEntry
size int64 size int64
index QuadDirectionIndex index QuadDirectionIndex
// vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree // vip_index map[string]map[int64]map[string]map[int64]*b.Tree
} }
func newTripleStore() *TripleStore { func newTripleStore() *TripleStore {
var ts TripleStore return &TripleStore{
ts.idMap = make(map[string]int64) idMap: make(map[string]int64),
ts.revIdMap = make(map[int64]string) revIdMap: make(map[int64]string),
ts.log = make([]LogEntry, 1, 200)
// Sentinel null entry so indices start at 1 // Sentinel null entry so indices start at 1
ts.log[0] = LogEntry{} log: make([]LogEntry, 1, 200),
ts.index = *NewQuadDirectionIndex()
ts.idCounter = 1 index: NewQuadDirectionIndex(),
ts.quadIdCounter = 1 idCounter: 1,
return &ts quadIdCounter: 1,
}
} }
func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
for _, d := range deltas { for _, d := range deltas {
var err error var err error
if d.Action == graph.Add { if d.Action == graph.Add {
@ -120,47 +109,47 @@ func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
return nil return nil
} }
func (ts *TripleStore) quadExists(t quad.Quad) (bool, int64) { const maxInt = int(^uint(0) >> 1)
smallest := -1
var smallest_tree *llrb.LLRB func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) {
min := maxInt
var tree *b.Tree
for d := quad.Subject; d <= quad.Label; d++ { for d := quad.Subject; d <= quad.Label; d++ {
sid := t.Get(d) sid := t.Get(d)
if d == quad.Label && sid == "" { if d == quad.Label && sid == "" {
continue continue
} }
id, ok := ts.idMap[sid] id, ok := ts.idMap[sid]
// If we've never heard about a node, it most not exist // If we've never heard about a node, it must not exist
if !ok { if !ok {
return false, 0 return 0, false
} }
index, exists := ts.index.Get(d, id) index, ok := ts.index.Get(d, id)
if !exists { if !ok {
// If it's never been indexed in this direction, it can't exist. // If it's never been indexed in this direction, it can't exist.
return false, 0 return 0, false
} }
if smallest == -1 || index.Len() < smallest { if l := index.Len(); l < min {
smallest = index.Len() min, tree = l, index
smallest_tree = index
} }
} }
it := NewLlrbIterator(smallest_tree, "", ts) it := NewIterator(tree, "", ts)
for it.Next() { for it.Next() {
val := it.Result() val := it.Result()
if t == ts.log[val.(int64)].Quad { if t == ts.log[val.(int64)].Quad {
return true, val.(int64) return val.(int64), true
} }
} }
return false, 0 return 0, false
} }
func (ts *TripleStore) AddDelta(d *graph.Delta) error { func (ts *TripleStore) AddDelta(d graph.Delta) error {
if exists, _ := ts.quadExists(d.Quad); exists { if _, exists := ts.indexOf(d.Quad); exists {
return graph.ErrQuadExists return graph.ErrQuadExists
} }
var quadID int64 qid := ts.quadIdCounter
quadID = ts.quadIdCounter ts.log = append(ts.log, LogEntry{Delta: d})
ts.log = append(ts.log, LogEntry{Delta: *d})
ts.size++ ts.size++
ts.quadIdCounter++ ts.quadIdCounter++
@ -181,25 +170,22 @@ func (ts *TripleStore) AddDelta(d *graph.Delta) error {
continue continue
} }
id := ts.idMap[d.Quad.Get(dir)] id := ts.idMap[d.Quad.Get(dir)]
tree := ts.index.GetOrCreate(dir, id) tree := ts.index.Tree(dir, id)
tree.ReplaceOrInsert(Int64(quadID)) tree.Set(qid, struct{}{})
} }
// TODO(barakmich): Add VIP indexing // TODO(barakmich): Add VIP indexing
return nil return nil
} }
func (ts *TripleStore) RemoveDelta(d *graph.Delta) error { func (ts *TripleStore) RemoveDelta(d graph.Delta) error {
var prevQuadID int64 prevQuadID, exists := ts.indexOf(d.Quad)
var exists bool if !exists {
prevQuadID = 0
if exists, prevQuadID = ts.quadExists(d.Quad); !exists {
return graph.ErrQuadNotExist return graph.ErrQuadNotExist
} }
var quadID int64 quadID := ts.quadIdCounter
quadID = ts.quadIdCounter ts.log = append(ts.log, LogEntry{Delta: d})
ts.log = append(ts.log, LogEntry{Delta: *d})
ts.log[prevQuadID].DeletedBy = quadID ts.log[prevQuadID].DeletedBy = quadID
ts.size-- ts.size--
ts.quadIdCounter++ ts.quadIdCounter++
@ -214,7 +200,7 @@ func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph
index, ok := ts.index.Get(d, value.(int64)) index, ok := ts.index.Get(d, value.(int64))
data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) data := fmt.Sprintf("dir:%s val:%d", d, value.(int64))
if ok { if ok {
return NewLlrbIterator(index, data, ts) return NewIterator(index, data, ts)
} }
return &iterator.Null{} return &iterator.Null{}
} }
@ -260,4 +246,5 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.
func (ts *TripleStore) NodesAllIterator() graph.Iterator { func (ts *TripleStore) NodesAllIterator() graph.Iterator {
return NewMemstoreNodesAllIterator(ts) return NewMemstoreNodesAllIterator(ts)
} }
func (ts *TripleStore) Close() {} func (ts *TripleStore) Close() {}

View file

@ -45,17 +45,7 @@ type Iterator struct {
func NewIterator(qs *TripleStore, collection string, d quad.Direction, val graph.Value) *Iterator { func NewIterator(qs *TripleStore, collection string, d quad.Direction, val graph.Value) *Iterator {
name := qs.NameOf(val) name := qs.NameOf(val)
var constraint bson.M constraint := bson.M{d.String(): name}
switch d {
case quad.Subject:
constraint = bson.M{"Subject": name}
case quad.Predicate:
constraint = bson.M{"Predicate": name}
case quad.Object:
constraint = bson.M{"Object": name}
case quad.Label:
constraint = bson.M{"Label": name}
}
size, err := qs.db.C(collection).Find(constraint).Count() size, err := qs.db.C(collection).Find(constraint).Count()
if err != nil { if err != nil {
@ -187,13 +177,13 @@ func (it *Iterator) Contains(v graph.Value) bool {
case quad.Subject: case quad.Subject:
offset = 0 offset = 0
case quad.Predicate: case quad.Predicate:
offset = (it.qs.hasherSize * 2) offset = (hashSize * 2)
case quad.Object: case quad.Object:
offset = (it.qs.hasherSize * 2) * 2 offset = (hashSize * 2) * 2
case quad.Label: case quad.Label:
offset = (it.qs.hasherSize * 2) * 3 offset = (hashSize * 2) * 3
} }
val := v.(string)[offset : it.qs.hasherSize*2+offset] val := v.(string)[offset : hashSize*2+offset]
if val == it.hash { if val == it.hash {
it.result = v it.result = v
return graph.ContainsLogOut(it, v, true) return graph.ContainsLogOut(it, v, true)

View file

@ -18,6 +18,7 @@ import (
"crypto/sha1" "crypto/sha1"
"encoding/hex" "encoding/hex"
"hash" "hash"
"sync"
"gopkg.in/mgo.v2" "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
@ -34,11 +35,16 @@ func init() {
const DefaultDBName = "cayley" const DefaultDBName = "cayley"
var (
hashPool = sync.Pool{
New: func() interface{} { return sha1.New() },
}
hashSize = sha1.Size
)
type TripleStore struct { type TripleStore struct {
session *mgo.Session session *mgo.Session
db *mgo.Database db *mgo.Database
hasherSize int
makeHasher func() hash.Hash
idCache *IDLru idCache *IDLru
} }
@ -54,18 +60,18 @@ func createNewMongoGraph(addr string, options graph.Options) error {
} }
db := conn.DB(dbName) db := conn.DB(dbName)
indexOpts := mgo.Index{ indexOpts := mgo.Index{
Key: []string{"Sub"}, Key: []string{"subject"},
Unique: false, Unique: false,
DropDups: false, DropDups: false,
Background: true, Background: true,
Sparse: true, Sparse: true,
} }
db.C("quads").EnsureIndex(indexOpts) db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Pred"} indexOpts.Key = []string{"predicate"}
db.C("quads").EnsureIndex(indexOpts) db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Obj"} indexOpts.Key = []string{"object"}
db.C("quads").EnsureIndex(indexOpts) db.C("quads").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Label"} indexOpts.Key = []string{"label"}
db.C("quads").EnsureIndex(indexOpts) db.C("quads").EnsureIndex(indexOpts)
logOpts := mgo.Index{ logOpts := mgo.Index{
Key: []string{"LogID"}, Key: []string{"LogID"},
@ -91,26 +97,26 @@ func newTripleStore(addr string, options graph.Options) (graph.TripleStore, erro
} }
qs.db = conn.DB(dbName) qs.db = conn.DB(dbName)
qs.session = conn qs.session = conn
qs.hasherSize = sha1.Size
qs.makeHasher = sha1.New
qs.idCache = NewIDLru(1 << 16) qs.idCache = NewIDLru(1 << 16)
return &qs, nil return &qs, nil
} }
func (qs *TripleStore) getIdForTriple(t quad.Quad) string { func (qs *TripleStore) getIdForQuad(t quad.Quad) string {
hasher := qs.makeHasher() id := qs.convertStringToByteHash(t.Subject)
id := qs.convertStringToByteHash(t.Subject, hasher) id += qs.convertStringToByteHash(t.Predicate)
id += qs.convertStringToByteHash(t.Predicate, hasher) id += qs.convertStringToByteHash(t.Object)
id += qs.convertStringToByteHash(t.Object, hasher) id += qs.convertStringToByteHash(t.Label)
id += qs.convertStringToByteHash(t.Label, hasher)
return id return id
} }
func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) string { func (qs *TripleStore) convertStringToByteHash(s string) string {
hasher.Reset() h := hashPool.Get().(hash.Hash)
key := make([]byte, 0, qs.hasherSize) h.Reset()
hasher.Write([]byte(s)) defer hashPool.Put(h)
key = hasher.Sum(key)
key := make([]byte, 0, hashSize)
h.Write([]byte(s))
key = h.Sum(key)
return hex.EncodeToString(key) return hex.EncodeToString(key)
} }
@ -147,26 +153,20 @@ func (qs *TripleStore) updateNodeBy(node_name string, inc int) error {
return err return err
} }
func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error { func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error {
var setname string var setname string
if proc == graph.Add { if proc == graph.Add {
setname = "Added" setname = "Added"
} else if proc == graph.Delete { } else if proc == graph.Delete {
setname = "Deleted" setname = "Deleted"
} }
tripledoc := bson.M{
"Subject": t.Subject,
"Predicate": t.Predicate,
"Object": t.Object,
"Label": t.Label,
}
upsert := bson.M{ upsert := bson.M{
"$setOnInsert": tripledoc, "$setOnInsert": q,
"$push": bson.M{ "$push": bson.M{
setname: id, setname: id,
}, },
} }
_, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert) _, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert)
if err != nil { if err != nil {
glog.Errorf("Error: %v", err) glog.Errorf("Error: %v", err)
} }
@ -192,7 +192,7 @@ func (qs *TripleStore) checkValid(key string) bool {
return true return true
} }
func (qs *TripleStore) updateLog(d *graph.Delta) error { func (qs *TripleStore) updateLog(d graph.Delta) error {
var action string var action string
if d.Action == graph.Add { if d.Action == graph.Add {
action = "Add" action = "Add"
@ -202,7 +202,7 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error {
entry := MongoLogEntry{ entry := MongoLogEntry{
LogID: d.ID, LogID: d.ID,
Action: action, Action: action,
Key: qs.getIdForTriple(d.Quad), Key: qs.getIdForQuad(d.Quad),
Timestamp: d.Timestamp.UnixNano(), Timestamp: d.Timestamp.UnixNano(),
} }
err := qs.db.C("log").Insert(entry) err := qs.db.C("log").Insert(entry)
@ -212,12 +212,12 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error {
return err return err
} }
func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error {
qs.session.SetSafe(nil) qs.session.SetSafe(nil)
ids := make(map[string]int) ids := make(map[string]int)
// Pre-check the existence condition. // Pre-check the existence condition.
for _, d := range in { for _, d := range in {
key := qs.getIdForTriple(d.Quad) key := qs.getIdForQuad(d.Quad)
switch d.Action { switch d.Action {
case graph.Add: case graph.Add:
if qs.checkValid(key) { if qs.checkValid(key) {
@ -239,7 +239,7 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
} }
} }
for _, d := range in { for _, d := range in {
err := qs.updateTriple(d.Quad, d.ID, d.Action) err := qs.updateQuad(d.Quad, d.ID, d.Action)
if err != nil { if err != nil {
return err return err
} }
@ -267,17 +267,12 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
} }
func (qs *TripleStore) Quad(val graph.Value) quad.Quad { func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
var bsonDoc bson.M var q quad.Quad
err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc) err := qs.db.C("quads").FindId(val.(string)).One(&q)
if err != nil { if err != nil {
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
} }
return quad.Quad{ return q
bsonDoc["Subject"].(string),
bsonDoc["Predicate"].(string),
bsonDoc["Object"].(string),
bsonDoc["Label"].(string),
}
} }
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
@ -293,8 +288,7 @@ func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
} }
func (qs *TripleStore) ValueOf(s string) graph.Value { func (qs *TripleStore) ValueOf(s string) graph.Value {
h := qs.makeHasher() return qs.convertStringToByteHash(s)
return qs.convertStringToByteHash(s, h)
} }
func (qs *TripleStore) NameOf(v graph.Value) string { func (qs *TripleStore) NameOf(v graph.Value) string {
@ -352,13 +346,13 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V
case quad.Subject: case quad.Subject:
offset = 0 offset = 0
case quad.Predicate: case quad.Predicate:
offset = (qs.hasherSize * 2) offset = (hashSize * 2)
case quad.Object: case quad.Object:
offset = (qs.hasherSize * 2) * 2 offset = (hashSize * 2) * 2
case quad.Label: case quad.Label:
offset = (qs.hasherSize * 2) * 3 offset = (hashSize * 2) * 3
} }
val := in.(string)[offset : qs.hasherSize*2+offset] val := in.(string)[offset : hashSize*2+offset]
return val return val
} }

View file

@ -53,8 +53,10 @@ func (h *Handle) Close() {
h.QuadWriter.Close() h.QuadWriter.Close()
} }
var ErrQuadExists = errors.New("Quad exists") var (
var ErrQuadNotExist = errors.New("Quad doesn't exist") ErrQuadExists = errors.New("Quad exists")
ErrQuadNotExist = errors.New("Quad doesn't exist")
)
type QuadWriter interface { type QuadWriter interface {
// Add a quad to the store. // Add a quad to the store.

View file

@ -44,7 +44,7 @@ type Value interface{}
type TripleStore interface { type TripleStore interface {
// The only way in is through building a transaction, which // The only way in is through building a transaction, which
// is done by a replication strategy. // is done by a replication strategy.
ApplyDeltas([]*Delta) error ApplyDeltas([]Delta) error
// Given an opaque token, returns the triple for that token from the store. // Given an opaque token, returns the triple for that token from the store.
Quad(Value) quad.Quad Quad(Value) quad.Quad

View file

@ -50,8 +50,8 @@ func (s *Single) AcquireNextID() int64 {
} }
func (s *Single) AddQuad(q quad.Quad) error { func (s *Single) AddQuad(q quad.Quad) error {
deltas := make([]*graph.Delta, 1) deltas := make([]graph.Delta, 1)
deltas[0] = &graph.Delta{ deltas[0] = graph.Delta{
ID: s.AcquireNextID(), ID: s.AcquireNextID(),
Quad: q, Quad: q,
Action: graph.Add, Action: graph.Add,
@ -61,9 +61,9 @@ func (s *Single) AddQuad(q quad.Quad) error {
} }
func (s *Single) AddQuadSet(set []quad.Quad) error { func (s *Single) AddQuadSet(set []quad.Quad) error {
deltas := make([]*graph.Delta, len(set)) deltas := make([]graph.Delta, len(set))
for i, q := range set { for i, q := range set {
deltas[i] = &graph.Delta{ deltas[i] = graph.Delta{
ID: s.AcquireNextID(), ID: s.AcquireNextID(),
Quad: q, Quad: q,
Action: graph.Add, Action: graph.Add,
@ -75,8 +75,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
} }
func (s *Single) RemoveQuad(q quad.Quad) error { func (s *Single) RemoveQuad(q quad.Quad) error {
deltas := make([]*graph.Delta, 1) deltas := make([]graph.Delta, 1)
deltas[0] = &graph.Delta{ deltas[0] = graph.Delta{
ID: s.AcquireNextID(), ID: s.AcquireNextID(),
Quad: q, Quad: q,
Action: graph.Delete, Action: graph.Delete,