From d261e5d870d16d7db164767f990e34917ee6e7dd Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 11 Aug 2014 02:54:14 -0400 Subject: [PATCH 01/13] wip --- graph/bolt/all_iterator.go | 176 +++++++++++ graph/bolt/iterator.go | 289 ++++++++++++++++++ graph/bolt/quadstore.go | 478 ++++++++++++++++++++++++++++++ graph/bolt/quadstore_iterator_optimize.go | 55 ++++ 4 files changed, 998 insertions(+) create mode 100644 graph/bolt/all_iterator.go create mode 100644 graph/bolt/iterator.go create mode 100644 graph/bolt/quadstore.go create mode 100644 graph/bolt/quadstore_iterator_optimize.go diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go new file mode 100644 index 0000000..95fe04b --- /dev/null +++ b/graph/bolt/all_iterator.go @@ -0,0 +1,176 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "bytes" + "fmt" + "strings" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +type AllIterator struct { + uid uint64 + tags graph.Tagger + prefix []byte + dir quad.Direction + open bool + qs *QuadStore + result graph.Value +} + +func NewAllIterator(prefix string, d quad.Direction, ts *QuadStore) *AllIterator { + opts := &opt.ReadOptions{ + DontFillCache: true, + } + + it := AllIterator{ + uid: iterator.NextUID(), + ro: opts, + iter: ts.db.NewIterator(nil, opts), + prefix: []byte(prefix), + dir: d, + open: true, + ts: ts, + } + + it.iter.Seek(it.prefix) + if !it.iter.Valid() { + // FIXME(kortschak) What are the semantics here? Is this iterator usable? + // If not, we should return nil *Iterator and an error. + it.open = false + it.iter.Release() + } + + return &it +} + +func (it *AllIterator) UID() uint64 { + return it.uid +} + +func (it *AllIterator) Reset() { + if !it.open { + it.iter = it.ts.db.NewIterator(nil, it.ro) + it.open = true + } + it.iter.Seek(it.prefix) + if !it.iter.Valid() { + it.open = false + it.iter.Release() + } +} + +func (it *AllIterator) Tagger() *graph.Tagger { + return &it.tags +} + +func (it *AllIterator) TagResults(dst map[string]graph.Value) { + for _, tag := range it.tags.Tags() { + dst[tag] = it.Result() + } + + for tag, value := range it.tags.Fixed() { + dst[tag] = value + } +} + +func (it *AllIterator) Clone() graph.Iterator { + out := NewAllIterator(string(it.prefix), it.dir, it.ts) + out.tags.CopyFrom(it) + return out +} + +func (it *AllIterator) Next() bool { + if !it.open { + it.result = nil + return false + } + var out []byte + out = make([]byte, len(it.iter.Key())) + copy(out, it.iter.Key()) + it.iter.Next() + if !it.iter.Valid() { + it.Close() + } + if !bytes.HasPrefix(out, it.prefix) { + it.Close() + return false + } + it.result = Token(out) + return true +} + +func (it *AllIterator) ResultTree() *graph.ResultTree { + return graph.NewResultTree(it.Result()) +} + +func (it *AllIterator) Result() graph.Value { + return it.result +} + +func (it *AllIterator) NextPath() bool { + return false +} + +// No subiterators. +func (it *AllIterator) SubIterators() []graph.Iterator { + return nil +} + +func (it *AllIterator) Contains(v graph.Value) bool { + it.result = v + return true +} + +func (it *AllIterator) Close() { + if it.open { + it.iter.Release() + it.open = false + } +} + +func (it *AllIterator) Size() (int64, bool) { + size, err := it.ts.SizeOfPrefix(it.prefix) + if err == nil { + return size, false + } + // INT64_MAX + return int64(^uint64(0) >> 1), false +} + +func (it *AllIterator) DebugString(indent int) string { + size, _ := it.Size() + return fmt.Sprintf("%s(%s tags: %v leveldb size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it) +} + +func (it *AllIterator) Type() graph.Type { return graph.All } +func (it *AllIterator) Sorted() bool { return false } + +func (it *AllIterator) Optimize() (graph.Iterator, bool) { + return it, false +} + +func (it *AllIterator) Stats() graph.IteratorStats { + s, _ := it.Size() + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 2, + Size: s, + } +} diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go new file mode 100644 index 0000000..a1a1da6 --- /dev/null +++ b/graph/bolt/iterator.go @@ -0,0 +1,289 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + "github.com/barakmich/glog" + ldbit "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +type Iterator struct { + uid uint64 + tags graph.Tagger + nextPrefix []byte + checkId []byte + dir quad.Direction + open bool + iter ldbit.Iterator + qs *QuadStore + ro *opt.ReadOptions + originalPrefix string + result graph.Value +} + +func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator { + vb := value.(Token) + p := make([]byte, 0, 2+qs.hasher.Size()) + p = append(p, []byte(prefix)...) + p = append(p, []byte(vb[1:])...) + + opts := &opt.ReadOptions{ + DontFillCache: true, + } + + it := Iterator{ + uid: iterator.NextUID(), + nextPrefix: p, + checkId: vb, + dir: d, + originalPrefix: prefix, + ro: opts, + iter: qs.db.NewIterator(nil, opts), + open: true, + qs: qs, + } + + ok := it.iter.Seek(it.nextPrefix) + if !ok { + it.open = false + it.iter.Release() + glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) + return &iterator.Null{} + } + + return &it +} + +func (it *Iterator) UID() uint64 { + return it.uid +} + +func (it *Iterator) Reset() { + if !it.open { + it.iter = it.qs.db.NewIterator(nil, it.ro) + it.open = true + } + ok := it.iter.Seek(it.nextPrefix) + if !ok { + it.open = false + it.iter.Release() + } +} + +func (it *Iterator) Tagger() *graph.Tagger { + return &it.tags +} + +func (it *Iterator) TagResults(dst map[string]graph.Value) { + for _, tag := range it.tags.Tags() { + dst[tag] = it.Result() + } + + for tag, value := range it.tags.Fixed() { + dst[tag] = value + } +} + +func (it *Iterator) Clone() graph.Iterator { + out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs) + out.Tagger().CopyFrom(it) + return out +} + +func (it *Iterator) Close() { + if it.open { + it.iter.Release() + it.open = false + } +} + +func (it *Iterator) isLiveValue(val []byte) bool { + var entry IndexEntry + json.Unmarshal(val, &entry) + return len(entry.History)%2 != 0 +} + +func (it *Iterator) Next() bool { + if it.iter == nil { + it.result = nil + return false + } + if !it.open { + it.result = nil + return false + } + if !it.iter.Valid() { + it.result = nil + it.Close() + return false + } + if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) { + if !it.isLiveValue(it.iter.Value()) { + return it.Next() + } + out := make([]byte, len(it.iter.Key())) + copy(out, it.iter.Key()) + it.result = Token(out) + ok := it.iter.Next() + if !ok { + it.Close() + } + return true + } + it.Close() + it.result = nil + return false +} + +func (it *Iterator) ResultTree() *graph.ResultTree { + return graph.NewResultTree(it.Result()) +} + +func (it *Iterator) Result() graph.Value { + return it.result +} + +func (it *Iterator) NextPath() bool { + return false +} + +// No subiterators. +func (it *Iterator) SubIterators() []graph.Iterator { + return nil +} + +func PositionOf(prefix []byte, d quad.Direction, qs *QuadStore) int { + if bytes.Equal(prefix, []byte("sp")) { + switch d { + case quad.Subject: + return 2 + case quad.Predicate: + return qs.hasher.Size() + 2 + case quad.Object: + return 2*qs.hasher.Size() + 2 + case quad.Label: + return 3*qs.hasher.Size() + 2 + } + } + if bytes.Equal(prefix, []byte("po")) { + switch d { + case quad.Subject: + return 2*qs.hasher.Size() + 2 + case quad.Predicate: + return 2 + case quad.Object: + return qs.hasher.Size() + 2 + case quad.Label: + return 3*qs.hasher.Size() + 2 + } + } + if bytes.Equal(prefix, []byte("os")) { + switch d { + case quad.Subject: + return qs.hasher.Size() + 2 + case quad.Predicate: + return 2*qs.hasher.Size() + 2 + case quad.Object: + return 2 + case quad.Label: + return 3*qs.hasher.Size() + 2 + } + } + if bytes.Equal(prefix, []byte("cp")) { + switch d { + case quad.Subject: + return 2*qs.hasher.Size() + 2 + case quad.Predicate: + return qs.hasher.Size() + 2 + case quad.Object: + return 3*qs.hasher.Size() + 2 + case quad.Label: + return 2 + } + } + panic("unreachable") +} + +func (it *Iterator) Contains(v graph.Value) bool { + val := v.(Token) + if val[0] == 'z' { + return false + } + offset := PositionOf(val[0:2], it.dir, it.qs) + if bytes.HasPrefix(val[offset:], it.checkId[1:]) { + // You may ask, why don't we check to see if it's a valid (not deleted) triple + // again? + // + // We've already done that -- in order to get the graph.Value token in the + // first place, we had to have done the check already; it came from a Next(). + // + // However, if it ever starts coming from somewhere else, it'll be more + // efficient to change the interface of the graph.Value for LevelDB to a + // struct with a flag for isValid, to save another random read. + return true + } + return false +} + +func (it *Iterator) Size() (int64, bool) { + return it.qs.SizeOf(Token(it.checkId)), true +} + +func (it *Iterator) DebugString(indent int) string { + size, _ := it.Size() + return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)", + strings.Repeat(" ", indent), + it.Type(), + it.UID(), + it.tags.Tags(), + it.dir, + size, + it.qs.NameOf(Token(it.checkId)), + ) +} + +var levelDBType graph.Type + +func init() { + levelDBType = graph.RegisterIterator("leveldb") +} + +func Type() graph.Type { return levelDBType } + +func (it *Iterator) Type() graph.Type { return levelDBType } +func (it *Iterator) Sorted() bool { return false } + +func (it *Iterator) Optimize() (graph.Iterator, bool) { + return it, false +} + +func (it *Iterator) Stats() graph.IteratorStats { + s, _ := it.Size() + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 2, + Size: s, + } +} diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go new file mode 100644 index 0000000..974da37 --- /dev/null +++ b/graph/bolt/quadstore.go @@ -0,0 +1,478 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "bytes" + "crypto/sha1" + "encoding/binary" + "encoding/json" + "fmt" + "hash" + + "github.com/barakmich/glog" + "github.com/boltdb/bolt" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +func init() { + graph.RegisterTripleStore("bolt", true, newQuadStore, createNewLevelDB) +} + +type Token struct { + bucket []byte + key []byte +} + +func (t *Token) Key() interface{} { + return fmt.Sprint(t.bucket, t.data) +} + +type QuadStore struct { + db *bolt.DB + path string + open bool + size int64 + horizon int64 + hasher hash.Hash +} + +func createNewBolt(path string, _ graph.Options) error { + opts := &opt.Options{} + db, err := bolt.Open(path, 0600, nil) + if err != nil { + glog.Errorf("Error: couldn't create Bolt database: %v", err) + return err + } + defer db.Close() + qs := &QuadStore{} + qs.db = db + qs.writeopts = &opt.WriteOptions{ + Sync: true, + } + err = qs.createBuckets() + if err != nil { + return err + } + qs.Close() + return nil +} + +func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { + var qs QuadStore + var err error + qs.hasher = sha1.New() + db, err := bolt.Open(path, 0600, nil) + if err != nil { + glog.Errorln("Error, couldn't open! ", err) + return nil, err + } + qs.db = db + err = qs.getMetadata() + if err != nil { + return nil, err + } + return &qs, nil +} + +func (qs *QuadStore) createBuckets() error { + return db.Update(func(tx *bolt.Tx) error { + var err error + for _, bucket := range [][]byte{spo, osp, pos, cps} { + _, err = tx.CreateBucket(bucketFor(bucket)) + if err != nil { + return fmt.Errorf("Couldn't create bucket: %s", err) + } + } + _, err = tx.CreateBucket(logBucket) + if err != nil { + return fmt.Errorf("Couldn't create bucket: %s", err) + } + _, err = tx.CreateBucket(nodeBucket) + if err != nil { + return fmt.Errorf("Couldn't create bucket: %s", err) + } + _, err = tx.CreateBucket(metaBucket) + if err != nil { + return fmt.Errorf("Couldn't create bucket: %s", err) + } + }) +} + +func (qs *QuadStore) Size() int64 { + return qs.size +} + +func (qs *QuadStore) Horizon() int64 { + return qs.horizon +} + +func (qa *QuadStore) createDeltaKeyFor(d *graph.Delta) []byte { + return []byte(fmt.Sprintf("%018x", d.ID)) +} + +func bucketFor(d [4]quad.Direction) []byte { + return []byte{d[0].Prefix(), d[1].Prefix(), d[2].Prefix(), d[3].Prefix()} +} + +func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { + key := make([]byte, 0, (qs.hasher.Size() * 4)) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) + return key +} + +func (qs *QuadStore) createValueKeyFor(s string) []byte { + key := make([]byte, 0, qs.hasher.Size()) + key = append(key, qs.convertStringToByteHash(s)...) + return key +} + +type IndexEntry struct { + quad.Quad + History []int64 +} + +// Short hand for direction permutations. +var ( + spo = bucketFor([4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}) + osp = bucketFor([4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}) + pos = bucketFor([4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}) + cps = bucketFor([4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}) +) + +var logBucket = []byte("log") +var nodeBucket = []byte("node") +var metaBucket = []byte("meta") + +func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { + batch := &leveldb.Batch{} + var size_change int64 + var new_horizon int64 + err := qs.db.Update(func(tx *bolt.Tx) error { + var b *bolt.Bucket + var err error + resizeMap := make(map[string]int64) + size_change = int64(0) + for _, d := range deltas { + bytes, err := json.Marshal(d) + if err != nil { + return err + } + b = tx.Bucket(logBucket) + err = b.Put(qs.createDeltaKeyFor(d), bytes) + if err != nil { + return err + } + err = qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add) + if err != nil { + return err + } + delta := int64(1) + if d.Action == graph.Delete { + delta = int64(-1) + } + resizeMap[d.Quad.Subject] += delta + resizeMap[d.Quad.Predicate] += delta + resizeMap[d.Quad.Object] += delta + if d.Quad.Label != "" { + resizeMap[d.Quad.Label] += delta + } + size_change += delta + new_horizon = d.ID + } + for k, v := range resizeMap { + if v != 0 { + err := qs.UpdateValueKeyBy(k, v, tx) + if err != nil { + return err + } + } + } + return nil + }) + + if err != nil { + glog.Error("Couldn't write to DB for Delta set. Error: ", err) + return err + } + qs.size += size_change + qs.horizon = new_horizon + return nil +} + +func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { + var entry IndexEntry + b := tx.Bucket(bucketFor(spo)) + + data := b.Get(qs.createKeyFor(spo, q)) + if data != nil { + // We got something. + err = json.Unmarshal(data, &entry) + if err != nil { + return err + } + } else { + entry.Quad = q + } + + if isAdd && len(entry.History)%2 == 1 { + glog.Error("Adding a valid triple ", entry) + return graph.ErrQuadExists + } + if !isAdd && len(entry.History)%2 == 0 { + glog.Error("Deleting an invalid triple ", entry) + return graph.ErrQuadNotExist + } + + entry.History = append(entry.History, id) + + bytes, err := json.Marshal(entry) + if err != nil { + glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) + return err + } + for _, bucket := range [][4]quad.Direction{spo, osp, pos, cps} { + if bucket == cps && q.Get(quad.Label) == "" { + continue + } + b := tx.Bucket(bucketFor(bucket)) + err = b.Put(qs.createKeyFor(bucket, q), bytes) + if err != nil { + return err + } + } + return nil +} + +type ValueData struct { + Name string + Size int64 +} + +func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { + value := &ValueData{name, amount} + b := tx.Bucket(nodeBucket) + key := qs.createValueKeyFor(name) + data := b.Get(key) + + if data != nil { + // Node exists in the database -- unmarshal and update. + err = json.Unmarshal(b, value) + if err != nil { + glog.Errorf("Error: couldn't reconstruct value: %v", err) + return err + } + value.Size += amount + } + + // Are we deleting something? + if value.Size <= 0 { + value.Size = 0 + } + + // Repackage and rewrite. + bytes, err := json.Marshal(&value) + if err != nil { + glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) + return err + } + err = b.Put(key, bytes) + return err +} + +func (qs *QuadStore) Close() { + qs.db.Update(func(tx *bolt.Tx) error { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, qs.size) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("size"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write size before closing!") + return werr + } + } else { + glog.Errorf("Couldn't convert size before closing!") + return err + } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("horizon"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write horizon before closing!") + return werr + } + } else { + glog.Errorf("Couldn't convert horizon before closing!") + } + return err + }) + qs.db.Close() + qs.open = false +} + +func (qs *QuadStore) Quad(k graph.Value) quad.Quad { + var q quad.Quad + tok := k.(*Token) + err := qs.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(tok.bucket) + data := qs.db.Get(tok.key, qs.readopts) + if data == nil { + // No harm, no foul. + return nil + } + err = json.Unmarshal(data, &q) + return err + }) + if err != nil { + glog.Error("Error getting triple: ", err) + return quad.Quad{} + } + return q +} + +func (qs *QuadStore) convertStringToByteHash(s string) []byte { + qs.hasher.Reset() + key := make([]byte, 0, qs.hasher.Size()) + qs.hasher.Write([]byte(s)) + key = qs.hasher.Sum(key) + return key +} + +func (qs *QuadStore) ValueOf(s string) graph.Value { + return &Token{ + bucket: nodeBucket, + key: qs.createValueKeyFor(s), + } +} + +func (qs *QuadStore) valueData(t *Token) ValueData { + var out ValueData + if glog.V(3) { + glog.V(3).Infof("%s %v", string(t.bucket), t.key) + } + err := qs.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(t.bucket) + data := b.Get(t.key) + if data != nil { + return json.Unmarshal(data, &out) + } + return nil + }) + if err != nil { + glog.Errorln("Error: couldn't get value") + return ValueData{} + } + return out +} + +func (qs *QuadStore) NameOf(k graph.Value) string { + if k == nil { + glog.V(2).Info("k was nil") + return "" + } + return qs.valueData(k.(*Token)).Name +} + +func (qs *QuadStore) SizeOf(k graph.Value) int64 { + if k == nil { + return 0 + } + return int64(qs.valueData(k.(*Token)).Size) +} + +func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64, error) { + var out int64 + b := tx.Bucket(metaBucket) + data := b.Get([]byte(key)) + if data == nil { + return empty, nil + } + buf := bytes.NewBuffer(data) + err = binary.Read(buf, binary.LittleEndian, &out) + if err != nil { + return 0, err + } + return out, nil +} + +func (qs *QuadStore) getMetadata() error { + err := qs.db.View(func(tx *bolt.Tx) error { + var err error + qs.size, err = qs.getInt64ForKey("size", 0) + if err != nil { + return err + } + qs.horizon, err = qs.getInt64ForKey("horizon", 0) + return err + }) + return err +} + +func (qs *QuadStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { + var prefix []byte + switch d { + case quad.Subject: + prefix = spo + case quad.Predicate: + prefix = pos + case quad.Object: + prefix = osp + case quad.Label: + prefix = cps + default: + panic("unreachable " + d.String()) + } + return NewIterator(prefix, d, val, qs) +} + +func (qs *QuadStore) NodesAllIterator() graph.Iterator { + return NewAllIterator(nodeBucket, quad.Any, qs) +} + +func (qs *QuadStore) TriplesAllIterator() graph.Iterator { + return NewAllIterator(pos, quad.Predicate, qs) +} + +func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { + v := val.(*Token) + offset := PositionOf(v, d, qs) + if offset != -1 { + return &Token{ + bucket: nodeBucket, + key: v[offset : offset+qs.hasher.Size()], + } + } else { + return qs.ValueOf(qs.Quad(v).Get(d)) + } +} + +func compareTokens(a, b graph.Value) bool { + atok := a.(*Token) + btok := b.(*Token) + return bytes.Equal(atok.key, btok.key) && atok.bucket == btok.bucket +} + +func (qs *QuadStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixedIteratorWithCompare(compareTokens) +} diff --git a/graph/bolt/quadstore_iterator_optimize.go b/graph/bolt/quadstore_iterator_optimize.go new file mode 100644 index 0000000..e893020 --- /dev/null +++ b/graph/bolt/quadstore_iterator_optimize.go @@ -0,0 +1,55 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" +) + +func (ts *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case graph.LinksTo: + return ts.optimizeLinksTo(it.(*iterator.LinksTo)) + + } + return it, false +} + +func (ts *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == graph.Fixed { + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := ts.TripleIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + it.Close() + return newIt, true + } + } + return it, false +} From 82e4d122fc77ae118b748e277e43fad486a74890 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 11 Aug 2014 05:31:34 -0400 Subject: [PATCH 02/13] bolt db works --- cayley.go | 1 + db/db.go | 9 ++ graph/bolt/all_iterator.go | 133 ++++++++++++++---------- graph/bolt/iterator.go | 247 +++++++++++++++++++++++++-------------------- graph/bolt/quadstore.go | 85 ++++++++-------- 5 files changed, 272 insertions(+), 203 deletions(-) diff --git a/cayley.go b/cayley.go index 7bc90f1..99294dc 100644 --- a/cayley.go +++ b/cayley.go @@ -41,6 +41,7 @@ import ( "github.com/google/cayley/quad/nquads" // Load all supported backends. + _ "github.com/google/cayley/graph/bolt" _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" diff --git a/db/db.go b/db/db.go index e5370ee..641e1c7 100644 --- a/db/db.go +++ b/db/db.go @@ -70,6 +70,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter, func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { block := make([]quad.Quad, 0, cfg.LoadSize) + count := 0 for { t, err := dec.Unmarshal() if err != nil { @@ -80,11 +81,19 @@ func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { } block = append(block, t) if len(block) == cap(block) { + count += len(block) qw.AddQuadSet(block) + if glog.V(2) { + glog.V(2).Infof("Wrote %d quads.", count) + } block = block[:0] } } + count += len(block) qw.AddQuadSet(block) + if glog.V(2) { + glog.V(2).Infof("Wrote %d quads.", count) + } return nil } diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go index 95fe04b..f221527 100644 --- a/graph/bolt/all_iterator.go +++ b/graph/bolt/all_iterator.go @@ -19,6 +19,9 @@ import ( "fmt" "strings" + "github.com/barakmich/glog" + "github.com/boltdb/bolt" + "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" "github.com/google/cayley/quad" @@ -27,34 +30,22 @@ import ( type AllIterator struct { uid uint64 tags graph.Tagger - prefix []byte + bucket []byte dir quad.Direction - open bool qs *QuadStore - result graph.Value + result *Token + buffer [][]byte + offset int + done bool } -func NewAllIterator(prefix string, d quad.Direction, ts *QuadStore) *AllIterator { - opts := &opt.ReadOptions{ - DontFillCache: true, - } +func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator { it := AllIterator{ uid: iterator.NextUID(), - ro: opts, - iter: ts.db.NewIterator(nil, opts), - prefix: []byte(prefix), + bucket: bucket, dir: d, - open: true, - ts: ts, - } - - it.iter.Seek(it.prefix) - if !it.iter.Valid() { - // FIXME(kortschak) What are the semantics here? Is this iterator usable? - // If not, we should return nil *Iterator and an error. - it.open = false - it.iter.Release() + qs: qs, } return &it @@ -65,15 +56,9 @@ func (it *AllIterator) UID() uint64 { } func (it *AllIterator) Reset() { - if !it.open { - it.iter = it.ts.db.NewIterator(nil, it.ro) - it.open = true - } - it.iter.Seek(it.prefix) - if !it.iter.Valid() { - it.open = false - it.iter.Release() - } + it.buffer = nil + it.offset = 0 + it.done = false } func (it *AllIterator) Tagger() *graph.Tagger { @@ -91,28 +76,65 @@ func (it *AllIterator) TagResults(dst map[string]graph.Value) { } func (it *AllIterator) Clone() graph.Iterator { - out := NewAllIterator(string(it.prefix), it.dir, it.ts) + out := NewAllIterator(it.bucket, it.dir, it.qs) out.tags.CopyFrom(it) return out } func (it *AllIterator) Next() bool { - if !it.open { - it.result = nil + if it.done { return false } - var out []byte - out = make([]byte, len(it.iter.Key())) - copy(out, it.iter.Key()) - it.iter.Next() - if !it.iter.Valid() { - it.Close() + if len(it.buffer) <= it.offset+1 { + it.offset = 0 + var last []byte + if it.buffer != nil { + last = it.buffer[len(it.buffer)-1] + } + it.buffer = make([][]byte, 0, bufferSize) + err := it.qs.db.View(func(tx *bolt.Tx) error { + i := 0 + b := tx.Bucket(it.bucket) + cur := b.Cursor() + if last == nil { + k, _ := cur.First() + var out []byte + out = make([]byte, len(k)) + copy(out, k) + it.buffer = append(it.buffer, out) + i++ + } else { + k, _ := cur.Seek(last) + if !bytes.Equal(k, last) { + return fmt.Errorf("Couldn't pick up after", k) + } + } + for i < bufferSize { + k, _ := cur.Next() + if k == nil { + it.buffer = append(it.buffer, k) + break + } + var out []byte + out = make([]byte, len(k)) + copy(out, k) + it.buffer = append(it.buffer, out) + i++ + } + return nil + }) + if err != nil { + glog.Error("Error nexting in database: ", err) + it.done = true + return false + } + } else { + it.offset++ } - if !bytes.HasPrefix(out, it.prefix) { - it.Close() + if it.Result() == nil { + it.done = true return false } - it.result = Token(out) return true } @@ -121,7 +143,16 @@ func (it *AllIterator) ResultTree() *graph.ResultTree { } func (it *AllIterator) Result() graph.Value { - return it.result + if it.done { + return nil + } + if it.result != nil { + return it.result + } + if it.offset >= len(it.buffer) { + return nil + } + return &Token{bucket: it.bucket, key: it.buffer[it.offset]} } func (it *AllIterator) NextPath() bool { @@ -134,29 +165,23 @@ func (it *AllIterator) SubIterators() []graph.Iterator { } func (it *AllIterator) Contains(v graph.Value) bool { - it.result = v + it.result = v.(*Token) return true } func (it *AllIterator) Close() { - if it.open { - it.iter.Release() - it.open = false - } + it.result = nil + it.buffer = nil + it.done = true } func (it *AllIterator) Size() (int64, bool) { - size, err := it.ts.SizeOfPrefix(it.prefix) - if err == nil { - return size, false - } - // INT64_MAX - return int64(^uint64(0) >> 1), false + return it.qs.size, true } func (it *AllIterator) DebugString(indent int) string { size, _ := it.Size() - return fmt.Sprintf("%s(%s tags: %v leveldb size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it) + return fmt.Sprintf("%s(%s tags: %v bolt size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it) } func (it *AllIterator) Type() graph.Type { return graph.All } diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index a1a1da6..8650a94 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -17,12 +17,12 @@ package bolt import ( "bytes" "encoding/json" + "errors" "fmt" "strings" "github.com/barakmich/glog" - ldbit "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/boltdb/bolt" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" @@ -30,48 +30,38 @@ import ( ) type Iterator struct { - uid uint64 - tags graph.Tagger - nextPrefix []byte - checkId []byte - dir quad.Direction - open bool - iter ldbit.Iterator - qs *QuadStore - ro *opt.ReadOptions - originalPrefix string - result graph.Value + uid uint64 + tags graph.Tagger + bucket []byte + checkId []byte + dir quad.Direction + qs *QuadStore + result *Token + buffer [][]byte + offset int + done bool + size int64 } -func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator { - vb := value.(Token) - p := make([]byte, 0, 2+qs.hasher.Size()) - p = append(p, []byte(prefix)...) - p = append(p, []byte(vb[1:])...) +var bufferSize = 50 - opts := &opt.ReadOptions{ - DontFillCache: true, +func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator { + tok := value.(*Token) + if !bytes.Equal(tok.bucket, nodeBucket) { + glog.Error("Creating an iterator from a non-node value.") + return &iterator.Null{} } it := Iterator{ - uid: iterator.NextUID(), - nextPrefix: p, - checkId: vb, - dir: d, - originalPrefix: prefix, - ro: opts, - iter: qs.db.NewIterator(nil, opts), - open: true, - qs: qs, + uid: iterator.NextUID(), + bucket: bucket, + dir: d, + qs: qs, + size: qs.SizeOf(value), } - ok := it.iter.Seek(it.nextPrefix) - if !ok { - it.open = false - it.iter.Release() - glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) - return &iterator.Null{} - } + it.checkId = make([]byte, len(tok.key)) + copy(it.checkId, tok.key) return &it } @@ -81,15 +71,9 @@ func (it *Iterator) UID() uint64 { } func (it *Iterator) Reset() { - if !it.open { - it.iter = it.qs.db.NewIterator(nil, it.ro) - it.open = true - } - ok := it.iter.Seek(it.nextPrefix) - if !ok { - it.open = false - it.iter.Release() - } + it.buffer = nil + it.offset = 0 + it.done = false } func (it *Iterator) Tagger() *graph.Tagger { @@ -107,16 +91,15 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { } func (it *Iterator) Clone() graph.Iterator { - out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs) + out := NewIterator(it.bucket, it.dir, &Token{nodeBucket, it.checkId}, it.qs) out.Tagger().CopyFrom(it) return out } func (it *Iterator) Close() { - if it.open { - it.iter.Release() - it.open = false - } + it.result = nil + it.buffer = nil + it.done = true } func (it *Iterator) isLiveValue(val []byte) bool { @@ -125,36 +108,73 @@ func (it *Iterator) isLiveValue(val []byte) bool { return len(entry.History)%2 != 0 } +var errNotExist = errors.New("Triple doesn't exist") + func (it *Iterator) Next() bool { - if it.iter == nil { - it.result = nil + if it.done { return false } - if !it.open { - it.result = nil - return false - } - if !it.iter.Valid() { - it.result = nil - it.Close() - return false - } - if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) { - if !it.isLiveValue(it.iter.Value()) { - return it.Next() + if len(it.buffer) <= it.offset+1 { + it.offset = 0 + var last []byte + if it.buffer != nil { + last = it.buffer[len(it.buffer)-1] } - out := make([]byte, len(it.iter.Key())) - copy(out, it.iter.Key()) - it.result = Token(out) - ok := it.iter.Next() - if !ok { - it.Close() + it.buffer = make([][]byte, 0, bufferSize) + err := it.qs.db.View(func(tx *bolt.Tx) error { + i := 0 + b := tx.Bucket(it.bucket) + cur := b.Cursor() + if last == nil { + k, _ := cur.Seek(it.checkId) + if bytes.HasPrefix(k, it.checkId) { + var out []byte + out = make([]byte, len(k)) + copy(out, k) + it.buffer = append(it.buffer, out) + i++ + } else { + it.buffer = append(it.buffer, nil) + return errNotExist + } + } else { + k, _ := cur.Seek(last) + if !bytes.Equal(k, last) { + return fmt.Errorf("Couldn't pick up after", k) + } + } + for i < bufferSize { + k, v := cur.Next() + if k == nil || !bytes.HasPrefix(k, it.checkId) { + it.buffer = append(it.buffer, nil) + break + } + if !it.isLiveValue(v) { + continue + } + var out []byte + out = make([]byte, len(k)) + copy(out, k) + it.buffer = append(it.buffer, out) + i++ + } + return nil + }) + if err != nil { + if err != errNotExist { + glog.Error("Error nexting in database: ", err) + } + it.done = true + return false } - return true + } else { + it.offset++ } - it.Close() - it.result = nil - return false + if it.Result() == nil { + it.done = true + return false + } + return true } func (it *Iterator) ResultTree() *graph.ResultTree { @@ -162,7 +182,19 @@ func (it *Iterator) ResultTree() *graph.ResultTree { } func (it *Iterator) Result() graph.Value { - return it.result + if it.done { + return nil + } + if it.result != nil { + return it.result + } + if it.offset >= len(it.buffer) { + return nil + } + if it.buffer[it.offset] == nil { + return nil + } + return &Token{bucket: it.bucket, key: it.buffer[it.offset]} } func (it *Iterator) NextPath() bool { @@ -174,65 +206,65 @@ func (it *Iterator) SubIterators() []graph.Iterator { return nil } -func PositionOf(prefix []byte, d quad.Direction, qs *QuadStore) int { - if bytes.Equal(prefix, []byte("sp")) { +func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int { + if bytes.Equal(tok.bucket, spoBucket) { switch d { case quad.Subject: - return 2 + return 0 case quad.Predicate: - return qs.hasher.Size() + 2 + return qs.hasher.Size() case quad.Object: - return 2*qs.hasher.Size() + 2 + return 2 * qs.hasher.Size() case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3 * qs.hasher.Size() } } - if bytes.Equal(prefix, []byte("po")) { + if bytes.Equal(tok.bucket, posBucket) { switch d { case quad.Subject: - return 2*qs.hasher.Size() + 2 + return 2 * qs.hasher.Size() case quad.Predicate: - return 2 + return 0 case quad.Object: - return qs.hasher.Size() + 2 + return qs.hasher.Size() case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3 * qs.hasher.Size() } } - if bytes.Equal(prefix, []byte("os")) { + if bytes.Equal(tok.bucket, ospBucket) { switch d { case quad.Subject: - return qs.hasher.Size() + 2 + return qs.hasher.Size() case quad.Predicate: - return 2*qs.hasher.Size() + 2 + return 2 * qs.hasher.Size() case quad.Object: - return 2 + return 0 case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3 * qs.hasher.Size() } } - if bytes.Equal(prefix, []byte("cp")) { + if bytes.Equal(tok.bucket, cpsBucket) { switch d { case quad.Subject: - return 2*qs.hasher.Size() + 2 + return 2 * qs.hasher.Size() case quad.Predicate: - return qs.hasher.Size() + 2 + return qs.hasher.Size() case quad.Object: - return 3*qs.hasher.Size() + 2 + return 3 * qs.hasher.Size() case quad.Label: - return 2 + return 0 } } panic("unreachable") } func (it *Iterator) Contains(v graph.Value) bool { - val := v.(Token) - if val[0] == 'z' { + val := v.(*Token) + if bytes.Equal(val.bucket, nodeBucket) { return false } - offset := PositionOf(val[0:2], it.dir, it.qs) - if bytes.HasPrefix(val[offset:], it.checkId[1:]) { + offset := PositionOf(val, it.dir, it.qs) + if bytes.HasPrefix(val.key[offset:], it.checkId) { // You may ask, why don't we check to see if it's a valid (not deleted) triple // again? // @@ -248,31 +280,30 @@ func (it *Iterator) Contains(v graph.Value) bool { } func (it *Iterator) Size() (int64, bool) { - return it.qs.SizeOf(Token(it.checkId)), true + return it.size, true } func (it *Iterator) DebugString(indent int) string { - size, _ := it.Size() return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)", strings.Repeat(" ", indent), it.Type(), it.UID(), it.tags.Tags(), it.dir, - size, - it.qs.NameOf(Token(it.checkId)), + it.size, + it.qs.NameOf(&Token{it.bucket, it.checkId}), ) } -var levelDBType graph.Type +var boltType graph.Type func init() { - levelDBType = graph.RegisterIterator("leveldb") + boltType = graph.RegisterIterator("bolt") } -func Type() graph.Type { return levelDBType } +func Type() graph.Type { return boltType } -func (it *Iterator) Type() graph.Type { return levelDBType } +func (it *Iterator) Type() graph.Type { return boltType } func (it *Iterator) Sorted() bool { return false } func (it *Iterator) Optimize() (graph.Iterator, bool) { diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 974da37..6e2d401 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -21,9 +21,11 @@ import ( "encoding/json" "fmt" "hash" + "time" "github.com/barakmich/glog" "github.com/boltdb/bolt" + "github.com/boltdb/coalescer" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" @@ -31,7 +33,7 @@ import ( ) func init() { - graph.RegisterTripleStore("bolt", true, newQuadStore, createNewLevelDB) + graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt) } type Token struct { @@ -40,7 +42,7 @@ type Token struct { } func (t *Token) Key() interface{} { - return fmt.Sprint(t.bucket, t.data) + return fmt.Sprint(t.bucket, t.key) } type QuadStore struct { @@ -53,7 +55,6 @@ type QuadStore struct { } func createNewBolt(path string, _ graph.Options) error { - opts := &opt.Options{} db, err := bolt.Open(path, 0600, nil) if err != nil { glog.Errorf("Error: couldn't create Bolt database: %v", err) @@ -62,9 +63,6 @@ func createNewBolt(path string, _ graph.Options) error { defer db.Close() qs := &QuadStore{} qs.db = db - qs.writeopts = &opt.WriteOptions{ - Sync: true, - } err = qs.createBuckets() if err != nil { return err @@ -91,10 +89,10 @@ func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) } func (qs *QuadStore) createBuckets() error { - return db.Update(func(tx *bolt.Tx) error { + return qs.db.Update(func(tx *bolt.Tx) error { var err error - for _, bucket := range [][]byte{spo, osp, pos, cps} { - _, err = tx.CreateBucket(bucketFor(bucket)) + for _, index := range [][4]quad.Direction{spo, osp, pos, cps} { + _, err = tx.CreateBucket(bucketFor(index)) if err != nil { return fmt.Errorf("Couldn't create bucket: %s", err) } @@ -111,6 +109,7 @@ func (qs *QuadStore) createBuckets() error { if err != nil { return fmt.Errorf("Couldn't create bucket: %s", err) } + return nil }) } @@ -152,10 +151,14 @@ type IndexEntry struct { // Short hand for direction permutations. var ( - spo = bucketFor([4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}) - osp = bucketFor([4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}) - pos = bucketFor([4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}) - cps = bucketFor([4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}) + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} + spoBucket = bucketFor(spo) + ospBucket = bucketFor(osp) + posBucket = bucketFor(pos) + cpsBucket = bucketFor(cps) ) var logBucket = []byte("log") @@ -163,12 +166,14 @@ var nodeBucket = []byte("node") var metaBucket = []byte("meta") func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { - batch := &leveldb.Batch{} var size_change int64 var new_horizon int64 - err := qs.db.Update(func(tx *bolt.Tx) error { + c, err := coalescer.New(qs.db, 200, 100*time.Millisecond) + if err != nil { + return err + } + err = c.Update(func(tx *bolt.Tx) error { var b *bolt.Bucket - var err error resizeMap := make(map[string]int64) size_change = int64(0) for _, d := range deltas { @@ -220,12 +225,11 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { var entry IndexEntry - b := tx.Bucket(bucketFor(spo)) - + b := tx.Bucket(spoBucket) data := b.Get(qs.createKeyFor(spo, q)) if data != nil { // We got something. - err = json.Unmarshal(data, &entry) + err := json.Unmarshal(data, &entry) if err != nil { return err } @@ -244,17 +248,17 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo entry.History = append(entry.History, id) - bytes, err := json.Marshal(entry) + jsonbytes, err := json.Marshal(entry) if err != nil { glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) return err } - for _, bucket := range [][4]quad.Direction{spo, osp, pos, cps} { - if bucket == cps && q.Get(quad.Label) == "" { + for _, index := range [][4]quad.Direction{spo, osp, pos, cps} { + if index == cps && q.Get(quad.Label) == "" { continue } - b := tx.Bucket(bucketFor(bucket)) - err = b.Put(qs.createKeyFor(bucket, q), bytes) + b := tx.Bucket(bucketFor(index)) + err = b.Put(qs.createKeyFor(index, q), jsonbytes) if err != nil { return err } @@ -268,14 +272,14 @@ type ValueData struct { } func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { - value := &ValueData{name, amount} + value := ValueData{name, amount} b := tx.Bucket(nodeBucket) key := qs.createValueKeyFor(name) data := b.Get(key) if data != nil { // Node exists in the database -- unmarshal and update. - err = json.Unmarshal(b, value) + err := json.Unmarshal(data, &value) if err != nil { glog.Errorf("Error: couldn't reconstruct value: %v", err) return err @@ -336,13 +340,12 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(tok.bucket) - data := qs.db.Get(tok.key, qs.readopts) + data := b.Get(tok.key) if data == nil { // No harm, no foul. return nil } - err = json.Unmarshal(data, &q) - return err + return json.Unmarshal(data, &q) }) if err != nil { glog.Error("Error getting triple: ", err) @@ -409,7 +412,7 @@ func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64 return empty, nil } buf := bytes.NewBuffer(data) - err = binary.Read(buf, binary.LittleEndian, &out) + err := binary.Read(buf, binary.LittleEndian, &out) if err != nil { return 0, err } @@ -419,31 +422,31 @@ func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64 func (qs *QuadStore) getMetadata() error { err := qs.db.View(func(tx *bolt.Tx) error { var err error - qs.size, err = qs.getInt64ForKey("size", 0) + qs.size, err = qs.getInt64ForKey(tx, "size", 0) if err != nil { return err } - qs.horizon, err = qs.getInt64ForKey("horizon", 0) + qs.horizon, err = qs.getInt64ForKey(tx, "horizon", 0) return err }) return err } func (qs *QuadStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { - var prefix []byte + var bucket []byte switch d { case quad.Subject: - prefix = spo + bucket = spoBucket case quad.Predicate: - prefix = pos + bucket = posBucket case quad.Object: - prefix = osp + bucket = ospBucket case quad.Label: - prefix = cps + bucket = cpsBucket default: panic("unreachable " + d.String()) } - return NewIterator(prefix, d, val, qs) + return NewIterator(bucket, d, val, qs) } func (qs *QuadStore) NodesAllIterator() graph.Iterator { @@ -451,7 +454,7 @@ func (qs *QuadStore) NodesAllIterator() graph.Iterator { } func (qs *QuadStore) TriplesAllIterator() graph.Iterator { - return NewAllIterator(pos, quad.Predicate, qs) + return NewAllIterator(posBucket, quad.Predicate, qs) } func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { @@ -460,7 +463,7 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va if offset != -1 { return &Token{ bucket: nodeBucket, - key: v[offset : offset+qs.hasher.Size()], + key: v.key[offset : offset+qs.hasher.Size()], } } else { return qs.ValueOf(qs.Quad(v).Get(d)) @@ -470,7 +473,7 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va func compareTokens(a, b graph.Value) bool { atok := a.(*Token) btok := b.(*Token) - return bytes.Equal(atok.key, btok.key) && atok.bucket == btok.bucket + return bytes.Equal(atok.key, btok.key) && bytes.Equal(atok.bucket, btok.bucket) } func (qs *QuadStore) FixedIterator() graph.FixedIterator { From 3ceb19ca6c843da3bc8dbd7c3f0e06e7ed0cf6bf Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 11 Aug 2014 05:32:53 -0400 Subject: [PATCH 03/13] add travis deps --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index bd0558f..a3c6dbb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,8 @@ install: - go get github.com/syndtr/goleveldb/leveldb/iterator - go get github.com/syndtr/goleveldb/leveldb/opt - go get github.com/syndtr/goleveldb/leveldb/util + - go get github.com/boltdb/bolt + - go get github.com/boltdb/coalescer - go get gopkg.in/mgo.v2 - go get gopkg.in/mgo.v2/bson From 1099969591c4dbd6df23f28d348ef0ceffca08b8 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 11 Aug 2014 17:13:49 -0400 Subject: [PATCH 04/13] save horizon with transactions --- .travis.yml | 1 - graph/bolt/quadstore.go | 76 ++++++++++++++++++++++++------------------------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/.travis.yml b/.travis.yml index a3c6dbb..7c5309a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,6 @@ install: - go get github.com/syndtr/goleveldb/leveldb/opt - go get github.com/syndtr/goleveldb/leveldb/util - go get github.com/boltdb/bolt - - go get github.com/boltdb/coalescer - go get gopkg.in/mgo.v2 - go get gopkg.in/mgo.v2/bson diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 6e2d401..5cfd8a3 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -21,11 +21,9 @@ import ( "encoding/json" "fmt" "hash" - "time" "github.com/barakmich/glog" "github.com/boltdb/bolt" - "github.com/boltdb/coalescer" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" @@ -166,13 +164,10 @@ var nodeBucket = []byte("node") var metaBucket = []byte("meta") func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { + var old_size = qs.size + var old_horizon = qs.horizon var size_change int64 - var new_horizon int64 - c, err := coalescer.New(qs.db, 200, 100*time.Millisecond) - if err != nil { - return err - } - err = c.Update(func(tx *bolt.Tx) error { + err := qs.db.Update(func(tx *bolt.Tx) error { var b *bolt.Bucket resizeMap := make(map[string]int64) size_change = int64(0) @@ -201,7 +196,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { resizeMap[d.Quad.Label] += delta } size_change += delta - new_horizon = d.ID + qs.horizon = d.ID } for k, v := range resizeMap { if v != 0 { @@ -211,15 +206,16 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { } } } - return nil + qs.size += size_change + return qs.WriteHorizonAndSize(tx) }) if err != nil { glog.Error("Couldn't write to DB for Delta set. Error: ", err) + qs.horizon = old_horizon + qs.size = old_size return err } - qs.size += size_change - qs.horizon = new_horizon return nil } @@ -302,34 +298,38 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er return err } +func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, qs.size) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("size"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write size!") + return werr + } + } else { + glog.Errorf("Couldn't convert size!") + return err + } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("horizon"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write horizon!") + return werr + } + } else { + glog.Errorf("Couldn't convert horizon!") + } + return err +} + func (qs *QuadStore) Close() { qs.db.Update(func(tx *bolt.Tx) error { - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.LittleEndian, qs.size) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("size"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write size before closing!") - return werr - } - } else { - glog.Errorf("Couldn't convert size before closing!") - return err - } - buf.Reset() - err = binary.Write(buf, binary.LittleEndian, qs.horizon) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("horizon"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write horizon before closing!") - return werr - } - } else { - glog.Errorf("Couldn't convert horizon before closing!") - } - return err + return qs.WriteHorizonAndSize(tx) }) qs.db.Close() qs.open = false From 5d3a4a4a8fd97595900a9b8d4cd4300e6f6d644e Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 11 Aug 2014 21:25:32 -0400 Subject: [PATCH 05/13] fix all iterator --- graph/bolt/all_iterator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go index f221527..6c08b49 100644 --- a/graph/bolt/all_iterator.go +++ b/graph/bolt/all_iterator.go @@ -152,6 +152,9 @@ func (it *AllIterator) Result() graph.Value { if it.offset >= len(it.buffer) { return nil } + if it.buffer[it.offset] == nil { + return nil + } return &Token{bucket: it.bucket, key: it.buffer[it.offset]} } From 6b02f1a997413b0e51de4fbd6367c6709a6b6e77 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 13 Aug 2014 14:49:16 -0400 Subject: [PATCH 06/13] port 117 fix to bolt --- graph/bolt/iterator.go | 24 ++++++++++++------------ graph/bolt/quadstore.go | 44 ++++++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index 8650a94..f3bc322 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -212,45 +212,45 @@ func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int { case quad.Subject: return 0 case quad.Predicate: - return qs.hasher.Size() + return qs.hasherSize case quad.Object: - return 2 * qs.hasher.Size() + return 2 * qs.hasherSize case quad.Label: - return 3 * qs.hasher.Size() + return 3 * qs.hasherSize } } if bytes.Equal(tok.bucket, posBucket) { switch d { case quad.Subject: - return 2 * qs.hasher.Size() + return 2 * qs.hasherSize case quad.Predicate: return 0 case quad.Object: - return qs.hasher.Size() + return qs.hasherSize case quad.Label: - return 3 * qs.hasher.Size() + return 3 * qs.hasherSize } } if bytes.Equal(tok.bucket, ospBucket) { switch d { case quad.Subject: - return qs.hasher.Size() + return qs.hasherSize case quad.Predicate: - return 2 * qs.hasher.Size() + return 2 * qs.hasherSize case quad.Object: return 0 case quad.Label: - return 3 * qs.hasher.Size() + return 3 * qs.hasherSize } } if bytes.Equal(tok.bucket, cpsBucket) { switch d { case quad.Subject: - return 2 * qs.hasher.Size() + return 2 * qs.hasherSize case quad.Predicate: - return qs.hasher.Size() + return qs.hasherSize case quad.Object: - return 3 * qs.hasher.Size() + return 3 * qs.hasherSize case quad.Label: return 0 } diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 5cfd8a3..e3c4f52 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -44,12 +44,13 @@ func (t *Token) Key() interface{} { } type QuadStore struct { - db *bolt.DB - path string - open bool - size int64 - horizon int64 - hasher hash.Hash + db *bolt.DB + path string + open bool + size int64 + horizon int64 + makeHasher func() hash.Hash + hasherSize int } func createNewBolt(path string, _ graph.Options) error { @@ -72,7 +73,8 @@ func createNewBolt(path string, _ graph.Options) error { func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { var qs QuadStore var err error - qs.hasher = sha1.New() + qs.hasherSize = sha1.Size + qs.makeHasher = sha1.New db, err := bolt.Open(path, 0600, nil) if err != nil { glog.Errorln("Error, couldn't open! ", err) @@ -128,17 +130,19 @@ func bucketFor(d [4]quad.Direction) []byte { } func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - key := make([]byte, 0, (qs.hasher.Size() * 4)) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) + hasher := qs.makeHasher() + key := make([]byte, 0, (qs.hasherSize * 4)) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) return key } func (qs *QuadStore) createValueKeyFor(s string) []byte { - key := make([]byte, 0, qs.hasher.Size()) - key = append(key, qs.convertStringToByteHash(s)...) + hasher := qs.makeHasher() + key := make([]byte, 0, qs.hasherSize) + key = append(key, qs.convertStringToByteHash(s, hasher)...) return key } @@ -354,11 +358,11 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { return q } -func (qs *QuadStore) convertStringToByteHash(s string) []byte { - qs.hasher.Reset() - key := make([]byte, 0, qs.hasher.Size()) - qs.hasher.Write([]byte(s)) - key = qs.hasher.Sum(key) +func (qs *QuadStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { + hasher.Reset() + key := make([]byte, 0, qs.hasherSize) + hasher.Write([]byte(s)) + key = hasher.Sum(key) return key } @@ -463,7 +467,7 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va if offset != -1 { return &Token{ bucket: nodeBucket, - key: v.key[offset : offset+qs.hasher.Size()], + key: v.key[offset : offset+qs.hasherSize], } } else { return qs.ValueOf(qs.Quad(v).Get(d)) From fc1648340d92b30dc93509e87e9d39df42a62e77 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 13 Aug 2014 15:07:55 -0400 Subject: [PATCH 07/13] slight improvement by appending deltas first --- graph/bolt/quadstore.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index e3c4f52..085e7b9 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -185,7 +185,9 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { if err != nil { return err } - err = qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add) + } + for _, d := range deltas { + err := qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add) if err != nil { return err } From c94cd2a53abcd6f1a782675d540c821fd453b80b Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 14 Aug 2014 00:37:20 -0400 Subject: [PATCH 08/13] store less duplicate data in the indices --- graph/bolt/quadstore.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 085e7b9..e445fda 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -121,8 +121,8 @@ func (qs *QuadStore) Horizon() int64 { return qs.horizon } -func (qa *QuadStore) createDeltaKeyFor(d *graph.Delta) []byte { - return []byte(fmt.Sprintf("%018x", d.ID)) +func (qs *QuadStore) createDeltaKeyFor(id int64) []byte { + return []byte(fmt.Sprintf("%018x", id)) } func bucketFor(d [4]quad.Direction) []byte { @@ -147,7 +147,6 @@ func (qs *QuadStore) createValueKeyFor(s string) []byte { } type IndexEntry struct { - quad.Quad History []int64 } @@ -181,7 +180,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { return err } b = tx.Bucket(logBucket) - err = b.Put(qs.createDeltaKeyFor(d), bytes) + err = b.Put(qs.createDeltaKeyFor(d.ID), bytes) if err != nil { return err } @@ -235,8 +234,6 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo if err != nil { return err } - } else { - entry.Quad = q } if isAdd && len(entry.History)%2 == 1 { @@ -342,12 +339,25 @@ func (qs *QuadStore) Close() { } func (qs *QuadStore) Quad(k graph.Value) quad.Quad { + var in IndexEntry var q quad.Quad tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(tok.bucket) data := b.Get(tok.key) if data == nil { + return nil + } + err := json.Unmarshal(data, &in) + if err != nil { + return err + } + if len(in.History) == 0 { + return nil + } + b = tx.Bucket(logBucket) + data = b.Get(qs.createDeltaKeyFor(in.History[len(in.History)-1])) + if data == nil { // No harm, no foul. return nil } From 3b0110b22680a4bf99771cc9ed1fcef5748d7d04 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 16 Aug 2014 03:50:05 -0400 Subject: [PATCH 09/13] concrete deltas --- graph/bolt/quadstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index e445fda..7b6c66f 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -166,7 +166,7 @@ var logBucket = []byte("log") var nodeBucket = []byte("node") var metaBucket = []byte("meta") -func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { var old_size = qs.size var old_horizon = qs.horizon var size_change int64 From f4f0af4f53f03c16d065ffd95d75fe6abebb7783 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 16 Aug 2014 07:16:31 -0400 Subject: [PATCH 10/13] improve cost of bolt iterator --- graph/bolt/iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index f3bc322..6fb1603 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -314,7 +314,7 @@ func (it *Iterator) Stats() graph.IteratorStats { s, _ := it.Size() return graph.IteratorStats{ ContainsCost: 1, - NextCost: 2, + NextCost: 4, Size: s, } } From 6d82c78b456da21080ba97d7748245c40ca7e88c Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Fri, 22 Aug 2014 16:31:50 -0400 Subject: [PATCH 11/13] Cleanup based on comments --- graph/bolt/all_iterator.go | 5 +- graph/bolt/iterator.go | 56 ++++++++++---------- graph/bolt/quadstore.go | 126 +++++++++++++++++++++++---------------------- quad/quad.go | 1 + 4 files changed, 94 insertions(+), 94 deletions(-) diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go index 6c08b49..1214da5 100644 --- a/graph/bolt/all_iterator.go +++ b/graph/bolt/all_iterator.go @@ -40,15 +40,12 @@ type AllIterator struct { } func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator { - - it := AllIterator{ + return &AllIterator{ uid: iterator.NextUID(), bucket: bucket, dir: d, qs: qs, } - - return &it } func (it *AllIterator) UID() uint64 { diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index 6fb1603..07c07bb 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -17,7 +17,6 @@ package bolt import ( "bytes" "encoding/json" - "errors" "fmt" "strings" @@ -29,6 +28,15 @@ import ( "github.com/google/cayley/quad" ) +var ( + boltType graph.Type + bufferSize = 50 +) + +func init() { + boltType = graph.RegisterIterator("bolt") +} + type Iterator struct { uid uint64 tags graph.Tagger @@ -43,13 +51,11 @@ type Iterator struct { size int64 } -var bufferSize = 50 - -func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator { +func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) *Iterator { tok := value.(*Token) if !bytes.Equal(tok.bucket, nodeBucket) { glog.Error("Creating an iterator from a non-node value.") - return &iterator.Null{} + return &Iterator{done: true} } it := Iterator{ @@ -66,6 +72,8 @@ func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadSto return &it } +func Type() graph.Type { return boltType } + func (it *Iterator) UID() uint64 { return it.uid } @@ -108,8 +116,6 @@ func (it *Iterator) isLiveValue(val []byte) bool { return len(entry.History)%2 != 0 } -var errNotExist = errors.New("Triple doesn't exist") - func (it *Iterator) Next() bool { if it.done { return false @@ -135,7 +141,7 @@ func (it *Iterator) Next() bool { i++ } else { it.buffer = append(it.buffer, nil) - return errNotExist + return quad.ErrNotExist } } else { k, _ := cur.Seek(last) @@ -161,7 +167,7 @@ func (it *Iterator) Next() bool { return nil }) if err != nil { - if err != errNotExist { + if err != quad.ErrNotExist { glog.Error("Error nexting in database: ", err) } it.done = true @@ -212,45 +218,45 @@ func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int { case quad.Subject: return 0 case quad.Predicate: - return qs.hasherSize + return hashSize case quad.Object: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, posBucket) { switch d { case quad.Subject: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Predicate: return 0 case quad.Object: - return qs.hasherSize + return hashSize case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, ospBucket) { switch d { case quad.Subject: - return qs.hasherSize + return hashSize case quad.Predicate: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Object: return 0 case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, cpsBucket) { switch d { case quad.Subject: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Predicate: - return qs.hasherSize + return hashSize case quad.Object: - return 3 * qs.hasherSize + return 3 * hashSize case quad.Label: return 0 } @@ -295,14 +301,6 @@ func (it *Iterator) DebugString(indent int) string { ) } -var boltType graph.Type - -func init() { - boltType = graph.RegisterIterator("bolt") -} - -func Type() graph.Type { return boltType } - func (it *Iterator) Type() graph.Type { return boltType } func (it *Iterator) Sorted() bool { return false } diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 7b6c66f..627677e 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "hash" + "sync" "github.com/barakmich/glog" "github.com/boltdb/bolt" @@ -34,6 +35,13 @@ func init() { graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt) } +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + type Token struct { bucket []byte key []byte @@ -44,13 +52,11 @@ func (t *Token) Key() interface{} { } type QuadStore struct { - db *bolt.DB - path string - open bool - size int64 - horizon int64 - makeHasher func() hash.Hash - hasherSize int + db *bolt.DB + path string + open bool + size int64 + horizon int64 } func createNewBolt(path string, _ graph.Options) error { @@ -70,11 +76,9 @@ func createNewBolt(path string, _ graph.Options) error { return nil } -func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { +func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) { var qs QuadStore var err error - qs.hasherSize = sha1.Size - qs.makeHasher = sha1.New db, err := bolt.Open(path, 0600, nil) if err != nil { glog.Errorln("Error, couldn't open! ", err) @@ -130,19 +134,17 @@ func bucketFor(d [4]quad.Direction) []byte { } func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, (qs.hasherSize * 4)) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) + key := make([]byte, 0, (hashSize * 4)) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) return key } func (qs *QuadStore) createValueKeyFor(s string) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, qs.hasherSize) - key = append(key, qs.convertStringToByteHash(s, hasher)...) + key := make([]byte, 0, hashSize) + key = append(key, qs.convertStringToByteHash(s)...) return key } @@ -150,30 +152,30 @@ type IndexEntry struct { History []int64 } -// Short hand for direction permutations. var ( - spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} - osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} - pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} - cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} - spoBucket = bucketFor(spo) - ospBucket = bucketFor(osp) - posBucket = bucketFor(pos) - cpsBucket = bucketFor(cps) + // Short hand for direction permutations. + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} + + // Byte arrays for each bucket name. + spoBucket = bucketFor(spo) + ospBucket = bucketFor(osp) + posBucket = bucketFor(pos) + cpsBucket = bucketFor(cps) + logBucket = []byte("log") + nodeBucket = []byte("node") + metaBucket = []byte("meta") ) -var logBucket = []byte("log") -var nodeBucket = []byte("node") -var metaBucket = []byte("meta") - func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { - var old_size = qs.size - var old_horizon = qs.horizon - var size_change int64 + old_size := qs.size + old_horizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { var b *bolt.Bucket resizeMap := make(map[string]int64) - size_change = int64(0) + size_change := int64(0) for _, d := range deltas { bytes, err := json.Marshal(d) if err != nil { @@ -304,29 +306,30 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, qs.size) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("size"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write size!") - return werr - } - } else { + if err != nil { glog.Errorf("Couldn't convert size!") return err } + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("size"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write size!") + return werr + } buf.Reset() err = binary.Write(buf, binary.LittleEndian, qs.horizon) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("horizon"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write horizon!") - return werr - } - } else { + + if err != nil { glog.Errorf("Couldn't convert horizon!") } + + b = tx.Bucket(metaBucket) + werr = b.Put([]byte("horizon"), buf.Bytes()) + + if werr != nil { + glog.Error("Couldn't write horizon!") + return werr + } return err } @@ -339,7 +342,6 @@ func (qs *QuadStore) Close() { } func (qs *QuadStore) Quad(k graph.Value) quad.Quad { - var in IndexEntry var q quad.Quad tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { @@ -348,6 +350,7 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { if data == nil { return nil } + var in IndexEntry err := json.Unmarshal(data, &in) if err != nil { return err @@ -370,11 +373,13 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { return q } -func (qs *QuadStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { - hasher.Reset() - key := make([]byte, 0, qs.hasherSize) - hasher.Write([]byte(s)) - key = hasher.Sum(key) +func (qs *QuadStore) convertStringToByteHash(s string) []byte { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) return key } @@ -479,11 +484,10 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va if offset != -1 { return &Token{ bucket: nodeBucket, - key: v.key[offset : offset+qs.hasherSize], + key: v.key[offset : offset+hashSize], } - } else { - return qs.ValueOf(qs.Quad(v).Get(d)) } + return qs.ValueOf(qs.Quad(v).Get(d)) } func compareTokens(a, b graph.Value) bool { diff --git a/quad/quad.go b/quad/quad.go index eaf7d98..4928040 100644 --- a/quad/quad.go +++ b/quad/quad.go @@ -44,6 +44,7 @@ import ( var ( ErrInvalid = errors.New("invalid N-Quad") ErrIncomplete = errors.New("incomplete N-Quad") + ErrNotExist = errors.New("Quad does not exist") ) // Our triple struct, used throughout. From e11dfeb50fbdb2197a31b1d85c92e85003a7bef0 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 23 Aug 2014 17:19:14 -0400 Subject: [PATCH 12/13] Optionalize the sync parameter --- graph/bolt/quadstore.go | 16 +++++++++++----- graph/triplestore.go | 12 ++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 627677e..7c3cf60 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -39,7 +39,8 @@ var ( hashPool = sync.Pool{ New: func() interface{} { return sha1.New() }, } - hashSize = sha1.Size + hashSize = sha1.Size + localFillPercent = 0.7 ) type Token struct { @@ -76,7 +77,7 @@ func createNewBolt(path string, _ graph.Options) error { return nil } -func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) { +func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { var qs QuadStore var err error db, err := bolt.Open(path, 0600, nil) @@ -85,6 +86,8 @@ func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) { return nil, err } qs.db = db + // BoolKey returns false on non-existence. IE, Sync by default. + qs.db.NoSync, _ = options.BoolKey("nosync") err = qs.getMetadata() if err != nil { return nil, err @@ -173,7 +176,8 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { old_size := qs.size old_horizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { - var b *bolt.Bucket + b := tx.Bucket(logBucket) + b.FillPercent = localFillPercent resizeMap := make(map[string]int64) size_change := int64(0) for _, d := range deltas { @@ -181,7 +185,6 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { if err != nil { return err } - b = tx.Bucket(logBucket) err = b.Put(qs.createDeltaKeyFor(d.ID), bytes) if err != nil { return err @@ -229,6 +232,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { var entry IndexEntry b := tx.Bucket(spoBucket) + b.FillPercent = localFillPercent data := b.Get(qs.createKeyFor(spo, q)) if data != nil { // We got something. @@ -259,6 +263,7 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo continue } b := tx.Bucket(bucketFor(index)) + b.FillPercent = localFillPercent err = b.Put(qs.createKeyFor(index, q), jsonbytes) if err != nil { return err @@ -275,6 +280,7 @@ type ValueData struct { func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { value := ValueData{name, amount} b := tx.Bucket(nodeBucket) + b.FillPercent = localFillPercent key := qs.createValueKeyFor(name) data := b.Get(key) @@ -311,6 +317,7 @@ func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { return err } b := tx.Bucket(metaBucket) + b.FillPercent = localFillPercent werr := b.Put([]byte("size"), buf.Bytes()) if werr != nil { glog.Error("Couldn't write size!") @@ -323,7 +330,6 @@ func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { glog.Errorf("Couldn't convert horizon!") } - b = tx.Bucket(metaBucket) werr = b.Put([]byte("horizon"), buf.Bytes()) if werr != nil { diff --git a/graph/triplestore.go b/graph/triplestore.go index df85124..741297c 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -121,6 +121,18 @@ func (d Options) StringKey(key string) (string, bool) { return "", false } +func (d Options) BoolKey(key string) (bool, bool) { + if val, ok := d[key]; ok { + switch vv := val.(type) { + case bool: + return vv, true + default: + glog.Fatalln("Invalid", key, "parameter type from config.") + } + } + return false, false +} + var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load") type BulkLoader interface { From d0fcdf42992ad3ec831d34986315db3feedc1b8f Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 23 Aug 2014 18:05:04 -0400 Subject: [PATCH 13/13] Documentation and tests --- README.md | 5 +++-- cayley_test.go | 35 +++++++++++++++++++++++++++++++++-- docs/Configuration.md | 20 +++++++++++++++----- docs/Overview.md | 3 ++- graph/bolt/iterator.go | 10 ++++++---- quad/quad.go | 1 - 6 files changed, 59 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 65c0faa..8d7752c 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,9 @@ Its goal is to be a part of the developer's toolbox where [Linked Data](http://l * JavaScript, with a [Gremlin](http://gremlindocs.com/)-inspired\* graph object. * (simplified) [MQL](https://developers.google.com/freebase/v1/mql-overview), for Freebase fans * Plays well with multiple backend stores: - * [LevelDB](http://code.google.com/p/leveldb/) for single-machine storage - * [MongoDB](http://mongodb.org) + * [LevelDB](http://code.google.com/p/leveldb/) + * [Bolt](http://github.com/boltdb/bolt) + * [MongoDB](http://mongodb.org) for distributed stores * In-memory, ephemeral * Modular design; easy to extend with new languages and backends * Good test coverage diff --git a/cayley_test.go b/cayley_test.go index 02d4f4a..d8d15dd 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -18,8 +18,10 @@ import ( "bytes" "compress/bzip2" "compress/gzip" + "flag" "fmt" "io" + "os" "reflect" "sort" "strings" @@ -33,6 +35,8 @@ import ( "github.com/google/cayley/query/gremlin" ) +var backend = flag.String("backend", "memstore", "Which backend to test. Loads test data to /tmp if not present.") + var benchmarkQueries = []struct { message string long bool @@ -378,15 +382,42 @@ var ( ) func prepare(t testing.TB) { + switch *backend { + case "memstore": + break + case "leveldb": + fallthrough + case "bolt": + cfg.DatabaseType = *backend + cfg.DatabasePath = fmt.Sprint("/tmp/cayley_test_", *backend) + cfg.DatabaseOptions = map[string]interface{}{ + "nosync": true, // It's a test. If we need to load, do it fast. + } + default: + t.Fatalf("Untestable backend store %s", *backend) + } + var err error create.Do(func() { + needsLoad := true + if graph.IsPersistent(cfg.DatabaseType) { + if _, err := os.Stat(cfg.DatabasePath); os.IsNotExist(err) { + err = db.Init(cfg) + if err != nil { + t.Fatalf("Could not initialize database: %v", err) + } + } else { + needsLoad = false + } + } + handle, err = db.Open(cfg) if err != nil { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) } - if !graph.IsPersistent(cfg.DatabaseType) { - err = load(handle.QuadWriter, cfg, "", "cquad") + if needsLoad { + err = load(handle.QuadWriter, cfg, "30kmoviedata.nq.gz", "cquad") if err != nil { t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) } diff --git a/docs/Configuration.md b/docs/Configuration.md index 4b6b898..bd3bf11 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -23,7 +23,8 @@ All command line flags take precedence over the configuration file. * `mem`: An in-memory store, based on an initial N-Quads file. Loses all changes when the process exits. * `leveldb`: A persistent on-disk store backed by [LevelDB](http://code.google.com/p/leveldb/). - * `mongodb`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store. + * `bolt`: Stores the graph data on-disk in a [Bolt](http://github.com/boltdb/bolt) file. Uses more disk space and memory than LevelDB for smaller stores, but is often faster to write to and comparable for large ones, with faster average query times. + * `mongo`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store. #### **`db_path`** @@ -32,9 +33,10 @@ All command line flags take precedence over the configuration file. Where does the database actually live? Dependent on the type of database. For each datastore: - * `mem`: Path to a triple file to automatically load - * `leveldb`: Directory to hold the LevelDB database files - * `mongodb`: "hostname:port" of the desired MongoDB server. + * `mem`: Path to a triple file to automatically load. + * `leveldb`: Directory to hold the LevelDB database files. + * `bolt`: Path to the persistent single Bolt database file. + * `mongo`: "hostname:port" of the desired MongoDB server. #### **`listen_host`** @@ -103,8 +105,16 @@ The size in MiB of the LevelDB write cache. Increasing this number allows for mo The size in MiB of the LevelDB block cache. Increasing this number uses more memory to maintain a bigger cache of triple blocks for better performance. +### Bolt -### MongoDB +#### **`nosync`** + + * Type: Boolean + * Default: false + +Optionally disable syncing to disk per transaction. Nosync being true means much faster load times, but without consistency guarantees. + +### Mongo #### **`database_name`** diff --git a/docs/Overview.md b/docs/Overview.md index 300d005..39ded35 100644 --- a/docs/Overview.md +++ b/docs/Overview.md @@ -17,7 +17,8 @@ You can set up a full [configuration file](/docs/Configuration) if you'd prefer, Examples for each backend: * `leveldb`: `./cayley init --db=leveldb --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the path you'd like to store your data. - * `mongodb`: `./cayley init --db=mongodb --dbpath=":"` -- where HOSTNAME and PORT point to your Mongo instance. + * `bolt`: `./cayley init --db=bolt --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the filename where you'd like to store your data. + * `mongo`: `./cayley init --db=mongo --dbpath=":"` -- where HOSTNAME and PORT point to your Mongo instance. Those two options (db and dbpath) are always going to be present. If you feel like not repeating yourself, setting up a configuration file for your backend might be something to do now. There's an example file, `cayley.cfg.example` in the root directory. diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index 07c07bb..a53f6ed 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -17,6 +17,7 @@ package bolt import ( "bytes" "encoding/json" + "errors" "fmt" "strings" @@ -29,8 +30,9 @@ import ( ) var ( - boltType graph.Type - bufferSize = 50 + boltType graph.Type + bufferSize = 50 + errNotExist = errors.New("Quad does not exist") ) func init() { @@ -141,7 +143,7 @@ func (it *Iterator) Next() bool { i++ } else { it.buffer = append(it.buffer, nil) - return quad.ErrNotExist + return errNotExist } } else { k, _ := cur.Seek(last) @@ -167,7 +169,7 @@ func (it *Iterator) Next() bool { return nil }) if err != nil { - if err != quad.ErrNotExist { + if err != errNotExist { glog.Error("Error nexting in database: ", err) } it.done = true diff --git a/quad/quad.go b/quad/quad.go index 4928040..eaf7d98 100644 --- a/quad/quad.go +++ b/quad/quad.go @@ -44,7 +44,6 @@ import ( var ( ErrInvalid = errors.New("invalid N-Quad") ErrIncomplete = errors.New("incomplete N-Quad") - ErrNotExist = errors.New("Quad does not exist") ) // Our triple struct, used throughout.