diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go index 6c08b49..1214da5 100644 --- a/graph/bolt/all_iterator.go +++ b/graph/bolt/all_iterator.go @@ -40,15 +40,12 @@ type AllIterator struct { } func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator { - - it := AllIterator{ + return &AllIterator{ uid: iterator.NextUID(), bucket: bucket, dir: d, qs: qs, } - - return &it } func (it *AllIterator) UID() uint64 { diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go index 6fb1603..07c07bb 100644 --- a/graph/bolt/iterator.go +++ b/graph/bolt/iterator.go @@ -17,7 +17,6 @@ package bolt import ( "bytes" "encoding/json" - "errors" "fmt" "strings" @@ -29,6 +28,15 @@ import ( "github.com/google/cayley/quad" ) +var ( + boltType graph.Type + bufferSize = 50 +) + +func init() { + boltType = graph.RegisterIterator("bolt") +} + type Iterator struct { uid uint64 tags graph.Tagger @@ -43,13 +51,11 @@ type Iterator struct { size int64 } -var bufferSize = 50 - -func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator { +func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) *Iterator { tok := value.(*Token) if !bytes.Equal(tok.bucket, nodeBucket) { glog.Error("Creating an iterator from a non-node value.") - return &iterator.Null{} + return &Iterator{done: true} } it := Iterator{ @@ -66,6 +72,8 @@ func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadSto return &it } +func Type() graph.Type { return boltType } + func (it *Iterator) UID() uint64 { return it.uid } @@ -108,8 +116,6 @@ func (it *Iterator) isLiveValue(val []byte) bool { return len(entry.History)%2 != 0 } -var errNotExist = errors.New("Triple doesn't exist") - func (it *Iterator) Next() bool { if it.done { return false @@ -135,7 +141,7 @@ func (it *Iterator) Next() bool { i++ } else { it.buffer = append(it.buffer, nil) - return errNotExist + return quad.ErrNotExist } } else { k, _ := cur.Seek(last) @@ -161,7 +167,7 @@ func (it *Iterator) Next() bool { return nil }) if err != nil { - if err != errNotExist { + if err != quad.ErrNotExist { glog.Error("Error nexting in database: ", err) } it.done = true @@ -212,45 +218,45 @@ func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int { case quad.Subject: return 0 case quad.Predicate: - return qs.hasherSize + return hashSize case quad.Object: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, posBucket) { switch d { case quad.Subject: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Predicate: return 0 case quad.Object: - return qs.hasherSize + return hashSize case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, ospBucket) { switch d { case quad.Subject: - return qs.hasherSize + return hashSize case quad.Predicate: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Object: return 0 case quad.Label: - return 3 * qs.hasherSize + return 3 * hashSize } } if bytes.Equal(tok.bucket, cpsBucket) { switch d { case quad.Subject: - return 2 * qs.hasherSize + return 2 * hashSize case quad.Predicate: - return qs.hasherSize + return hashSize case quad.Object: - return 3 * qs.hasherSize + return 3 * hashSize case quad.Label: return 0 } @@ -295,14 +301,6 @@ func (it *Iterator) DebugString(indent int) string { ) } -var boltType graph.Type - -func init() { - boltType = graph.RegisterIterator("bolt") -} - -func Type() graph.Type { return boltType } - func (it *Iterator) Type() graph.Type { return boltType } func (it *Iterator) Sorted() bool { return false } diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 7b6c66f..627677e 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "hash" + "sync" "github.com/barakmich/glog" "github.com/boltdb/bolt" @@ -34,6 +35,13 @@ func init() { graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt) } +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + type Token struct { bucket []byte key []byte @@ -44,13 +52,11 @@ func (t *Token) Key() interface{} { } type QuadStore struct { - db *bolt.DB - path string - open bool - size int64 - horizon int64 - makeHasher func() hash.Hash - hasherSize int + db *bolt.DB + path string + open bool + size int64 + horizon int64 } func createNewBolt(path string, _ graph.Options) error { @@ -70,11 +76,9 @@ func createNewBolt(path string, _ graph.Options) error { return nil } -func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { +func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) { var qs QuadStore var err error - qs.hasherSize = sha1.Size - qs.makeHasher = sha1.New db, err := bolt.Open(path, 0600, nil) if err != nil { glog.Errorln("Error, couldn't open! ", err) @@ -130,19 +134,17 @@ func bucketFor(d [4]quad.Direction) []byte { } func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, (qs.hasherSize * 4)) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) + key := make([]byte, 0, (hashSize * 4)) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) return key } func (qs *QuadStore) createValueKeyFor(s string) []byte { - hasher := qs.makeHasher() - key := make([]byte, 0, qs.hasherSize) - key = append(key, qs.convertStringToByteHash(s, hasher)...) + key := make([]byte, 0, hashSize) + key = append(key, qs.convertStringToByteHash(s)...) return key } @@ -150,30 +152,30 @@ type IndexEntry struct { History []int64 } -// Short hand for direction permutations. var ( - spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} - osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} - pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} - cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} - spoBucket = bucketFor(spo) - ospBucket = bucketFor(osp) - posBucket = bucketFor(pos) - cpsBucket = bucketFor(cps) + // Short hand for direction permutations. + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} + + // Byte arrays for each bucket name. + spoBucket = bucketFor(spo) + ospBucket = bucketFor(osp) + posBucket = bucketFor(pos) + cpsBucket = bucketFor(cps) + logBucket = []byte("log") + nodeBucket = []byte("node") + metaBucket = []byte("meta") ) -var logBucket = []byte("log") -var nodeBucket = []byte("node") -var metaBucket = []byte("meta") - func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { - var old_size = qs.size - var old_horizon = qs.horizon - var size_change int64 + old_size := qs.size + old_horizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { var b *bolt.Bucket resizeMap := make(map[string]int64) - size_change = int64(0) + size_change := int64(0) for _, d := range deltas { bytes, err := json.Marshal(d) if err != nil { @@ -304,29 +306,30 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, qs.size) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("size"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write size!") - return werr - } - } else { + if err != nil { glog.Errorf("Couldn't convert size!") return err } + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("size"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write size!") + return werr + } buf.Reset() err = binary.Write(buf, binary.LittleEndian, qs.horizon) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("horizon"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write horizon!") - return werr - } - } else { + + if err != nil { glog.Errorf("Couldn't convert horizon!") } + + b = tx.Bucket(metaBucket) + werr = b.Put([]byte("horizon"), buf.Bytes()) + + if werr != nil { + glog.Error("Couldn't write horizon!") + return werr + } return err } @@ -339,7 +342,6 @@ func (qs *QuadStore) Close() { } func (qs *QuadStore) Quad(k graph.Value) quad.Quad { - var in IndexEntry var q quad.Quad tok := k.(*Token) err := qs.db.View(func(tx *bolt.Tx) error { @@ -348,6 +350,7 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { if data == nil { return nil } + var in IndexEntry err := json.Unmarshal(data, &in) if err != nil { return err @@ -370,11 +373,13 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad { return q } -func (qs *QuadStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { - hasher.Reset() - key := make([]byte, 0, qs.hasherSize) - hasher.Write([]byte(s)) - key = hasher.Sum(key) +func (qs *QuadStore) convertStringToByteHash(s string) []byte { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) return key } @@ -479,11 +484,10 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va if offset != -1 { return &Token{ bucket: nodeBucket, - key: v.key[offset : offset+qs.hasherSize], + key: v.key[offset : offset+hashSize], } - } else { - return qs.ValueOf(qs.Quad(v).Get(d)) } + return qs.ValueOf(qs.Quad(v).Get(d)) } func compareTokens(a, b graph.Value) bool { diff --git a/quad/quad.go b/quad/quad.go index eaf7d98..4928040 100644 --- a/quad/quad.go +++ b/quad/quad.go @@ -44,6 +44,7 @@ import ( var ( ErrInvalid = errors.New("invalid N-Quad") ErrIncomplete = errors.New("incomplete N-Quad") + ErrNotExist = errors.New("Quad does not exist") ) // Our triple struct, used throughout.