diff --git a/cayley.go b/cayley.go
index 7bc90f1..99294dc 100644
--- a/cayley.go
+++ b/cayley.go
@@ -41,6 +41,7 @@ import (
 	"github.com/google/cayley/quad/nquads"

 	// Load all supported backends.
+	_ "github.com/google/cayley/graph/bolt"
 	_ "github.com/google/cayley/graph/leveldb"
 	_ "github.com/google/cayley/graph/memstore"
 	_ "github.com/google/cayley/graph/mongo"
diff --git a/db/db.go b/db/db.go
index e5370ee..641e1c7 100644
--- a/db/db.go
+++ b/db/db.go
@@ -70,6 +70,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter,

 func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error {
 	block := make([]quad.Quad, 0, cfg.LoadSize)
+	count := 0
 	for {
 		t, err := dec.Unmarshal()
 		if err != nil {
@@ -80,11 +81,19 @@ func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error {
 		}
 		block = append(block, t)
 		if len(block) == cap(block) {
+			count += len(block)
 			qw.AddQuadSet(block)
+			if glog.V(2) {
+				glog.V(2).Infof("Wrote %d quads.", count)
+			}
 			block = block[:0]
 		}
 	}

+	count += len(block)
 	qw.AddQuadSet(block)
+	if glog.V(2) {
+		glog.V(2).Infof("Wrote %d quads.", count)
+	}
 	return nil
 }
diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go
index 95fe04b..f221527 100644
--- a/graph/bolt/all_iterator.go
+++ b/graph/bolt/all_iterator.go
@@ -19,6 +19,9 @@ import (
 	"fmt"
 	"strings"

+	"github.com/barakmich/glog"
+	"github.com/boltdb/bolt"
+
 	"github.com/google/cayley/graph"
 	"github.com/google/cayley/graph/iterator"
 	"github.com/google/cayley/quad"
@@ -27,34 +30,22 @@ import (
 type AllIterator struct {
 	uid    uint64
 	tags   graph.Tagger
-	prefix []byte
+	bucket []byte
 	dir    quad.Direction
-	open   bool
 	qs     *QuadStore
-	result graph.Value
+	result *Token
+	buffer [][]byte
+	offset int
+	done   bool
 }

-func NewAllIterator(prefix string, d quad.Direction, ts *QuadStore) *AllIterator {
-	opts := &opt.ReadOptions{
-		DontFillCache: true,
-	}
+func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator {
 	it := AllIterator{
 		uid:    iterator.NextUID(),
-		ro:     opts,
-		iter:   ts.db.NewIterator(nil, opts),
-		prefix: []byte(prefix),
+		bucket: bucket,
 		dir:    d,
-		open:   true,
-		ts:     ts,
-	}
-
-	it.iter.Seek(it.prefix)
-	if !it.iter.Valid() {
-		// FIXME(kortschak) What are the semantics here? Is this iterator usable?
-		// If not, we should return nil *Iterator and an error.
-		it.open = false
-		it.iter.Release()
+		qs:     qs,
 	}

 	return &it
 }
@@ -65,15 +56,9 @@ func (it *AllIterator) UID() uint64 {
 }

 func (it *AllIterator) Reset() {
-	if !it.open {
-		it.iter = it.ts.db.NewIterator(nil, it.ro)
-		it.open = true
-	}
-	it.iter.Seek(it.prefix)
-	if !it.iter.Valid() {
-		it.open = false
-		it.iter.Release()
-	}
+	it.buffer = nil
+	it.offset = 0
+	it.done = false
 }

 func (it *AllIterator) Tagger() *graph.Tagger {
@@ -91,28 +76,65 @@ func (it *AllIterator) TagResults(dst map[string]graph.Value) {
 }

 func (it *AllIterator) Clone() graph.Iterator {
-	out := NewAllIterator(string(it.prefix), it.dir, it.ts)
+	out := NewAllIterator(it.bucket, it.dir, it.qs)
 	out.tags.CopyFrom(it)
 	return out
 }

 func (it *AllIterator) Next() bool {
-	if !it.open {
-		it.result = nil
+	if it.done {
 		return false
 	}
-	var out []byte
-	out = make([]byte, len(it.iter.Key()))
-	copy(out, it.iter.Key())
-	it.iter.Next()
-	if !it.iter.Valid() {
-		it.Close()
+	if len(it.buffer) <= it.offset+1 {
+		it.offset = 0
+		var last []byte
+		if it.buffer != nil {
+			last = it.buffer[len(it.buffer)-1]
+		}
+		it.buffer = make([][]byte, 0, bufferSize)
+		err := it.qs.db.View(func(tx *bolt.Tx) error {
+			i := 0
+			b := tx.Bucket(it.bucket)
+			cur := b.Cursor()
+			if last == nil {
+				k, _ := cur.First()
+				var out []byte
+				out = make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			} else {
+				k, _ := cur.Seek(last)
+				if !bytes.Equal(k, last) {
+					return fmt.Errorf("Couldn't pick up after %v", k)
+				}
+			}
+			for i < bufferSize {
+				k, _ := cur.Next()
+				if k == nil {
+					it.buffer = append(it.buffer, k)
+					break
+				}
+				var out []byte
+				out = make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			}
+			return nil
+		})
+		if err != nil {
+			glog.Error("Error nexting in database: ", err)
+			it.done = true
+			return false
+		}
+	} else {
+		it.offset++
 	}
-	if !bytes.HasPrefix(out, it.prefix) {
-		it.Close()
+	if it.Result() == nil {
+		it.done = true
 		return false
 	}
-	it.result = Token(out)
 	return true
 }

@@ -121,7 +143,16 @@ func (it *AllIterator) ResultTree() *graph.ResultTree {
 }

 func (it *AllIterator) Result() graph.Value {
-	return it.result
+	if it.done {
+		return nil
+	}
+	if it.result != nil {
+		return it.result
+	}
+	if it.offset >= len(it.buffer) {
+		return nil
+	}
+	return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
 }

 func (it *AllIterator) NextPath() bool {
@@ -134,29 +165,23 @@ func (it *AllIterator) SubIterators() []graph.Iterator {
 	return nil
 }

 func (it *AllIterator) Contains(v graph.Value) bool {
-	it.result = v
+	it.result = v.(*Token)
 	return true
 }

 func (it *AllIterator) Close() {
-	if it.open {
-		it.iter.Release()
-		it.open = false
-	}
+	it.result = nil
+	it.buffer = nil
+	it.done = true
 }

 func (it *AllIterator) Size() (int64, bool) {
-	size, err := it.ts.SizeOfPrefix(it.prefix)
-	if err == nil {
-		return size, false
-	}
-	// INT64_MAX
-	return int64(^uint64(0) >> 1), false
+	return it.qs.size, true
 }

 func (it *AllIterator) DebugString(indent int) string {
 	size, _ := it.Size()
-	return fmt.Sprintf("%s(%s tags: %v leveldb size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it)
+	return fmt.Sprintf("%s(%s tags: %v bolt size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it)
 }

 func (it *AllIterator) Type() graph.Type { return graph.All }
diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go
index a1a1da6..8650a94 100644
--- a/graph/bolt/iterator.go
+++ b/graph/bolt/iterator.go
@@ -17,12 +17,12 @@ package bolt
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strings"

 	"github.com/barakmich/glog"
-	ldbit "github.com/syndtr/goleveldb/leveldb/iterator"
-	"github.com/syndtr/goleveldb/leveldb/opt"
+	"github.com/boltdb/bolt"

 	"github.com/google/cayley/graph"
 	"github.com/google/cayley/graph/iterator"
@@ -30,48 +30,38 @@
 )

 type Iterator struct {
-	uid            uint64
-	tags           graph.Tagger
-	nextPrefix     []byte
-	checkId        []byte
-	dir            quad.Direction
-	open           bool
-	iter           ldbit.Iterator
-	qs             *QuadStore
-	ro             *opt.ReadOptions
-	originalPrefix string
-	result         graph.Value
+	uid     uint64
+	tags    graph.Tagger
+	bucket  []byte
+	checkId []byte
+	dir     quad.Direction
+	qs      *QuadStore
+	result  *Token
+	buffer  [][]byte
+	offset  int
+	done    bool
+	size    int64
 }

-func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator {
-	vb := value.(Token)
-	p := make([]byte, 0, 2+qs.hasher.Size())
-	p = append(p, []byte(prefix)...)
-	p = append(p, []byte(vb[1:])...)
+var bufferSize = 50

-	opts := &opt.ReadOptions{
-		DontFillCache: true,
+func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator {
+	tok := value.(*Token)
+	if !bytes.Equal(tok.bucket, nodeBucket) {
+		glog.Error("Creating an iterator from a non-node value.")
+		return &iterator.Null{}
 	}

 	it := Iterator{
-		uid:            iterator.NextUID(),
-		nextPrefix:     p,
-		checkId:        vb,
-		dir:            d,
-		originalPrefix: prefix,
-		ro:             opts,
-		iter:           qs.db.NewIterator(nil, opts),
-		open:           true,
-		qs:             qs,
+		uid:    iterator.NextUID(),
+		bucket: bucket,
+		dir:    d,
+		qs:     qs,
+		size:   qs.SizeOf(value),
 	}

-	ok := it.iter.Seek(it.nextPrefix)
-	if !ok {
-		it.open = false
-		it.iter.Release()
-		glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
-		return &iterator.Null{}
-	}
+	it.checkId = make([]byte, len(tok.key))
+	copy(it.checkId, tok.key)

 	return &it
 }
@@ -81,15 +71,9 @@ func (it *Iterator) UID() uint64 {
 }

 func (it *Iterator) Reset() {
-	if !it.open {
-		it.iter = it.qs.db.NewIterator(nil, it.ro)
-		it.open = true
-	}
-	ok := it.iter.Seek(it.nextPrefix)
-	if !ok {
-		it.open = false
-		it.iter.Release()
-	}
+	it.buffer = nil
+	it.offset = 0
+	it.done = false
 }

 func (it *Iterator) Tagger() *graph.Tagger {
@@ -107,16 +91,15 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
 }

 func (it *Iterator) Clone() graph.Iterator {
-	out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
+	out := NewIterator(it.bucket, it.dir, &Token{nodeBucket, it.checkId}, it.qs)
 	out.Tagger().CopyFrom(it)
 	return out
 }

 func (it *Iterator) Close() {
-	if it.open {
-		it.iter.Release()
-		it.open = false
-	}
+	it.result = nil
+	it.buffer = nil
+	it.done = true
 }

 func (it *Iterator) isLiveValue(val []byte) bool {
@@ -125,36 +108,73 @@
 	return len(entry.History)%2 != 0
 }

+var errNotExist = errors.New("Triple doesn't exist")
+
 func (it *Iterator) Next() bool {
-	if it.iter == nil {
-		it.result = nil
+	if it.done {
 		return false
 	}
-	if !it.open {
-		it.result = nil
-		return false
-	}
-	if !it.iter.Valid() {
-		it.result = nil
-		it.Close()
-		return false
-	}
-	if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) {
-		if !it.isLiveValue(it.iter.Value()) {
-			return it.Next()
+	if len(it.buffer) <= it.offset+1 {
+		it.offset = 0
+		var last []byte
+		if it.buffer != nil {
+			last = it.buffer[len(it.buffer)-1]
 		}
-		out := make([]byte, len(it.iter.Key()))
-		copy(out, it.iter.Key())
-		it.result = Token(out)
-		ok := it.iter.Next()
-		if !ok {
-			it.Close()
+		it.buffer = make([][]byte, 0, bufferSize)
+		err := it.qs.db.View(func(tx *bolt.Tx) error {
+			i := 0
+			b := tx.Bucket(it.bucket)
+			cur := b.Cursor()
+			if last == nil {
+				k, _ := cur.Seek(it.checkId)
+				if bytes.HasPrefix(k, it.checkId) {
+					var out []byte
+					out = make([]byte, len(k))
+					copy(out, k)
+					it.buffer = append(it.buffer, out)
+					i++
+				} else {
+					it.buffer = append(it.buffer, nil)
+					return errNotExist
+				}
+			} else {
+				k, _ := cur.Seek(last)
+				if !bytes.Equal(k, last) {
+					return fmt.Errorf("Couldn't pick up after %v", k)
+				}
+			}
+			for i < bufferSize {
+				k, v := cur.Next()
+				if k == nil || !bytes.HasPrefix(k, it.checkId) {
+					it.buffer = append(it.buffer, nil)
+					break
+				}
+				if !it.isLiveValue(v) {
+					continue
+				}
+				var out []byte
+				out = make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			}
+			return nil
+		})
+		if err != nil {
+			if err != errNotExist {
+				glog.Error("Error nexting in database: ", err)
+			}
+			it.done = true
+			return false
 		}
-		return true
+	} else {
+		it.offset++
 	}
-	it.Close()
-	it.result = nil
-	return false
+	if it.Result() == nil {
+		it.done = true
+		return false
+	}
+	return true
 }

@@ -162,7 +182,19 @@ func (it *Iterator) ResultTree() *graph.ResultTree {
 }

 func (it *Iterator) Result() graph.Value {
-	return it.result
+	if it.done {
+		return nil
+	}
+	if it.result != nil {
+		return it.result
+	}
+	if it.offset >= len(it.buffer) {
+		return nil
+	}
+	if it.buffer[it.offset] == nil {
+		return nil
+	}
+	return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
 }

 func (it *Iterator) NextPath() bool {
@@ -174,65 +206,65 @@ func (it *Iterator) SubIterators() []graph.Iterator {
 	return nil
 }

-func PositionOf(prefix []byte, d quad.Direction, qs *QuadStore) int {
-	if bytes.Equal(prefix, []byte("sp")) {
+func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int {
+	if bytes.Equal(tok.bucket, spoBucket) {
 		switch d {
 		case quad.Subject:
-			return 2
+			return 0
 		case quad.Predicate:
-			return qs.hasher.Size() + 2
+			return qs.hasher.Size()
 		case quad.Object:
-			return 2*qs.hasher.Size() + 2
+			return 2 * qs.hasher.Size()
 		case quad.Label:
-			return 3*qs.hasher.Size() + 2
+			return 3 * qs.hasher.Size()
 		}
 	}
-	if bytes.Equal(prefix, []byte("po")) {
+	if bytes.Equal(tok.bucket, posBucket) {
 		switch d {
 		case quad.Subject:
-			return 2*qs.hasher.Size() + 2
+			return 2 * qs.hasher.Size()
 		case quad.Predicate:
-			return 2
+			return 0
 		case quad.Object:
-			return qs.hasher.Size() + 2
+			return qs.hasher.Size()
 		case quad.Label:
-			return 3*qs.hasher.Size() + 2
+			return 3 * qs.hasher.Size()
 		}
 	}
-	if bytes.Equal(prefix, []byte("os")) {
+	if bytes.Equal(tok.bucket, ospBucket) {
 		switch d {
 		case quad.Subject:
-			return qs.hasher.Size() + 2
+			return qs.hasher.Size()
 		case quad.Predicate:
-			return 2*qs.hasher.Size() + 2
+			return 2 * qs.hasher.Size()
 		case quad.Object:
-			return 2
+			return 0
 		case quad.Label:
-			return 3*qs.hasher.Size() + 2
+			return 3 * qs.hasher.Size()
 		}
 	}
-	if bytes.Equal(prefix, []byte("cp")) {
+	if bytes.Equal(tok.bucket, cpsBucket) {
 		switch d {
 		case quad.Subject:
-			return 2*qs.hasher.Size() + 2
+			return 2 * qs.hasher.Size()
 		case quad.Predicate:
-			return qs.hasher.Size() + 2
+			return qs.hasher.Size()
 		case quad.Object:
-			return 3*qs.hasher.Size() + 2
+			return 3 * qs.hasher.Size()
 		case quad.Label:
-			return 2
+			return 0
 		}
 	}
 	panic("unreachable")
 }

 func (it *Iterator) Contains(v graph.Value) bool {
-	val := v.(Token)
-	if val[0] == 'z' {
+	val := v.(*Token)
+	if bytes.Equal(val.bucket, nodeBucket) {
 		return false
 	}
-	offset := PositionOf(val[0:2], it.dir, it.qs)
-	if bytes.HasPrefix(val[offset:], it.checkId[1:]) {
+	offset := PositionOf(val, it.dir, it.qs)
+	if bytes.HasPrefix(val.key[offset:], it.checkId) {
 		// You may ask, why don't we check to see if it's a valid (not deleted) triple
 		// again?
 		//
@@ -248,31 +280,30 @@ func (it *Iterator) Contains(v graph.Value) bool {
 }

 func (it *Iterator) Size() (int64, bool) {
-	return it.qs.SizeOf(Token(it.checkId)), true
+	return it.size, true
 }

 func (it *Iterator) DebugString(indent int) string {
-	size, _ := it.Size()
 	return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)",
 		strings.Repeat(" ", indent),
 		it.Type(),
 		it.UID(),
 		it.tags.Tags(),
 		it.dir,
-		size,
-		it.qs.NameOf(Token(it.checkId)),
+		it.size,
+		it.qs.NameOf(&Token{it.bucket, it.checkId}),
 	)
 }

-var levelDBType graph.Type
+var boltType graph.Type

 func init() {
-	levelDBType = graph.RegisterIterator("leveldb")
+	boltType = graph.RegisterIterator("bolt")
 }

-func Type() graph.Type { return levelDBType }
+func Type() graph.Type { return boltType }

-func (it *Iterator) Type() graph.Type { return levelDBType }
+func (it *Iterator) Type() graph.Type { return boltType }

 func (it *Iterator) Sorted() bool { return false }

 func (it *Iterator) Optimize() (graph.Iterator, bool) {
diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go
index 974da37..6e2d401 100644
--- a/graph/bolt/quadstore.go
+++ b/graph/bolt/quadstore.go
@@ -21,9 +21,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"hash"
+	"time"

 	"github.com/barakmich/glog"
 	"github.com/boltdb/bolt"
+	"github.com/boltdb/coalescer"

 	"github.com/google/cayley/graph"
 	"github.com/google/cayley/graph/iterator"
@@ -31,7 +33,7 @@ import (
 )

 func init() {
-	graph.RegisterTripleStore("bolt", true, newQuadStore, createNewLevelDB)
+	graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt)
 }

 type Token struct {
@@ -40,7 +42,7 @@
 }

 func (t *Token) Key() interface{} {
-	return fmt.Sprint(t.bucket, t.data)
+	return fmt.Sprint(t.bucket, t.key)
 }

 type QuadStore struct {
@@ -53,7 +55,6 @@ type QuadStore struct {
 }

 func createNewBolt(path string, _ graph.Options) error {
-	opts := &opt.Options{}
 	db, err := bolt.Open(path, 0600, nil)
 	if err != nil {
 		glog.Errorf("Error: couldn't create Bolt database: %v", err)
@@ -62,9 +63,6 @@
 	defer db.Close()
 	qs := &QuadStore{}
 	qs.db = db
-	qs.writeopts = &opt.WriteOptions{
-		Sync: true,
-	}
 	err = qs.createBuckets()
 	if err != nil {
 		return err
@@ -91,10 +89,10 @@ func newQuadStore(path string, options graph.Options) (graph.TripleStore, error)
 }

 func (qs *QuadStore) createBuckets() error {
-	return db.Update(func(tx *bolt.Tx) error {
+	return qs.db.Update(func(tx *bolt.Tx) error {
 		var err error
-		for _, bucket := range [][]byte{spo, osp, pos, cps} {
-			_, err = tx.CreateBucket(bucketFor(bucket))
+		for _, index := range [][4]quad.Direction{spo, osp, pos, cps} {
+			_, err = tx.CreateBucket(bucketFor(index))
 			if err != nil {
 				return fmt.Errorf("Couldn't create bucket: %s", err)
 			}
@@ -111,6 +109,7 @@ func (qs *QuadStore) createBuckets() error {
 		if err != nil {
 			return fmt.Errorf("Couldn't create bucket: %s", err)
 		}
+		return nil
 	})
 }

@@ -152,10 +151,14 @@ type IndexEntry struct {

 // Short hand for direction permutations.
 var (
-	spo = bucketFor([4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label})
-	osp = bucketFor([4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label})
-	pos = bucketFor([4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label})
-	cps = bucketFor([4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object})
+	spo       = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
+	osp       = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}
+	pos       = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}
+	cps       = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
+	spoBucket = bucketFor(spo)
+	ospBucket = bucketFor(osp)
+	posBucket = bucketFor(pos)
+	cpsBucket = bucketFor(cps)
 )

 var logBucket = []byte("log")
@@ -163,12 +166,14 @@ var nodeBucket = []byte("node")
 var metaBucket = []byte("meta")

 func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error {
-	batch := &leveldb.Batch{}
 	var size_change int64
 	var new_horizon int64
-	err := qs.db.Update(func(tx *bolt.Tx) error {
+	c, err := coalescer.New(qs.db, 200, 100*time.Millisecond)
+	if err != nil {
+		return err
+	}
+	err = c.Update(func(tx *bolt.Tx) error {
 		var b *bolt.Bucket
-		var err error
 		resizeMap := make(map[string]int64)
 		size_change = int64(0)
 		for _, d := range deltas {
@@ -220,12 +225,11 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error {

 func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error {
 	var entry IndexEntry
-	b := tx.Bucket(bucketFor(spo))
-
+	b := tx.Bucket(spoBucket)
 	data := b.Get(qs.createKeyFor(spo, q))
 	if data != nil {
 		// We got something.
-		err = json.Unmarshal(data, &entry)
+		err := json.Unmarshal(data, &entry)
 		if err != nil {
 			return err
 		}
@@ -244,17 +248,17 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo

 	entry.History = append(entry.History, id)

-	bytes, err := json.Marshal(entry)
+	jsonbytes, err := json.Marshal(entry)
 	if err != nil {
 		glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err)
 		return err
 	}
-	for _, bucket := range [][4]quad.Direction{spo, osp, pos, cps} {
-		if bucket == cps && q.Get(quad.Label) == "" {
+	for _, index := range [][4]quad.Direction{spo, osp, pos, cps} {
+		if index == cps && q.Get(quad.Label) == "" {
 			continue
 		}
-		b := tx.Bucket(bucketFor(bucket))
-		err = b.Put(qs.createKeyFor(bucket, q), bytes)
+		b := tx.Bucket(bucketFor(index))
+		err = b.Put(qs.createKeyFor(index, q), jsonbytes)
 		if err != nil {
 			return err
 		}
@@ -268,14 +272,14 @@ type ValueData struct {
 }

 func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error {
-	value := &ValueData{name, amount}
+	value := ValueData{name, amount}
 	b := tx.Bucket(nodeBucket)
 	key := qs.createValueKeyFor(name)
 	data := b.Get(key)
 	if data != nil {
 		// Node exists in the database -- unmarshal and update.
-		err = json.Unmarshal(b, value)
+		err := json.Unmarshal(data, &value)
 		if err != nil {
 			glog.Errorf("Error: couldn't reconstruct value: %v", err)
 			return err
 		}
@@ -336,13 +340,12 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
 	tok := k.(*Token)
 	err := qs.db.View(func(tx *bolt.Tx) error {
 		b := tx.Bucket(tok.bucket)
-		data := qs.db.Get(tok.key, qs.readopts)
+		data := b.Get(tok.key)
 		if data == nil {
 			// No harm, no foul.
 			return nil
 		}
-		err = json.Unmarshal(data, &q)
-		return err
+		return json.Unmarshal(data, &q)
 	})
 	if err != nil {
 		glog.Error("Error getting triple: ", err)
@@ -409,7 +412,7 @@ func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64
 		return empty, nil
 	}
 	buf := bytes.NewBuffer(data)
-	err = binary.Read(buf, binary.LittleEndian, &out)
+	err := binary.Read(buf, binary.LittleEndian, &out)
 	if err != nil {
 		return 0, err
 	}
@@ -419,31 +422,31 @@
 func (qs *QuadStore) getMetadata() error {
 	err := qs.db.View(func(tx *bolt.Tx) error {
 		var err error
-		qs.size, err = qs.getInt64ForKey("size", 0)
+		qs.size, err = qs.getInt64ForKey(tx, "size", 0)
 		if err != nil {
 			return err
 		}
-		qs.horizon, err = qs.getInt64ForKey("horizon", 0)
+		qs.horizon, err = qs.getInt64ForKey(tx, "horizon", 0)
 		return err
 	})
 	return err
 }

 func (qs *QuadStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
-	var prefix []byte
+	var bucket []byte
 	switch d {
 	case quad.Subject:
-		prefix = spo
+		bucket = spoBucket
 	case quad.Predicate:
-		prefix = pos
+		bucket = posBucket
 	case quad.Object:
-		prefix = osp
+		bucket = ospBucket
 	case quad.Label:
-		prefix = cps
+		bucket = cpsBucket
 	default:
 		panic("unreachable " + d.String())
 	}
-	return NewIterator(prefix, d, val, qs)
+	return NewIterator(bucket, d, val, qs)
 }

 func (qs *QuadStore) NodesAllIterator() graph.Iterator {
@@ -451,7 +454,7 @@
 }

 func (qs *QuadStore) TriplesAllIterator() graph.Iterator {
-	return NewAllIterator(pos, quad.Predicate, qs)
+	return NewAllIterator(posBucket, quad.Predicate, qs)
 }

 func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value {
@@ -460,7 +463,7 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va
 	if offset != -1 {
 		return &Token{
 			bucket: nodeBucket,
-			key:    v[offset : offset+qs.hasher.Size()],
+			key:    v.key[offset : offset+qs.hasher.Size()],
 		}
 	} else {
 		return qs.ValueOf(qs.Quad(v).Get(d))
 	}
 }
@@ -470,7 +473,7 @@
 func compareTokens(a, b graph.Value) bool {
 	atok := a.(*Token)
 	btok := b.(*Token)
-	return bytes.Equal(atok.key, btok.key) && atok.bucket == btok.bucket
+	return bytes.Equal(atok.key, btok.key) && bytes.Equal(atok.bucket, btok.bucket)
 }

 func (qs *QuadStore) FixedIterator() graph.FixedIterator {
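
Not part of the patch, but for review context: below is a minimal, self-contained sketch of the batched-cursor pattern that the new AllIterator.Next and Iterator.Next share — read up to bufferSize keys inside a bolt View transaction, remember the last key, and re-Seek to it when the next batch is needed. The database path (/tmp/example.db) and bucket name ("nodes") are made up for the illustration; the real code above additionally tracks offset/done state, prefix-filters on checkId, and skips deleted entries via isLiveValue.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// readBatch copies up to n keys from the named bucket, starting after last
// (or from the first key when last is nil). Keys are copied out because Bolt
// cursor data is only valid for the life of the transaction.
func readBatch(db *bolt.DB, bucket, last []byte, n int) ([][]byte, error) {
	batch := make([][]byte, 0, n)
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return fmt.Errorf("bucket %q not found", bucket)
		}
		cur := b.Cursor()
		var k []byte
		if last == nil {
			k, _ = cur.First()
		} else {
			// Re-seek to where the previous batch stopped, then step past it.
			if k, _ = cur.Seek(last); !bytes.Equal(k, last) {
				return fmt.Errorf("couldn't pick up after %x", last)
			}
			k, _ = cur.Next()
		}
		for ; k != nil && len(batch) < n; k, _ = cur.Next() {
			cp := make([]byte, len(k))
			copy(cp, k)
			batch = append(batch, cp)
		}
		return nil
	})
	return batch, err
}

func main() {
	db, err := bolt.Open("/tmp/example.db", 0600, nil) // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	bucket := []byte("nodes") // illustrative bucket name
	var last []byte
	for {
		batch, err := readBatch(db, bucket, last, 50)
		if err != nil || len(batch) == 0 {
			break
		}
		for _, k := range batch {
			fmt.Printf("%x\n", k)
		}
		last = batch[len(batch)-1]
	}
}

The key copies matter: bolt only guarantees that cursor keys and values are valid inside the transaction, which is why both iterators in this patch copy keys into their buffer before the View callback returns.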