Optionalize the sync parameter

This commit is contained in:
Barak Michener 2014-08-23 17:19:14 -04:00
parent 6d82c78b45
commit e11dfeb50f
2 changed files with 23 additions and 5 deletions

View file

@ -40,6 +40,7 @@ var (
New: func() interface{} { return sha1.New() }, New: func() interface{} { return sha1.New() },
} }
hashSize = sha1.Size hashSize = sha1.Size
localFillPercent = 0.7
) )
type Token struct { type Token struct {
@ -76,7 +77,7 @@ func createNewBolt(path string, _ graph.Options) error {
return nil return nil
} }
func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) { func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) {
var qs QuadStore var qs QuadStore
var err error var err error
db, err := bolt.Open(path, 0600, nil) db, err := bolt.Open(path, 0600, nil)
@ -85,6 +86,8 @@ func newQuadStore(path string, _ graph.Options) (graph.TripleStore, error) {
return nil, err return nil, err
} }
qs.db = db qs.db = db
// BoolKey returns false on non-existence. IE, Sync by default.
qs.db.NoSync, _ = options.BoolKey("nosync")
err = qs.getMetadata() err = qs.getMetadata()
if err != nil { if err != nil {
return nil, err return nil, err
@ -173,7 +176,8 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
old_size := qs.size old_size := qs.size
old_horizon := qs.horizon old_horizon := qs.horizon
err := qs.db.Update(func(tx *bolt.Tx) error { err := qs.db.Update(func(tx *bolt.Tx) error {
var b *bolt.Bucket b := tx.Bucket(logBucket)
b.FillPercent = localFillPercent
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
size_change := int64(0) size_change := int64(0)
for _, d := range deltas { for _, d := range deltas {
@ -181,7 +185,6 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
if err != nil { if err != nil {
return err return err
} }
b = tx.Bucket(logBucket)
err = b.Put(qs.createDeltaKeyFor(d.ID), bytes) err = b.Put(qs.createDeltaKeyFor(d.ID), bytes)
if err != nil { if err != nil {
return err return err
@ -229,6 +232,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error { func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error {
var entry IndexEntry var entry IndexEntry
b := tx.Bucket(spoBucket) b := tx.Bucket(spoBucket)
b.FillPercent = localFillPercent
data := b.Get(qs.createKeyFor(spo, q)) data := b.Get(qs.createKeyFor(spo, q))
if data != nil { if data != nil {
// We got something. // We got something.
@ -259,6 +263,7 @@ func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bo
continue continue
} }
b := tx.Bucket(bucketFor(index)) b := tx.Bucket(bucketFor(index))
b.FillPercent = localFillPercent
err = b.Put(qs.createKeyFor(index, q), jsonbytes) err = b.Put(qs.createKeyFor(index, q), jsonbytes)
if err != nil { if err != nil {
return err return err
@ -275,6 +280,7 @@ type ValueData struct {
func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error { func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error {
value := ValueData{name, amount} value := ValueData{name, amount}
b := tx.Bucket(nodeBucket) b := tx.Bucket(nodeBucket)
b.FillPercent = localFillPercent
key := qs.createValueKeyFor(name) key := qs.createValueKeyFor(name)
data := b.Get(key) data := b.Get(key)
@ -311,6 +317,7 @@ func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error {
return err return err
} }
b := tx.Bucket(metaBucket) b := tx.Bucket(metaBucket)
b.FillPercent = localFillPercent
werr := b.Put([]byte("size"), buf.Bytes()) werr := b.Put([]byte("size"), buf.Bytes())
if werr != nil { if werr != nil {
glog.Error("Couldn't write size!") glog.Error("Couldn't write size!")
@ -323,7 +330,6 @@ func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error {
glog.Errorf("Couldn't convert horizon!") glog.Errorf("Couldn't convert horizon!")
} }
b = tx.Bucket(metaBucket)
werr = b.Put([]byte("horizon"), buf.Bytes()) werr = b.Put([]byte("horizon"), buf.Bytes())
if werr != nil { if werr != nil {

View file

@ -121,6 +121,18 @@ func (d Options) StringKey(key string) (string, bool) {
return "", false return "", false
} }
func (d Options) BoolKey(key string) (bool, bool) {
if val, ok := d[key]; ok {
switch vv := val.(type) {
case bool:
return vv, true
default:
glog.Fatalln("Invalid", key, "parameter type from config.")
}
}
return false, false
}
var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load") var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
type BulkLoader interface { type BulkLoader interface {