save horizon with transactions

This commit is contained in:
Barak Michener 2014-08-11 17:13:49 -04:00
parent 3ceb19ca6c
commit 1099969591
2 changed files with 38 additions and 39 deletions

View file

@ -19,7 +19,6 @@ install:
- go get github.com/syndtr/goleveldb/leveldb/opt - go get github.com/syndtr/goleveldb/leveldb/opt
- go get github.com/syndtr/goleveldb/leveldb/util - go get github.com/syndtr/goleveldb/leveldb/util
- go get github.com/boltdb/bolt - go get github.com/boltdb/bolt
- go get github.com/boltdb/coalescer
- go get gopkg.in/mgo.v2 - go get gopkg.in/mgo.v2
- go get gopkg.in/mgo.v2/bson - go get gopkg.in/mgo.v2/bson

View file

@ -21,11 +21,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"hash" "hash"
"time"
"github.com/barakmich/glog" "github.com/barakmich/glog"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/boltdb/coalescer"
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator" "github.com/google/cayley/graph/iterator"
@ -166,13 +164,10 @@ var nodeBucket = []byte("node")
var metaBucket = []byte("meta") var metaBucket = []byte("meta")
func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error {
var old_size = qs.size
var old_horizon = qs.horizon
var size_change int64 var size_change int64
var new_horizon int64 err := qs.db.Update(func(tx *bolt.Tx) error {
c, err := coalescer.New(qs.db, 200, 100*time.Millisecond)
if err != nil {
return err
}
err = c.Update(func(tx *bolt.Tx) error {
var b *bolt.Bucket var b *bolt.Bucket
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
size_change = int64(0) size_change = int64(0)
@ -201,7 +196,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error {
resizeMap[d.Quad.Label] += delta resizeMap[d.Quad.Label] += delta
} }
size_change += delta size_change += delta
new_horizon = d.ID qs.horizon = d.ID
} }
for k, v := range resizeMap { for k, v := range resizeMap {
if v != 0 { if v != 0 {
@ -211,15 +206,16 @@ func (qs *QuadStore) ApplyDeltas(deltas []*graph.Delta) error {
} }
} }
} }
return nil qs.size += size_change
return qs.WriteHorizonAndSize(tx)
}) })
if err != nil { if err != nil {
glog.Error("Couldn't write to DB for Delta set. Error: ", err) glog.Error("Couldn't write to DB for Delta set. Error: ", err)
qs.horizon = old_horizon
qs.size = old_size
return err return err
} }
qs.size += size_change
qs.horizon = new_horizon
return nil return nil
} }
@ -302,19 +298,18 @@ func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) er
return err return err
} }
func (qs *QuadStore) Close() { func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error {
qs.db.Update(func(tx *bolt.Tx) error {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, qs.size) err := binary.Write(buf, binary.LittleEndian, qs.size)
if err == nil { if err == nil {
b := tx.Bucket(metaBucket) b := tx.Bucket(metaBucket)
werr := b.Put([]byte("size"), buf.Bytes()) werr := b.Put([]byte("size"), buf.Bytes())
if werr != nil { if werr != nil {
glog.Error("Couldn't write size before closing!") glog.Error("Couldn't write size!")
return werr return werr
} }
} else { } else {
glog.Errorf("Couldn't convert size before closing!") glog.Errorf("Couldn't convert size!")
return err return err
} }
buf.Reset() buf.Reset()
@ -323,13 +318,18 @@ func (qs *QuadStore) Close() {
b := tx.Bucket(metaBucket) b := tx.Bucket(metaBucket)
werr := b.Put([]byte("horizon"), buf.Bytes()) werr := b.Put([]byte("horizon"), buf.Bytes())
if werr != nil { if werr != nil {
glog.Error("Couldn't write horizon before closing!") glog.Error("Couldn't write horizon!")
return werr return werr
} }
} else { } else {
glog.Errorf("Couldn't convert horizon before closing!") glog.Errorf("Couldn't convert horizon!")
} }
return err return err
}
func (qs *QuadStore) Close() {
qs.db.Update(func(tx *bolt.Tx) error {
return qs.WriteHorizonAndSize(tx)
}) })
qs.db.Close() qs.db.Close()
qs.open = false qs.open = false