diff --git a/.travis.yml b/.travis.yml index a3c6dbb..7c5309a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,6 @@ install: - go get github.com/syndtr/goleveldb/leveldb/opt - go get github.com/syndtr/goleveldb/leveldb/util - go get github.com/boltdb/bolt - - go get github.com/boltdb/coalescer - go get gopkg.in/mgo.v2 - go get gopkg.in/mgo.v2/bson diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 6e2d401..5cfd8a3 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -21,11 +21,9 @@ import ( "encoding/json" "fmt" "hash" - "time" "github.com/barakmich/glog" "github.com/boltdb/bolt" - "github.com/boltdb/coalescer" "github.com/google/cayley/graph" "github.com/google/cayley/graph/iterator" @@ -166,13 +164,10 @@ var nodeBucket = []byte("node") var metaBucket = []byte("meta") func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { + var old_size = qs.size + var old_horizon = qs.horizon var size_change int64 - var new_horizon int64 - c, err := coalescer.New(qs.db, 200, 100*time.Millisecond) - if err != nil { - return err - } - err = c.Update(func(tx *bolt.Tx) error { + err := qs.db.Update(func(tx *bolt.Tx) error { var b *bolt.Bucket resizeMap := make(map[string]int64) size_change = int64(0) @@ -201,7 +196,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { resizeMap[d.Quad.Label] += delta } size_change += delta - new_horizon = d.ID + qs.horizon = d.ID } for k, v := range resizeMap { if v != 0 { @@ -211,15 +206,16 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { } } } - return nil + qs.size += size_change + return qs.WriteHorizonAndSize(tx) }) if err != nil { glog.Error("Couldn't write to DB for Delta set. Error: ", err) + qs.horizon = old_horizon + qs.size = old_size return err } - qs.size += size_change - qs.horizon = new_horizon return nil } @@ -302,34 +298,38 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er return err } +func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, qs.size) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("size"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write size!") + return werr + } + } else { + glog.Errorf("Couldn't convert size!") + return err + } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + b := tx.Bucket(metaBucket) + werr := b.Put([]byte("horizon"), buf.Bytes()) + if werr != nil { + glog.Error("Couldn't write horizon!") + return werr + } + } else { + glog.Errorf("Couldn't convert horizon!") + } + return err +} + func (qs *QuadStore) Close() { qs.db.Update(func(tx *bolt.Tx) error { - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.LittleEndian, qs.size) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("size"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write size before closing!") - return werr - } - } else { - glog.Errorf("Couldn't convert size before closing!") - return err - } - buf.Reset() - err = binary.Write(buf, binary.LittleEndian, qs.horizon) - if err == nil { - b := tx.Bucket(metaBucket) - werr := b.Put([]byte("horizon"), buf.Bytes()) - if werr != nil { - glog.Error("Couldn't write horizon before closing!") - return werr - } - } else { - glog.Errorf("Couldn't convert horizon before closing!") - } - return err + return qs.WriteHorizonAndSize(tx) }) qs.db.Close() qs.open = false