Cleanup based on comments

This commit is contained in:
Barak Michener 2014-08-22 16:31:50 -04:00
parent 77b72e7189
commit 6d82c78b45
4 changed files with 94 additions and 94 deletions

View file

@ -40,15 +40,12 @@ type AllIterator struct {
} }
func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator { func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator {
return &AllIterator{
it := AllIterator{
uid: iterator.NextUID(), uid: iterator.NextUID(),
bucket: bucket, bucket: bucket,
dir: d, dir: d,
qs: qs, qs: qs,
} }
return &it
} }
func (it *AllIterator) UID() uint64 { func (it *AllIterator) UID() uint64 {

View file

@ -17,7 +17,6 @@ package bolt
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strings" "strings"
@ -29,6 +28,15 @@ import (
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
var (
boltType graph.Type
bufferSize = 50
)
func init() {
boltType = graph.RegisterIterator("bolt")
}
type Iterator struct { type Iterator struct {
uid uint64 uid uint64
tags graph.Tagger tags graph.Tagger
@ -43,13 +51,11 @@ type Iterator struct {
size int64 size int64
} }
var bufferSize = 50 func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) *Iterator {
func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) graph.Iterator {
tok := value.(*Token) tok := value.(*Token)
if !bytes.Equal(tok.bucket, nodeBucket) { if !bytes.Equal(tok.bucket, nodeBucket) {
glog.Error("Creating an iterator from a non-node value.") glog.Error("Creating an iterator from a non-node value.")
return &iterator.Null{} return &Iterator{done: true}
} }
it := Iterator{ it := Iterator{
@ -66,6 +72,8 @@ func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadSto
return &it return &it
} }
func Type() graph.Type { return boltType }
func (it *Iterator) UID() uint64 { func (it *Iterator) UID() uint64 {
return it.uid return it.uid
} }
@ -108,8 +116,6 @@ func (it *Iterator) isLiveValue(val []byte) bool {
return len(entry.History)%2 != 0 return len(entry.History)%2 != 0
} }
var errNotExist = errors.New("Triple doesn't exist")
func (it *Iterator) Next() bool { func (it *Iterator) Next() bool {
if it.done { if it.done {
return false return false
@ -135,7 +141,7 @@ func (it *Iterator) Next() bool {
i++ i++
} else { } else {
it.buffer = append(it.buffer, nil) it.buffer = append(it.buffer, nil)
return errNotExist return quad.ErrNotExist
} }
} else { } else {
k, _ := cur.Seek(last) k, _ := cur.Seek(last)
@ -161,7 +167,7 @@ func (it *Iterator) Next() bool {
return nil return nil
}) })
if err != nil { if err != nil {
if err != errNotExist { if err != quad.ErrNotExist {
glog.Error("Error nexting in database: ", err) glog.Error("Error nexting in database: ", err)
} }
it.done = true it.done = true
@ -212,45 +218,45 @@ func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int {
case quad.Subject: case quad.Subject:
return 0 return 0
case quad.Predicate: case quad.Predicate:
return qs.hasherSize return hashSize
case quad.Object: case quad.Object:
return 2 * qs.hasherSize return 2 * hashSize
case quad.Label: case quad.Label:
return 3 * qs.hasherSize return 3 * hashSize
} }
} }
if bytes.Equal(tok.bucket, posBucket) { if bytes.Equal(tok.bucket, posBucket) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return 2 * qs.hasherSize return 2 * hashSize
case quad.Predicate: case quad.Predicate:
return 0 return 0
case quad.Object: case quad.Object:
return qs.hasherSize return hashSize
case quad.Label: case quad.Label:
return 3 * qs.hasherSize return 3 * hashSize
} }
} }
if bytes.Equal(tok.bucket, ospBucket) { if bytes.Equal(tok.bucket, ospBucket) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return qs.hasherSize return hashSize
case quad.Predicate: case quad.Predicate:
return 2 * qs.hasherSize return 2 * hashSize
case quad.Object: case quad.Object:
return 0 return 0
case quad.Label: case quad.Label:
return 3 * qs.hasherSize return 3 * hashSize
} }
} }
if bytes.Equal(tok.bucket, cpsBucket) { if bytes.Equal(tok.bucket, cpsBucket) {
switch d { switch d {
case quad.Subject: case quad.Subject:
return 2 * qs.hasherSize return 2 * hashSize
case quad.Predicate: case quad.Predicate:
return qs.hasherSize return hashSize
case quad.Object: case quad.Object:
return 3 * qs.hasherSize return 3 * hashSize
case quad.Label: case quad.Label:
return 0 return 0
} }
@ -295,14 +301,6 @@ func (it *Iterator) DebugString(indent int) string {
) )
} }
var boltType graph.Type
func init() {
boltType = graph.RegisterIterator("bolt")
}
func Type() graph.Type { return boltType }
func (it *Iterator) Type() graph.Type { return boltType } func (it *Iterator) Type() graph.Type { return boltType }
func (it *Iterator) Sorted() bool { return false } func (it *Iterator) Sorted() bool { return false }

View file

@ -21,6 +21,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"hash" "hash"
"sync"
"github.com/barakmich/glog" "github.com/barakmich/glog"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
@ -34,6 +35,13 @@ func init() {
graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt) graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt)
} }
var (
hashPool = sync.Pool{
New: func() interface{} { return sha1.New() },
}
hashSize = sha1.Size
)
type Token struct { type Token struct {
bucket []byte bucket []byte
key []byte key []byte
@ -44,13 +52,11 @@ func (t *Token) Key() interface{} {
} }
type QuadStore struct { type QuadStore struct {
db *bolt.DB db *bolt.DB
path string path string
open bool open bool
size int64 size int64
horizon int64 horizon int64
makeHasher func() hash.Hash
hasherSize int
} }
func createNewBolt(path string, _ graph.Options) error { func createNewBolt(path string, _ graph.Options) error {
@ -70,11 +76,9 @@ func createNewBolt(path string, _ graph.Options) error {
return nil return nil
} }
func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) { func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) {
var qs QuadStore var qs QuadStore
var err error var err error
qs.hasherSize = sha1.Size
qs.makeHasher = sha1.New
db, err := bolt.Open(path, 0600, nil) db, err := bolt.Open(path, 0600, nil)
if err != nil { if err != nil {
glog.Errorln("Error, couldn't open! ", err) glog.Errorln("Error, couldn't open! ", err)
@ -130,19 +134,17 @@ func bucketFor(d [4]quad.Direction) []byte {
} }
func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte {
hasher := qs.makeHasher() key := make([]byte, 0, (hashSize * 4))
key := make([]byte, 0, (qs.hasherSize * 4)) key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...)
key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...)
return key return key
} }
func (qs *QuadStore) createValueKeyFor(s string) []byte { func (qs *QuadStore) createValueKeyFor(s string) []byte {
hasher := qs.makeHasher() key := make([]byte, 0, hashSize)
key := make([]byte, 0, qs.hasherSize) key = append(key, qs.convertStringToByteHash(s)...)
key = append(key, qs.convertStringToByteHash(s, hasher)...)
return key return key
} }
@ -150,30 +152,30 @@ type IndexEntry struct {
History []int64 History []int64
} }
// Short hand for direction permutations.
var ( var (
spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} // Short hand for direction permutations.
osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}
spoBucket = bucketFor(spo) cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
ospBucket = bucketFor(osp)
posBucket = bucketFor(pos) // Byte arrays for each bucket name.
cpsBucket = bucketFor(cps) spoBucket = bucketFor(spo)
ospBucket = bucketFor(osp)
posBucket = bucketFor(pos)
cpsBucket = bucketFor(cps)
logBucket = []byte("log")
nodeBucket = []byte("node")
metaBucket = []byte("meta")
) )
var logBucket = []byte("log")
var nodeBucket = []byte("node")
var metaBucket = []byte("meta")
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
var old_size = qs.size old_size := qs.size
var old_horizon = qs.horizon old_horizon := qs.horizon
var size_change int64
err := qs.db.Update(func(tx *bolt.Tx) error { err := qs.db.Update(func(tx *bolt.Tx) error {
var b *bolt.Bucket var b *bolt.Bucket
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
size_change = int64(0) size_change := int64(0)
for _, d := range deltas { for _, d := range deltas {
bytes, err := json.Marshal(d) bytes, err := json.Marshal(d)
if err != nil { if err != nil {
@ -304,29 +306,30 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er
func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, qs.size) err := binary.Write(buf, binary.LittleEndian, qs.size)
if err == nil { if err != nil {
b := tx.Bucket(metaBucket)
werr := b.Put([]byte("size"), buf.Bytes())
if werr != nil {
glog.Error("Couldn't write size!")
return werr
}
} else {
glog.Errorf("Couldn't convert size!") glog.Errorf("Couldn't convert size!")
return err return err
} }
b := tx.Bucket(metaBucket)
werr := b.Put([]byte("size"), buf.Bytes())
if werr != nil {
glog.Error("Couldn't write size!")
return werr
}
buf.Reset() buf.Reset()
err = binary.Write(buf, binary.LittleEndian, qs.horizon) err = binary.Write(buf, binary.LittleEndian, qs.horizon)
if err == nil {
b := tx.Bucket(metaBucket) if err != nil {
werr := b.Put([]byte("horizon"), buf.Bytes())
if werr != nil {
glog.Error("Couldn't write horizon!")
return werr
}
} else {
glog.Errorf("Couldn't convert horizon!") glog.Errorf("Couldn't convert horizon!")
} }
b = tx.Bucket(metaBucket)
werr = b.Put([]byte("horizon"), buf.Bytes())
if werr != nil {
glog.Error("Couldn't write horizon!")
return werr
}
return err return err
} }
@ -339,7 +342,6 @@ func (qs *QuadStore) Close() {
} }
func (qs *QuadStore) Quad(k graph.Value) quad.Quad { func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
var in IndexEntry
var q quad.Quad var q quad.Quad
tok := k.(*Token) tok := k.(*Token)
err := qs.db.View(func(tx *bolt.Tx) error { err := qs.db.View(func(tx *bolt.Tx) error {
@ -348,6 +350,7 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
if data == nil { if data == nil {
return nil return nil
} }
var in IndexEntry
err := json.Unmarshal(data, &in) err := json.Unmarshal(data, &in)
if err != nil { if err != nil {
return err return err
@ -370,11 +373,13 @@ func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
return q return q
} }
func (qs *QuadStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { func (qs *QuadStore) convertStringToByteHash(s string) []byte {
hasher.Reset() h := hashPool.Get().(hash.Hash)
key := make([]byte, 0, qs.hasherSize) h.Reset()
hasher.Write([]byte(s)) defer hashPool.Put(h)
key = hasher.Sum(key) key := make([]byte, 0, hashSize)
h.Write([]byte(s))
key = h.Sum(key)
return key return key
} }
@ -479,11 +484,10 @@ func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Va
if offset != -1 { if offset != -1 {
return &Token{ return &Token{
bucket: nodeBucket, bucket: nodeBucket,
key: v.key[offset : offset+qs.hasherSize], key: v.key[offset : offset+hashSize],
} }
} else {
return qs.ValueOf(qs.Quad(v).Get(d))
} }
return qs.ValueOf(qs.Quad(v).Get(d))
} }
func compareTokens(a, b graph.Value) bool { func compareTokens(a, b graph.Value) bool {

View file

@ -44,6 +44,7 @@ import (
var ( var (
ErrInvalid = errors.New("invalid N-Quad") ErrInvalid = errors.New("invalid N-Quad")
ErrIncomplete = errors.New("incomplete N-Quad") ErrIncomplete = errors.New("incomplete N-Quad")
ErrNotExist = errors.New("Quad does not exist")
) )
// Our triple struct, used throughout. // Our triple struct, used throughout.