diff --git a/.travis.yml b/.travis.yml index e3b9987..f4a9eb3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,14 @@ language: go go: - - 1.2 - 1.3 + - 1.3.1 - tip install: - go get github.com/badgerodon/peg - go get github.com/barakmich/glog + - go get github.com/cznic/mathutil - go get github.com/julienschmidt/httprouter - go get github.com/petar/GoLLRB/llrb - go get github.com/peterh/liner diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 4202ffa..4b186ac 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -36,7 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil } +func (qs *store) ApplyDeltas([]graph.Delta) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } diff --git a/graph/leveldb/iterator.go b/graph/leveldb/iterator.go index df0ed05..a23d15d 100644 --- a/graph/leveldb/iterator.go +++ b/graph/leveldb/iterator.go @@ -43,9 +43,9 @@ type Iterator struct { result graph.Value } -func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator { +func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator { vb := value.(Token) - p := make([]byte, 0, 2+qs.hasherSize) + p := make([]byte, 0, 2+hashSize) p = append(p, []byte(prefix)...) p = append(p, []byte(vb[1:])...) @@ -70,7 +70,6 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS it.open = false it.iter.Release() glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) - return &iterator.Null{} } return &it @@ -108,7 +107,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { func (it *Iterator) Clone() graph.Iterator { out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs) - out.Tagger().CopyFrom(it) + out.tags.CopyFrom(it) return out } @@ -180,45 +179,45 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Subject: return 2 case quad.Predicate: - return qs.hasherSize + 2 + return hashSize + 2 case quad.Object: - return 2*qs.hasherSize + 2 + return 2*hashSize + 2 case quad.Label: - return 3*qs.hasherSize + 2 + return 3*hashSize + 2 } } if bytes.Equal(prefix, []byte("po")) { switch d { case quad.Subject: - return 2*qs.hasherSize + 2 + return 2*hashSize + 2 case quad.Predicate: return 2 case quad.Object: - return qs.hasherSize + 2 + return hashSize + 2 case quad.Label: - return 3*qs.hasherSize + 2 + return hashSize + 2 } } if bytes.Equal(prefix, []byte("os")) { switch d { case quad.Subject: - return qs.hasherSize + 2 + return hashSize + 2 case quad.Predicate: - return 2*qs.hasherSize + 2 + return 2*hashSize + 2 case quad.Object: return 2 case quad.Label: - return 3*qs.hasherSize + 2 + return 3*hashSize + 2 } } if bytes.Equal(prefix, []byte("cp")) { switch d { case quad.Subject: - return 2*qs.hasherSize + 2 + return 2*hashSize + 2 case quad.Predicate: - return qs.hasherSize + 2 + return hashSize + 2 case quad.Object: - return 3*qs.hasherSize + 2 + return 3*hashSize + 2 case quad.Label: return 2 } diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go index 06b57bd..dee63f8 100644 --- a/graph/leveldb/triplestore.go +++ b/graph/leveldb/triplestore.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "hash" + "sync" "github.com/barakmich/glog" "github.com/syndtr/goleveldb/leveldb" @@ -43,6 +44,13 @@ const ( DefaultWriteBufferSize = 20 ) +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + type Token []byte func (t Token) Key() interface{} { @@ -50,17 +58,14 @@ func (t Token) Key() interface{} { } type TripleStore struct { - dbOpts *opt.Options - db *leveldb.DB - path string - open bool - size int64 - horizon int64 - hasher hash.Hash - hasherSize int - makeHasher func() hash.Hash - writeopts *opt.WriteOptions - readopts *opt.ReadOptions + dbOpts *opt.Options + db *leveldb.DB + path string + open bool + size int64 + horizon int64 + writeopts *opt.WriteOptions + readopts *opt.ReadOptions } func createNewLevelDB(path string, _ graph.Options) error { @@ -98,8 +103,6 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro write_buffer_mb = val } qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB - qs.hasherSize = sha1.Size - qs.makeHasher = sha1.New qs.writeopts = &opt.WriteOptions{ Sync: false, } @@ -136,7 +139,7 @@ func (qs *TripleStore) Horizon() int64 { return qs.horizon } -func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { +func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte { key := make([]byte, 0, 19) key = append(key, 'd') key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) @@ -144,22 +147,20 @@ func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { } func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, 2+(qs.hasherSize*3)) + key := make([]byte, 0, 2+(hashSize*3)) // TODO(kortschak) Remove dependence on String() method. key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) return key } func (qs *TripleStore) createValueKeyFor(s string) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, 1+qs.hasherSize) + key := make([]byte, 0, 1+hashSize) key = append(key, []byte("z")...) - key = append(key, qs.convertStringToByteHash(s, hasher)...) + key = append(key, qs.convertStringToByteHash(s)...) return key } @@ -176,7 +177,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { +func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) size_change := int64(0) @@ -346,11 +347,13 @@ func (qs *TripleStore) Quad(k graph.Value) quad.Quad { return triple } -func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { - hasher.Reset() - key := make([]byte, 0, qs.hasherSize) - hasher.Write([]byte(s)) - key = hasher.Sum(key) +func (qs *TripleStore) convertStringToByteHash(s string) []byte { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) return key } @@ -467,7 +470,7 @@ func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph. v := val.(Token) offset := PositionOf(v[0:2], d, qs) if offset != -1 { - return Token(append([]byte("z"), v[offset:offset+qs.hasherSize]...)) + return Token(append([]byte("z"), v[offset:offset+hashSize]...)) } else { return Token(qs.Quad(val).Get(d)) } diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index eb01f78..6717a13 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -35,17 +35,17 @@ func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator { } // No subiterators. -func (nit *NodesAllIterator) SubIterators() []graph.Iterator { +func (it *NodesAllIterator) SubIterators() []graph.Iterator { return nil } -func (nit *NodesAllIterator) Next() bool { - if !nit.Int64.Next() { +func (it *NodesAllIterator) Next() bool { + if !it.Int64.Next() { return false } - _, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)] + _, ok := it.ts.revIdMap[it.Int64.Result().(int64)] if !ok { - return nit.Next() + return it.Next() } return true } diff --git a/graph/memstore/b/keys.go b/graph/memstore/b/keys.go new file mode 100644 index 0000000..4593298 --- /dev/null +++ b/graph/memstore/b/keys.go @@ -0,0 +1,972 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package b implements a B+tree. +// +// Changelog +// +// 2014-06-26: Lower GC presure by recycling things. +// +// 2014-04-18: Added new method Put. +// +// Generic types +// +// Keys and their associated values are interface{} typed, similar to all of +// the containers in the standard library. +// +// Semiautomatic production of a type specific variant of this package is +// supported via +// +// $ make generic +// +// This command will write to stdout a version of the btree.go file where +// every key type occurrence is replaced by the word 'key' (written in all +// CAPS) and every value type occurrence is replaced by the word 'value' +// (written in all CAPS). Then you have to replace these tokens with your +// desired type(s), using any technique you're comfortable with. +// +// This is how, for example, 'example/int.go' was created: +// +// $ mkdir example +// $ +// $ # Note: the command bellow must be actually written using the words +// $ # 'key' and 'value' in all CAPS. The proper form is avoided in this +// $ # documentation to not confuse any text replacement mechanism. +// $ +// $ make generic | sed -e 's/key/int/g' -e 's/value/int/g' > example/int.go +// +// No other changes to int.go are necessary, it compiles just fine. +// +// Running the benchmarks for 1000 keys on a machine with Intel i5-4670 CPU @ +// 3.4GHz, Go release 1.3. +// +// $ go test -bench 1e3 example/all_test.go example/int.go +// PASS +// BenchmarkSetSeq1e3 10000 146740 ns/op +// BenchmarkGetSeq1e3 10000 108261 ns/op +// BenchmarkSetRnd1e3 10000 254359 ns/op +// BenchmarkGetRnd1e3 10000 134621 ns/op +// BenchmarkDelRnd1e3 10000 211864 ns/op +// BenchmarkSeekSeq1e3 10000 148628 ns/op +// BenchmarkSeekRnd1e3 10000 215166 ns/op +// BenchmarkNext1e3 200000 9211 ns/op +// BenchmarkPrev1e3 200000 8843 ns/op +// ok command-line-arguments 25.071s +// $ +package b + +import ( + "fmt" + "io" + "sync" +) + +const ( + kx = 32 //TODO benchmark tune this number if using custom key/value type(s). + kd = 32 //TODO benchmark tune this number if using custom key/value type(s). +) + +func init() { + if kd < 1 { + panic(fmt.Errorf("kd %d: out of range", kd)) + } + + if kx < 2 { + panic(fmt.Errorf("kx %d: out of range", kx)) + } +} + +var ( + btDPool = sync.Pool{New: func() interface{} { return &d{} }} + btEPool = btEpool{sync.Pool{New: func() interface{} { return &Enumerator{} }}} + btTPool = btTpool{sync.Pool{New: func() interface{} { return &Tree{} }}} + btXPool = sync.Pool{New: func() interface{} { return &x{} }} +) + +type btTpool struct{ sync.Pool } + +func (p *btTpool) get(cmp Cmp) *Tree { + x := p.Get().(*Tree) + x.cmp = cmp + return x +} + +type btEpool struct{ sync.Pool } + +func (p *btEpool) get(err error, hit bool, i int, k int64, q *d, t *Tree, ver int64) *Enumerator { + x := p.Get().(*Enumerator) + x.err, x.hit, x.i, x.k, x.q, x.t, x.ver = err, hit, i, k, q, t, ver + return x +} + +type ( + // Cmp compares a and b. Return value is: + // + // < 0 if a < b + // 0 if a == b + // > 0 if a > b + // + Cmp func(a, b int64) int + + d struct { // data page + c int + d [2*kd + 1]de + n *d + p *d + } + + de struct { // d element + k int64 + v struct{} + } + + // Enumerator captures the state of enumerating a tree. It is returned + // from the Seek* methods. The enumerator is aware of any mutations + // made to the tree in the process of enumerating it and automatically + // resumes the enumeration at the proper key, if possible. + // + // However, once an Enumerator returns io.EOF to signal "no more + // items", it does no more attempt to "resync" on tree mutation(s). In + // other words, io.EOF from an Enumaretor is "sticky" (idempotent). + Enumerator struct { + err error + hit bool + i int + k int64 + q *d + t *Tree + ver int64 + } + + // Tree is a B+tree. + Tree struct { + c int + cmp Cmp + first *d + last *d + r interface{} + ver int64 + } + + xe struct { // x element + ch interface{} + k int64 + } + + x struct { // index page + c int + x [2*kx + 2]xe + } +) + +var ( // R/O zero values + zd d + zde de + ze Enumerator + zk int64 + zt Tree + zx x + zxe xe +) + +func clr(q interface{}) { + switch x := q.(type) { + case *x: + for i := 0; i <= x.c; i++ { // Ch0 Sep0 ... Chn-1 Sepn-1 Chn + clr(x.x[i].ch) + } + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } +} + +// -------------------------------------------------------------------------- x + +func newX(ch0 interface{}) *x { + r := btXPool.Get().(*x) + r.x[0].ch = ch0 + return r +} + +func (q *x) extract(i int) { + q.c-- + if i < q.c { + copy(q.x[i:], q.x[i+1:q.c+1]) + q.x[q.c].ch = q.x[q.c+1].ch + q.x[q.c].k = zk // GC + q.x[q.c+1] = zxe // GC + } +} + +func (q *x) insert(i int, k int64, ch interface{}) *x { + c := q.c + if i < c { + q.x[c+1].ch = q.x[c].ch + copy(q.x[i+2:], q.x[i+1:c]) + q.x[i+1].k = q.x[i].k + } + c++ + q.c = c + q.x[i].k = k + q.x[i+1].ch = ch + return q +} + +func (q *x) siblings(i int) (l, r *d) { + if i >= 0 { + if i > 0 { + l = q.x[i-1].ch.(*d) + } + if i < q.c { + r = q.x[i+1].ch.(*d) + } + } + return +} + +// -------------------------------------------------------------------------- d + +func (l *d) mvL(r *d, c int) { + copy(l.d[l.c:], r.d[:c]) + copy(r.d[:], r.d[c:r.c]) + l.c += c + r.c -= c +} + +func (l *d) mvR(r *d, c int) { + copy(r.d[c:], r.d[:r.c]) + copy(r.d[:c], l.d[l.c-c:]) + r.c += c + l.c -= c +} + +// ----------------------------------------------------------------------- Tree + +// TreeNew returns a newly created, empty Tree. The compare function is used +// for key collation. +func TreeNew(cmp Cmp) *Tree { + return btTPool.get(cmp) +} + +// Clear removes all K/V pairs from the tree. +func (t *Tree) Clear() { + if t.r == nil { + return + } + + clr(t.r) + t.c, t.first, t.last, t.r = 0, nil, nil, nil + t.ver++ +} + +// Close performs Clear and recycles t to a pool for possible later reuse. No +// references to t should exist or such references must not be used afterwards. +func (t *Tree) Close() { + t.Clear() + *t = zt + btTPool.Put(t) +} + +func (t *Tree) cat(p *x, q, r *d, pi int) { + t.ver++ + q.mvL(r, r.c) + if r.n != nil { + r.n.p = q + } else { + t.last = q + } + q.n = r.n + *r = zd + btDPool.Put(r) + if p.c > 1 { + p.extract(pi) + p.x[pi].ch = q + } else { + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q + } +} + +func (t *Tree) catX(p, q, r *x, pi int) { + t.ver++ + q.x[q.c].k = p.x[pi].k + copy(q.x[q.c+1:], r.x[:r.c]) + q.c += r.c + 1 + q.x[q.c].ch = r.x[r.c].ch + *r = zx + btXPool.Put(r) + if p.c > 1 { + p.c-- + pc := p.c + if pi < pc { + p.x[pi].k = p.x[pi+1].k + copy(p.x[pi+1:], p.x[pi+2:pc+1]) + p.x[pc].ch = p.x[pc+1].ch + p.x[pc].k = zk // GC + p.x[pc+1].ch = nil // GC + } + return + } + + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q +} + +// Delete removes the k's KV pair, if it exists, in which case Delete returns +// true. +func (t *Tree) Delete(k int64) (ok bool) { + pi := -1 + var p *x + q := t.r + if q == nil { + return false + } + + for { + var i int + i, ok = t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[pi].ch + ok = false + continue + case *d: + t.extract(x, i) + if x.c >= kd { + return true + } + + if q != t.r { + t.underflow(p, x, pi) + } else if t.c == 0 { + t.Clear() + } + return true + } + } + + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + return false + } + } +} + +func (t *Tree) extract(q *d, i int) { // (r struct{}) { + t.ver++ + //r = q.d[i].v // prepared for Extract + q.c-- + if i < q.c { + copy(q.d[i:], q.d[i+1:q.c+1]) + } + q.d[q.c] = zde // GC + t.c-- + return +} + +func (t *Tree) find(q interface{}, k int64) (i int, ok bool) { + var mk int64 + l := 0 + switch x := q.(type) { + case *x: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.x[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + case *d: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.d[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + } + return l, false +} + +// First returns the first item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) First() (k int64, v struct{}) { + if q := t.first; q != nil { + q := &q.d[0] + k, v = q.k, q.v + } + return +} + +// Get returns the value associated with k and true if it exists. Otherwise Get +// returns (zero-value, false). +func (t *Tree) Get(k int64) (v struct{}, ok bool) { + q := t.r + if q == nil { + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return x.d[i].v, true + } + } + switch x := q.(type) { + case *x: + q = x.x[i].ch + default: + return + } + } +} + +func (t *Tree) insert(q *d, i int, k int64, v struct{}) *d { + t.ver++ + c := q.c + if i < c { + copy(q.d[i+1:], q.d[i:c]) + } + c++ + q.c = c + q.d[i].k, q.d[i].v = k, v + t.c++ + return q +} + +// Last returns the last item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) Last() (k int64, v struct{}) { + if q := t.last; q != nil { + q := &q.d[q.c-1] + k, v = q.k, q.v + } + return +} + +// Len returns the number of items in the tree. +func (t *Tree) Len() int { + return t.c +} + +func (t *Tree) overflow(p *x, q *d, pi, i int, k int64, v struct{}) { + t.ver++ + l, r := p.siblings(pi) + + if l != nil && l.c < 2*kd { + l.mvL(q, 1) + t.insert(q, i-1, k, v) + p.x[pi-1].k = q.d[0].k + return + } + + if r != nil && r.c < 2*kd { + if i < 2*kd { + q.mvR(r, 1) + t.insert(q, i, k, v) + p.x[pi].k = r.d[0].k + } else { + t.insert(r, 0, k, v) + p.x[pi].k = k + } + return + } + + t.split(p, q, pi, i, k, v) +} + +// Seek returns an Enumerator positioned on a an item such that k >= item's +// key. ok reports if k == item.key The Enumerator's position is possibly +// after the last item in the tree. +func (t *Tree) Seek(k int64) (e *Enumerator, ok bool) { + q := t.r + if q == nil { + e = btEPool.get(nil, false, 0, k, nil, t, t.ver) + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return btEPool.get(nil, ok, i, k, x, t, t.ver), true + } + } + + switch x := q.(type) { + case *x: + q = x.x[i].ch + case *d: + return btEPool.get(nil, ok, i, k, x, t, t.ver), false + } + } +} + +// SeekFirst returns an enumerator positioned on the first KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekFirst() (e *Enumerator, err error) { + q := t.first + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, 0, q.d[0].k, q, t, t.ver), nil +} + +// SeekLast returns an enumerator positioned on the last KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekLast() (e *Enumerator, err error) { + q := t.last + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, q.c-1, q.d[q.c-1].k, q, t, t.ver), nil +} + +// Set sets the value associated with k. +func (t *Tree) Set(k int64, v struct{}) { + //dbg("--- PRE Set(%v, %v)\n%s", k, v, t.dump()) + //defer func() { + // dbg("--- POST\n%s\n====\n", t.dump()) + //}() + + pi := -1 + var p *x + q := t.r + if q == nil { + z := t.insert(btDPool.Get().(*d), 0, k, v) + t.r, t.first, t.last = z, z, z + return + } + + for { + i, ok := t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[i+1].ch + continue + case *d: + x.d[i].v = v + } + return + } + + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + switch { + case x.c < 2*kd: + t.insert(x, i, k, v) + default: + t.overflow(p, x, pi, i, k, v) + } + return + } + } +} + +// Put combines Get and Set in a more efficient way where the tree is walked +// only once. The upd(ater) receives (old-value, true) if a KV pair for k +// exists or (zero-value, false) otherwise. It can then return a (new-value, +// true) to create or overwrite the existing value in the KV pair, or +// (whatever, false) if it decides not to create or not to update the value of +// the KV pair. +// +// tree.Set(k, v) call conceptually equals calling +// +// tree.Put(k, func(int64, bool){ return v, true }) +// +// modulo the differing return values. +func (t *Tree) Put(k int64, upd func(oldV struct{}, exists bool) (newV struct{}, write bool)) (oldV struct{}, written bool) { + pi := -1 + var p *x + q := t.r + var newV struct{} + if q == nil { + // new KV pair in empty tree + newV, written = upd(newV, false) + if !written { + return + } + + z := t.insert(btDPool.Get().(*d), 0, k, newV) + t.r, t.first, t.last = z, z, z + return + } + + for { + i, ok := t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[i+1].ch + continue + case *d: + oldV = x.d[i].v + newV, written = upd(oldV, true) + if !written { + return + } + + x.d[i].v = newV + } + return + } + + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: // new KV pair + newV, written = upd(newV, false) + if !written { + return + } + + switch { + case x.c < 2*kd: + t.insert(x, i, k, newV) + default: + t.overflow(p, x, pi, i, k, newV) + } + return + } + } +} + +func (t *Tree) split(p *x, q *d, pi, i int, k int64, v struct{}) { + t.ver++ + r := btDPool.Get().(*d) + if q.n != nil { + r.n = q.n + r.n.p = r + } else { + t.last = r + } + q.n = r + r.p = q + + copy(r.d[:], q.d[kd:2*kd]) + for i := range q.d[kd:] { + q.d[kd+i] = zde + } + q.c = kd + r.c = kd + var done bool + if i > kd { + done = true + t.insert(r, i-kd, k, v) + } + if pi >= 0 { + p.insert(pi, r.d[0].k, r) + } else { + t.r = newX(q).insert(0, r.d[0].k, r) + } + if done { + return + } + + t.insert(q, i, k, v) +} + +func (t *Tree) splitX(p *x, q *x, pi int, i int) (*x, int) { + t.ver++ + r := btXPool.Get().(*x) + copy(r.x[:], q.x[kx+1:]) + q.c = kx + r.c = kx + if pi >= 0 { + p.insert(pi, q.x[kx].k, r) + q.x[kx].k = zk + for i := range q.x[kx+1:] { + q.x[kx+i+1] = zxe + } + + switch { + case i < kx: + return q, i + case i == kx: + return p, pi + default: // i > kx + return r, i - kx - 1 + } + } + + nr := newX(q).insert(0, q.x[kx].k, r) + t.r = nr + q.x[kx].k = zk + for i := range q.x[kx+1:] { + q.x[kx+i+1] = zxe + } + + switch { + case i < kx: + return q, i + case i == kx: + return nr, 0 + default: // i > kx + return r, i - kx - 1 + } +} + +func (t *Tree) underflow(p *x, q *d, pi int) { + t.ver++ + l, r := p.siblings(pi) + + if l != nil && l.c+q.c >= 2*kd { + l.mvR(q, 1) + p.x[pi-1].k = q.d[0].k + } else if r != nil && q.c+r.c >= 2*kd { + q.mvL(r, 1) + p.x[pi].k = r.d[0].k + r.d[r.c] = zde // GC + } else if l != nil { + t.cat(p, l, q, pi-1) + } else { + t.cat(p, q, r, pi) + } +} + +func (t *Tree) underflowX(p *x, q *x, pi int, i int) (*x, int) { + t.ver++ + var l, r *x + + if pi >= 0 { + if pi > 0 { + l = p.x[pi-1].ch.(*x) + } + if pi < p.c { + r = p.x[pi+1].ch.(*x) + } + } + + if l != nil && l.c > kx { + q.x[q.c+1].ch = q.x[q.c].ch + copy(q.x[1:], q.x[:q.c]) + q.x[0].ch = l.x[l.c].ch + q.x[0].k = p.x[pi-1].k + q.c++ + i++ + l.c-- + p.x[pi-1].k = l.x[l.c].k + return q, i + } + + if r != nil && r.c > kx { + q.x[q.c].k = p.x[pi].k + q.c++ + q.x[q.c].ch = r.x[0].ch + p.x[pi].k = r.x[0].k + copy(r.x[:], r.x[1:r.c]) + r.c-- + rc := r.c + r.x[rc].ch = r.x[rc+1].ch + r.x[rc].k = zk + r.x[rc+1].ch = nil + return q, i + } + + if l != nil { + i += l.c + 1 + t.catX(p, l, q, pi-1) + q = l + return q, i + } + + t.catX(p, q, r, pi) + return q, i +} + +// ----------------------------------------------------------------- Enumerator + +// Close recycles e to a pool for possible later reuse. No references to e +// should exist or such references must not be used afterwards. +func (e *Enumerator) Close() { + *e = ze + btEPool.Put(e) +} + +// Next returns the currently enumerated item, if it exists and moves to the +// next item in the key collation order. If there is no item to return, err == +// io.EOF is returned. +func (e *Enumerator) Next() (k int64, v struct{}, err error) { + if err = e.err; err != nil { + return + } + + if e.ver != e.t.ver { + f, hit := e.t.Seek(e.k) + if !e.hit && hit { + if err = f.next(); err != nil { + return + } + } + + *e = *f + f.Close() + } + if e.q == nil { + e.err, err = io.EOF, io.EOF + return + } + + if e.i >= e.q.c { + if err = e.next(); err != nil { + return + } + } + + i := e.q.d[e.i] + k, v = i.k, i.v + e.k, e.hit = k, false + e.next() + return +} + +func (e *Enumerator) next() error { + if e.q == nil { + e.err = io.EOF + return io.EOF + } + + switch { + case e.i < e.q.c-1: + e.i++ + default: + if e.q, e.i = e.q.n, 0; e.q == nil { + e.err = io.EOF + } + } + return e.err +} + +// Prev returns the currently enumerated item, if it exists and moves to the +// previous item in the key collation order. If there is no item to return, err +// == io.EOF is returned. +func (e *Enumerator) Prev() (k int64, v struct{}, err error) { + if err = e.err; err != nil { + return + } + + if e.ver != e.t.ver { + f, hit := e.t.Seek(e.k) + if !e.hit && hit { + if err = f.prev(); err != nil { + return + } + } + + *e = *f + f.Close() + } + if e.q == nil { + e.err, err = io.EOF, io.EOF + return + } + + if e.i >= e.q.c { + if err = e.next(); err != nil { + return + } + } + + i := e.q.d[e.i] + k, v = i.k, i.v + e.k, e.hit = k, false + e.prev() + return +} + +func (e *Enumerator) prev() error { + if e.q == nil { + e.err = io.EOF + return io.EOF + } + + switch { + case e.i > 0: + e.i-- + default: + if e.q = e.q.p; e.q == nil { + e.err = io.EOF + break + } + + e.i = e.q.c - 1 + } + return e.err +} diff --git a/graph/memstore/b/keys_test.go b/graph/memstore/b/keys_test.go new file mode 100644 index 0000000..0425531 --- /dev/null +++ b/graph/memstore/b/keys_test.go @@ -0,0 +1,396 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package b + +import ( + "math" + "runtime/debug" + "testing" + + "github.com/cznic/mathutil" +) + +func rng() *mathutil.FC32 { + x, err := mathutil.NewFC32(math.MinInt32/4, math.MaxInt32/4, false) + if err != nil { + panic(err) + } + + return x +} + +func cmp(a, b int64) int { + return int(a - b) +} + +func BenchmarkSetSeq1e3(b *testing.B) { + benchmarkSetSeq(b, 1e3) +} + +func BenchmarkSetSeq1e4(b *testing.B) { + benchmarkSetSeq(b, 1e4) +} + +func BenchmarkSetSeq1e5(b *testing.B) { + benchmarkSetSeq(b, 1e5) +} + +func BenchmarkSetSeq1e6(b *testing.B) { + benchmarkSetSeq(b, 1e6) +} + +func benchmarkSetSeq(b *testing.B, n int) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + r := TreeNew(cmp) + debug.FreeOSMemory() + b.StartTimer() + for j := int64(0); j < int64(n); j++ { + r.Set(j, struct{}{}) + } + b.StopTimer() + r.Close() + } + b.StopTimer() +} + +func BenchmarkGetSeq1e3(b *testing.B) { + benchmarkGetSeq(b, 1e3) +} + +func BenchmarkGetSeq1e4(b *testing.B) { + benchmarkGetSeq(b, 1e4) +} + +func BenchmarkGetSeq1e5(b *testing.B) { + benchmarkGetSeq(b, 1e5) +} + +func BenchmarkGetSeq1e6(b *testing.B) { + benchmarkGetSeq(b, 1e6) +} + +func benchmarkGetSeq(b *testing.B, n int) { + r := TreeNew(cmp) + for i := int64(0); i < int64(n); i++ { + r.Set(i, struct{}{}) + } + debug.FreeOSMemory() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := int64(0); j < int64(n); j++ { + r.Get(j) + } + } + b.StopTimer() + r.Close() +} + +func BenchmarkSetRnd1e3(b *testing.B) { + benchmarkSetRnd(b, 1e3) +} + +func BenchmarkSetRnd1e4(b *testing.B) { + benchmarkSetRnd(b, 1e4) +} + +func BenchmarkSetRnd1e5(b *testing.B) { + benchmarkSetRnd(b, 1e5) +} + +func BenchmarkSetRnd1e6(b *testing.B) { + benchmarkSetRnd(b, 1e6) +} + +func benchmarkSetRnd(b *testing.B, n int) { + rng := rng() + a := make([]int, n) + for i := range a { + a[i] = rng.Next() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + r := TreeNew(cmp) + debug.FreeOSMemory() + b.StartTimer() + for _, v := range a { + r.Set(int64(v), struct{}{}) + } + b.StopTimer() + r.Close() + } + b.StopTimer() +} + +func BenchmarkGetRnd1e3(b *testing.B) { + benchmarkGetRnd(b, 1e3) +} + +func BenchmarkGetRnd1e4(b *testing.B) { + benchmarkGetRnd(b, 1e4) +} + +func BenchmarkGetRnd1e5(b *testing.B) { + benchmarkGetRnd(b, 1e5) +} + +func BenchmarkGetRnd1e6(b *testing.B) { + benchmarkGetRnd(b, 1e6) +} + +func benchmarkGetRnd(b *testing.B, n int) { + r := TreeNew(cmp) + rng := rng() + a := make([]int64, n) + for i := range a { + a[i] = int64(rng.Next()) + } + for _, v := range a { + r.Set(v, struct{}{}) + } + debug.FreeOSMemory() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, v := range a { + r.Get(v) + } + } + b.StopTimer() + r.Close() +} + +func BenchmarkDelSeq1e3(b *testing.B) { + benchmarkDelSeq(b, 1e3) +} + +func BenchmarkDelSeq1e4(b *testing.B) { + benchmarkDelSeq(b, 1e4) +} + +func BenchmarkDelSeq1e5(b *testing.B) { + benchmarkDelSeq(b, 1e5) +} + +func BenchmarkDelSeq1e6(b *testing.B) { + benchmarkDelSeq(b, 1e6) +} + +func benchmarkDelSeq(b *testing.B, n int) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + r := TreeNew(cmp) + for j := int64(0); j < int64(n); j++ { + r.Set(j, struct{}{}) + } + debug.FreeOSMemory() + b.StartTimer() + for j := int64(0); j < int64(n); j++ { + r.Delete(j) + } + } + b.StopTimer() +} + +func BenchmarkDelRnd1e3(b *testing.B) { + benchmarkDelRnd(b, 1e3) +} + +func BenchmarkDelRnd1e4(b *testing.B) { + benchmarkDelRnd(b, 1e4) +} + +func BenchmarkDelRnd1e5(b *testing.B) { + benchmarkDelRnd(b, 1e5) +} + +func BenchmarkDelRnd1e6(b *testing.B) { + benchmarkDelRnd(b, 1e6) +} + +func benchmarkDelRnd(b *testing.B, n int) { + rng := rng() + a := make([]int64, n) + for i := range a { + a[i] = int64(rng.Next()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + r := TreeNew(cmp) + for _, v := range a { + r.Set(v, struct{}{}) + } + debug.FreeOSMemory() + b.StartTimer() + for _, v := range a { + r.Delete(v) + } + b.StopTimer() + r.Close() + } + b.StopTimer() +} + +func BenchmarkSeekSeq1e3(b *testing.B) { + benchmarkSeekSeq(b, 1e3) +} + +func BenchmarkSeekSeq1e4(b *testing.B) { + benchmarkSeekSeq(b, 1e4) +} + +func BenchmarkSeekSeq1e5(b *testing.B) { + benchmarkSeekSeq(b, 1e5) +} + +func BenchmarkSeekSeq1e6(b *testing.B) { + benchmarkSeekSeq(b, 1e6) +} + +func benchmarkSeekSeq(b *testing.B, n int) { + for i := 0; i < b.N; i++ { + b.StopTimer() + t := TreeNew(cmp) + for j := int64(0); j < int64(n); j++ { + t.Set(j, struct{}{}) + } + debug.FreeOSMemory() + b.StartTimer() + for j := int64(0); j < int64(n); j++ { + e, _ := t.Seek(j) + e.Close() + } + b.StopTimer() + t.Close() + } + b.StopTimer() +} + +func BenchmarkSeekRnd1e3(b *testing.B) { + benchmarkSeekRnd(b, 1e3) +} + +func BenchmarkSeekRnd1e4(b *testing.B) { + benchmarkSeekRnd(b, 1e4) +} + +func BenchmarkSeekRnd1e5(b *testing.B) { + benchmarkSeekRnd(b, 1e5) +} + +func BenchmarkSeekRnd1e6(b *testing.B) { + benchmarkSeekRnd(b, 1e6) +} + +func benchmarkSeekRnd(b *testing.B, n int) { + r := TreeNew(cmp) + rng := rng() + a := make([]int64, n) + for i := range a { + a[i] = int64(rng.Next()) + } + for _, v := range a { + r.Set(v, struct{}{}) + } + debug.FreeOSMemory() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, v := range a { + e, _ := r.Seek(v) + e.Close() + } + } + b.StopTimer() + r.Close() +} + +func BenchmarkNext1e3(b *testing.B) { + benchmarkNext(b, 1e3) +} + +func BenchmarkNext1e4(b *testing.B) { + benchmarkNext(b, 1e4) +} + +func BenchmarkNext1e5(b *testing.B) { + benchmarkNext(b, 1e5) +} + +func BenchmarkNext1e6(b *testing.B) { + benchmarkNext(b, 1e6) +} + +func benchmarkNext(b *testing.B, n int) { + t := TreeNew(cmp) + for i := int64(0); i < int64(n); i++ { + t.Set(i, struct{}{}) + } + debug.FreeOSMemory() + b.ResetTimer() + for i := 0; i < b.N; i++ { + en, err := t.SeekFirst() + if err != nil { + b.Fatal(err) + } + + m := 0 + for { + if _, _, err = en.Next(); err != nil { + break + } + m++ + } + if m != n { + b.Fatal(m) + } + } + b.StopTimer() + t.Close() +} + +func BenchmarkPrev1e3(b *testing.B) { + benchmarkPrev(b, 1e3) +} + +func BenchmarkPrev1e4(b *testing.B) { + benchmarkPrev(b, 1e4) +} + +func BenchmarkPrev1e5(b *testing.B) { + benchmarkPrev(b, 1e5) +} + +func BenchmarkPrev1e6(b *testing.B) { + benchmarkPrev(b, 1e6) +} + +func benchmarkPrev(b *testing.B, n int) { + t := TreeNew(cmp) + for i := int64(0); i < int64(n); i++ { + t.Set(i, struct{}{}) + } + debug.FreeOSMemory() + b.ResetTimer() + for i := 0; i < b.N; i++ { + en, err := t.SeekLast() + if err != nil { + b.Fatal(err) + } + + m := 0 + for { + if _, _, err = en.Prev(); err != nil { + break + } + m++ + } + if m != n { + b.Fatal(m) + } + } +} diff --git a/graph/memstore/iterator.go b/graph/memstore/iterator.go index 277f86a..00e6bdc 100644 --- a/graph/memstore/iterator.go +++ b/graph/memstore/iterator.go @@ -19,49 +19,36 @@ import ( "math" "strings" - "github.com/petar/GoLLRB/llrb" - "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/memstore/b" ) type Iterator struct { - uid uint64 - ts *TripleStore - tags graph.Tagger - tree *llrb.LLRB - data string - isRunning bool - iterLast Int64 - result graph.Value + uid uint64 + ts *TripleStore + tags graph.Tagger + tree *b.Tree + iter *b.Enumerator + data string + result graph.Value } -type Int64 int64 - -func (i Int64) Less(than llrb.Item) bool { - return i < than.(Int64) +func cmp(a, b int64) int { + return int(a - b) } -func IterateOne(tree *llrb.LLRB, last Int64) Int64 { - var next Int64 - tree.AscendGreaterOrEqual(last, func(i llrb.Item) bool { - if i.(Int64) == last { - return true - } else { - next = i.(Int64) - return false - } - }) - return next -} - -func NewLlrbIterator(tree *llrb.LLRB, data string, ts *TripleStore) *Iterator { +func NewIterator(tree *b.Tree, data string, ts *TripleStore) *Iterator { + iter, err := tree.SeekFirst() + if err != nil { + iter = nil + } return &Iterator{ - uid: iterator.NextUID(), - ts: ts, - tree: tree, - iterLast: Int64(-1), - data: data, + uid: iterator.NextUID(), + ts: ts, + tree: tree, + iter: iter, + data: data, } } @@ -70,7 +57,11 @@ func (it *Iterator) UID() uint64 { } func (it *Iterator) Reset() { - it.iterLast = Int64(-1) + var err error + it.iter, err = it.tree.SeekFirst() + if err != nil { + it.iter = nil + } } func (it *Iterator) Tagger() *graph.Tagger { @@ -88,8 +79,30 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { } func (it *Iterator) Clone() graph.Iterator { - m := NewLlrbIterator(it.tree, it.data, it.ts) + var iter *b.Enumerator + if it.result != nil { + var ok bool + iter, ok = it.tree.Seek(it.result.(int64)) + if !ok { + panic("value unexpectedly missing") + } + } else { + var err error + iter, err = it.tree.SeekFirst() + if err != nil { + iter = nil + } + } + + m := &Iterator{ + uid: iterator.NextUID(), + ts: it.ts, + tree: it.tree, + iter: iter, + data: it.data, + } m.tags.CopyFrom(it) + return m } @@ -101,14 +114,18 @@ func (it *Iterator) checkValid(index int64) bool { func (it *Iterator) Next() bool { graph.NextLogIn(it) - if it.tree.Max() == nil || it.iterLast == it.tree.Max().(Int64) { + + if it.iter == nil { return graph.NextLogOut(it, nil, false) } - it.iterLast = IterateOne(it.tree, it.iterLast) - if !it.checkValid(int64(it.iterLast)) { + result, _, err := it.iter.Next() + if err != nil { + return graph.NextLogOut(it, nil, false) + } + if !it.checkValid(result) { return it.Next() } - it.result = int64(it.iterLast) + it.result = result return graph.NextLogOut(it, it.result, true) } @@ -135,7 +152,7 @@ func (it *Iterator) Size() (int64, bool) { func (it *Iterator) Contains(v graph.Value) bool { graph.ContainsLogIn(it, v) - if it.tree.Has(Int64(v.(int64))) && it.checkValid(v.(int64)) { + if _, ok := it.tree.Get(v.(int64)); ok { it.result = v return graph.ContainsLogOut(it, v, true) } @@ -150,7 +167,7 @@ func (it *Iterator) DebugString(indent int) string { var memType graph.Type func init() { - memType = graph.RegisterIterator("llrb") + memType = graph.RegisterIterator("b+tree") } func Type() graph.Type { return memType } diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go index babae77..da03810 100644 --- a/graph/memstore/triplestore.go +++ b/graph/memstore/triplestore.go @@ -18,11 +18,11 @@ import ( "fmt" "github.com/barakmich/glog" + "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/memstore/b" "github.com/google/cayley/quad" - - "github.com/petar/GoLLRB/llrb" ) func init() { @@ -32,47 +32,36 @@ func init() { } type QuadDirectionIndex struct { - subject map[int64]*llrb.LLRB - predicate map[int64]*llrb.LLRB - object map[int64]*llrb.LLRB - label map[int64]*llrb.LLRB + index [4]map[int64]*b.Tree } -func NewQuadDirectionIndex() *QuadDirectionIndex { - var qdi QuadDirectionIndex - qdi.subject = make(map[int64]*llrb.LLRB) - qdi.predicate = make(map[int64]*llrb.LLRB) - qdi.object = make(map[int64]*llrb.LLRB) - qdi.label = make(map[int64]*llrb.LLRB) - return &qdi +func NewQuadDirectionIndex() QuadDirectionIndex { + return QuadDirectionIndex{[...]map[int64]*b.Tree{ + quad.Subject - 1: make(map[int64]*b.Tree), + quad.Predicate - 1: make(map[int64]*b.Tree), + quad.Object - 1: make(map[int64]*b.Tree), + quad.Label - 1: make(map[int64]*b.Tree), + }} } -func (qdi *QuadDirectionIndex) GetForDir(d quad.Direction) map[int64]*llrb.LLRB { - switch d { - case quad.Subject: - return qdi.subject - case quad.Object: - return qdi.object - case quad.Predicate: - return qdi.predicate - case quad.Label: - return qdi.label +func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree { + if d < quad.Subject || d > quad.Label { + panic("illegal direction") } - panic("illegal direction") -} - -func (qdi *QuadDirectionIndex) GetOrCreate(d quad.Direction, id int64) *llrb.LLRB { - directionIndex := qdi.GetForDir(d) - if _, ok := directionIndex[id]; !ok { - directionIndex[id] = llrb.New() + tree, ok := qdi.index[d-1][id] + if !ok { + tree = b.TreeNew(cmp) + qdi.index[d-1][id] = tree } - return directionIndex[id] + return tree } -func (qdi *QuadDirectionIndex) Get(d quad.Direction, id int64) (*llrb.LLRB, bool) { - directionIndex := qdi.GetForDir(d) - tree, exists := directionIndex[id] - return tree, exists +func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) { + if d < quad.Subject || d > quad.Label { + panic("illegal direction") + } + tree, ok := qdi.index[d-1][id] + return tree, ok } type LogEntry struct { @@ -88,24 +77,24 @@ type TripleStore struct { log []LogEntry size int64 index QuadDirectionIndex - // vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree + // vip_index map[string]map[int64]map[string]map[int64]*b.Tree } func newTripleStore() *TripleStore { - var ts TripleStore - ts.idMap = make(map[string]int64) - ts.revIdMap = make(map[int64]string) - ts.log = make([]LogEntry, 1, 200) + return &TripleStore{ + idMap: make(map[string]int64), + revIdMap: make(map[int64]string), - // Sentinel null entry so indices start at 1 - ts.log[0] = LogEntry{} - ts.index = *NewQuadDirectionIndex() - ts.idCounter = 1 - ts.quadIdCounter = 1 - return &ts + // Sentinel null entry so indices start at 1 + log: make([]LogEntry, 1, 200), + + index: NewQuadDirectionIndex(), + idCounter: 1, + quadIdCounter: 1, + } } -func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { +func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { var err error if d.Action == graph.Add { @@ -120,47 +109,47 @@ func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { return nil } -func (ts *TripleStore) quadExists(t quad.Quad) (bool, int64) { - smallest := -1 - var smallest_tree *llrb.LLRB +const maxInt = int(^uint(0) >> 1) + +func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) { + min := maxInt + var tree *b.Tree for d := quad.Subject; d <= quad.Label; d++ { sid := t.Get(d) if d == quad.Label && sid == "" { continue } id, ok := ts.idMap[sid] - // If we've never heard about a node, it most not exist + // If we've never heard about a node, it must not exist if !ok { - return false, 0 + return 0, false } - index, exists := ts.index.Get(d, id) - if !exists { + index, ok := ts.index.Get(d, id) + if !ok { // If it's never been indexed in this direction, it can't exist. - return false, 0 + return 0, false } - if smallest == -1 || index.Len() < smallest { - smallest = index.Len() - smallest_tree = index + if l := index.Len(); l < min { + min, tree = l, index } } - it := NewLlrbIterator(smallest_tree, "", ts) + it := NewIterator(tree, "", ts) for it.Next() { val := it.Result() if t == ts.log[val.(int64)].Quad { - return true, val.(int64) + return val.(int64), true } } - return false, 0 + return 0, false } -func (ts *TripleStore) AddDelta(d *graph.Delta) error { - if exists, _ := ts.quadExists(d.Quad); exists { +func (ts *TripleStore) AddDelta(d graph.Delta) error { + if _, exists := ts.indexOf(d.Quad); exists { return graph.ErrQuadExists } - var quadID int64 - quadID = ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: *d}) + qid := ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: d}) ts.size++ ts.quadIdCounter++ @@ -181,25 +170,22 @@ func (ts *TripleStore) AddDelta(d *graph.Delta) error { continue } id := ts.idMap[d.Quad.Get(dir)] - tree := ts.index.GetOrCreate(dir, id) - tree.ReplaceOrInsert(Int64(quadID)) + tree := ts.index.Tree(dir, id) + tree.Set(qid, struct{}{}) } // TODO(barakmich): Add VIP indexing return nil } -func (ts *TripleStore) RemoveDelta(d *graph.Delta) error { - var prevQuadID int64 - var exists bool - prevQuadID = 0 - if exists, prevQuadID = ts.quadExists(d.Quad); !exists { +func (ts *TripleStore) RemoveDelta(d graph.Delta) error { + prevQuadID, exists := ts.indexOf(d.Quad) + if !exists { return graph.ErrQuadNotExist } - var quadID int64 - quadID = ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: *d}) + quadID := ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: d}) ts.log[prevQuadID].DeletedBy = quadID ts.size-- ts.quadIdCounter++ @@ -214,7 +200,7 @@ func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph index, ok := ts.index.Get(d, value.(int64)) data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) if ok { - return NewLlrbIterator(index, data, ts) + return NewIterator(index, data, ts) } return &iterator.Null{} } @@ -260,4 +246,5 @@ func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph. func (ts *TripleStore) NodesAllIterator() graph.Iterator { return NewMemstoreNodesAllIterator(ts) } + func (ts *TripleStore) Close() {} diff --git a/graph/mongo/iterator.go b/graph/mongo/iterator.go index ed65e3a..3356974 100644 --- a/graph/mongo/iterator.go +++ b/graph/mongo/iterator.go @@ -45,17 +45,7 @@ type Iterator struct { func NewIterator(qs *TripleStore, collection string, d quad.Direction, val graph.Value) *Iterator { name := qs.NameOf(val) - var constraint bson.M - switch d { - case quad.Subject: - constraint = bson.M{"Subject": name} - case quad.Predicate: - constraint = bson.M{"Predicate": name} - case quad.Object: - constraint = bson.M{"Object": name} - case quad.Label: - constraint = bson.M{"Label": name} - } + constraint := bson.M{d.String(): name} size, err := qs.db.C(collection).Find(constraint).Count() if err != nil { @@ -187,13 +177,13 @@ func (it *Iterator) Contains(v graph.Value) bool { case quad.Subject: offset = 0 case quad.Predicate: - offset = (it.qs.hasherSize * 2) + offset = (hashSize * 2) case quad.Object: - offset = (it.qs.hasherSize * 2) * 2 + offset = (hashSize * 2) * 2 case quad.Label: - offset = (it.qs.hasherSize * 2) * 3 + offset = (hashSize * 2) * 3 } - val := v.(string)[offset : it.qs.hasherSize*2+offset] + val := v.(string)[offset : hashSize*2+offset] if val == it.hash { it.result = v return graph.ContainsLogOut(it, v, true) diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 7e54aee..03c4c93 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -18,6 +18,7 @@ import ( "crypto/sha1" "encoding/hex" "hash" + "sync" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" @@ -34,12 +35,17 @@ func init() { const DefaultDBName = "cayley" +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + type TripleStore struct { - session *mgo.Session - db *mgo.Database - hasherSize int - makeHasher func() hash.Hash - idCache *IDLru + session *mgo.Session + db *mgo.Database + idCache *IDLru } func createNewMongoGraph(addr string, options graph.Options) error { @@ -54,18 +60,18 @@ func createNewMongoGraph(addr string, options graph.Options) error { } db := conn.DB(dbName) indexOpts := mgo.Index{ - Key: []string{"Sub"}, + Key: []string{"subject"}, Unique: false, DropDups: false, Background: true, Sparse: true, } db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Pred"} + indexOpts.Key = []string{"predicate"} db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Obj"} + indexOpts.Key = []string{"object"} db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Label"} + indexOpts.Key = []string{"label"} db.C("quads").EnsureIndex(indexOpts) logOpts := mgo.Index{ Key: []string{"LogID"}, @@ -91,26 +97,26 @@ func newTripleStore(addr string, options graph.Options) (graph.TripleStore, erro } qs.db = conn.DB(dbName) qs.session = conn - qs.hasherSize = sha1.Size - qs.makeHasher = sha1.New qs.idCache = NewIDLru(1 << 16) return &qs, nil } -func (qs *TripleStore) getIdForTriple(t quad.Quad) string { - hasher := qs.makeHasher() - id := qs.convertStringToByteHash(t.Subject, hasher) - id += qs.convertStringToByteHash(t.Predicate, hasher) - id += qs.convertStringToByteHash(t.Object, hasher) - id += qs.convertStringToByteHash(t.Label, hasher) +func (qs *TripleStore) getIdForQuad(t quad.Quad) string { + id := qs.convertStringToByteHash(t.Subject) + id += qs.convertStringToByteHash(t.Predicate) + id += qs.convertStringToByteHash(t.Object) + id += qs.convertStringToByteHash(t.Label) return id } -func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) string { - hasher.Reset() - key := make([]byte, 0, qs.hasherSize) - hasher.Write([]byte(s)) - key = hasher.Sum(key) +func (qs *TripleStore) convertStringToByteHash(s string) string { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) return hex.EncodeToString(key) } @@ -147,26 +153,20 @@ func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { return err } -func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error { +func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error { var setname string if proc == graph.Add { setname = "Added" } else if proc == graph.Delete { setname = "Deleted" } - tripledoc := bson.M{ - "Subject": t.Subject, - "Predicate": t.Predicate, - "Object": t.Object, - "Label": t.Label, - } upsert := bson.M{ - "$setOnInsert": tripledoc, + "$setOnInsert": q, "$push": bson.M{ setname: id, }, } - _, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert) + _, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert) if err != nil { glog.Errorf("Error: %v", err) } @@ -192,7 +192,7 @@ func (qs *TripleStore) checkValid(key string) bool { return true } -func (qs *TripleStore) updateLog(d *graph.Delta) error { +func (qs *TripleStore) updateLog(d graph.Delta) error { var action string if d.Action == graph.Add { action = "Add" @@ -202,7 +202,7 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { entry := MongoLogEntry{ LogID: d.ID, Action: action, - Key: qs.getIdForTriple(d.Quad), + Key: qs.getIdForQuad(d.Quad), Timestamp: d.Timestamp.UnixNano(), } err := qs.db.C("log").Insert(entry) @@ -212,12 +212,12 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { return err } -func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { +func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. for _, d := range in { - key := qs.getIdForTriple(d.Quad) + key := qs.getIdForQuad(d.Quad) switch d.Action { case graph.Add: if qs.checkValid(key) { @@ -239,7 +239,7 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { } } for _, d := range in { - err := qs.updateTriple(d.Quad, d.ID, d.Action) + err := qs.updateQuad(d.Quad, d.ID, d.Action) if err != nil { return err } @@ -267,17 +267,12 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { } func (qs *TripleStore) Quad(val graph.Value) quad.Quad { - var bsonDoc bson.M - err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc) + var q quad.Quad + err := qs.db.C("quads").FindId(val.(string)).One(&q) if err != nil { glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) } - return quad.Quad{ - bsonDoc["Subject"].(string), - bsonDoc["Predicate"].(string), - bsonDoc["Object"].(string), - bsonDoc["Label"].(string), - } + return q } func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { @@ -293,8 +288,7 @@ func (qs *TripleStore) TriplesAllIterator() graph.Iterator { } func (qs *TripleStore) ValueOf(s string) graph.Value { - h := qs.makeHasher() - return qs.convertStringToByteHash(s, h) + return qs.convertStringToByteHash(s) } func (qs *TripleStore) NameOf(v graph.Value) string { @@ -352,13 +346,13 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V case quad.Subject: offset = 0 case quad.Predicate: - offset = (qs.hasherSize * 2) + offset = (hashSize * 2) case quad.Object: - offset = (qs.hasherSize * 2) * 2 + offset = (hashSize * 2) * 2 case quad.Label: - offset = (qs.hasherSize * 2) * 3 + offset = (hashSize * 2) * 3 } - val := in.(string)[offset : qs.hasherSize*2+offset] + val := in.(string)[offset : hashSize*2+offset] return val } diff --git a/graph/quadwriter.go b/graph/quadwriter.go index dddc19a..38ae137 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -53,8 +53,10 @@ func (h *Handle) Close() { h.QuadWriter.Close() } -var ErrQuadExists = errors.New("Quad exists") -var ErrQuadNotExist = errors.New("Quad doesn't exist") +var ( + ErrQuadExists = errors.New("Quad exists") + ErrQuadNotExist = errors.New("Quad doesn't exist") +) type QuadWriter interface { // Add a quad to the store. diff --git a/graph/triplestore.go b/graph/triplestore.go index 3f884ff..df85124 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -44,7 +44,7 @@ type Value interface{} type TripleStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]*Delta) error + ApplyDeltas([]Delta) error // Given an opaque token, returns the triple for that token from the store. Quad(Value) quad.Quad diff --git a/writer/single.go b/writer/single.go index 4a3a787..d8c3382 100644 --- a/writer/single.go +++ b/writer/single.go @@ -50,8 +50,8 @@ func (s *Single) AcquireNextID() int64 { } func (s *Single) AddQuad(q quad.Quad) error { - deltas := make([]*graph.Delta, 1) - deltas[0] = &graph.Delta{ + deltas := make([]graph.Delta, 1) + deltas[0] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Add, @@ -61,9 +61,9 @@ func (s *Single) AddQuad(q quad.Quad) error { } func (s *Single) AddQuadSet(set []quad.Quad) error { - deltas := make([]*graph.Delta, len(set)) + deltas := make([]graph.Delta, len(set)) for i, q := range set { - deltas[i] = &graph.Delta{ + deltas[i] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Add, @@ -75,8 +75,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { } func (s *Single) RemoveQuad(q quad.Quad) error { - deltas := make([]*graph.Delta, 1) - deltas[0] = &graph.Delta{ + deltas := make([]graph.Delta, 1) + deltas[0] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Delete,