Moved configuration to Quadwriter and added to config file

This commit is contained in:
l.albertalli 2015-02-09 18:43:26 -08:00
parent 68cd44b986
commit 6134bc8bdd
7 changed files with 48 additions and 33 deletions

View file

@ -185,7 +185,7 @@ var (
metaBucket = []byte("meta") metaBucket = []byte("meta")
) )
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error {
oldSize := qs.size oldSize := qs.size
oldHorizon := qs.horizon oldHorizon := qs.horizon
err := qs.db.Update(func(tx *bolt.Tx) error { err := qs.db.Update(func(tx *bolt.Tx) error {
@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
for _, d := range deltas { for _, d := range deltas {
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil { if err != nil {
if err == graph.ErrQuadExists && *graph.NoErrorDup{ if err == graph.ErrQuadExists && ignoreDuplicate{
continue continue
} }
if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ if err == graph.ErrQuadNotExist && ignoreMissing{
continue continue
} }
return err return err

View file

@ -181,7 +181,7 @@ var (
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
) )
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error {
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
sizeChange := int64(0) sizeChange := int64(0)
@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
batch.Put(keyFor(d), bytes) batch.Put(keyFor(d), bytes)
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil { if err != nil {
if err == graph.ErrQuadExists && *graph.NoErrorDup{ if err == graph.ErrQuadExists && ignoreDuplicate{
continue continue
} }
if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ if err == graph.ErrQuadNotExist && ignoreMissing{
continue continue
} }
return err return err

View file

@ -100,14 +100,20 @@ func newQuadStore() *QuadStore {
} }
} }
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error {
for _, d := range deltas { for _, d := range deltas {
var err error var err error
switch d.Action { switch d.Action {
case graph.Add: case graph.Add:
err = qs.AddDelta(d) err = qs.AddDelta(d)
if err != nil && ignoreDuplicate{
err = nil
}
case graph.Delete: case graph.Delete:
err = qs.RemoveDelta(d) err = qs.RemoveDelta(d)
if err != nil && ignoreMissing{
err = nil
}
default: default:
err = errors.New("memstore: invalid action") err = errors.New("memstore: invalid action")
} }
@ -155,11 +161,7 @@ func (qs *QuadStore) indexOf(t quad.Quad) (int64, bool) {
func (qs *QuadStore) AddDelta(d graph.Delta) error { func (qs *QuadStore) AddDelta(d graph.Delta) error {
if _, exists := qs.indexOf(d.Quad); exists { if _, exists := qs.indexOf(d.Quad); exists {
if *graph.NoErrorDup { return graph.ErrQuadExists
return nil
}else{
return graph.ErrQuadExists
}
} }
qid := qs.nextQuadID qid := qs.nextQuadID
qs.log = append(qs.log, LogEntry{ qs.log = append(qs.log, LogEntry{
@ -198,11 +200,7 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error {
func (qs *QuadStore) RemoveDelta(d graph.Delta) error { func (qs *QuadStore) RemoveDelta(d graph.Delta) error {
prevQuadID, exists := qs.indexOf(d.Quad) prevQuadID, exists := qs.indexOf(d.Quad)
if !exists { if !exists {
if *graph.NoErrorDel { return graph.ErrQuadNotExist
return nil
}else{
return graph.ErrQuadNotExist
}
} }
quadID := qs.nextQuadID quadID := qs.nextQuadID

View file

@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error {
return err return err
} }
func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error {
qs.session.SetSafe(nil) qs.session.SetSafe(nil)
ids := make(map[string]int) ids := make(map[string]int)
// Pre-check the existence condition. // Pre-check the existence condition.
@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error {
switch d.Action { switch d.Action {
case graph.Add: case graph.Add:
if qs.checkValid(key) { if qs.checkValid(key) {
if *graph.NoErrorDup { if ignoreDuplicate {
continue continue
}else{ }else{
return graph.ErrQuadExists return graph.ErrQuadExists
@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error {
} }
case graph.Delete: case graph.Delete:
if !qs.checkValid(key) { if !qs.checkValid(key) {
if *graph.NoErrorDel { if ignoreMissing {
continue continue
}else{ }else{
return graph.ErrQuadNotExist return graph.ErrQuadNotExist

View file

@ -23,7 +23,6 @@ package graph
import ( import (
"errors" "errors"
"flag"
"github.com/barakmich/glog" "github.com/barakmich/glog"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
@ -45,7 +44,7 @@ type Value interface{}
type QuadStore interface { type QuadStore interface {
// The only way in is through building a transaction, which // The only way in is through building a transaction, which
// is done by a replication strategy. // is done by a replication strategy.
ApplyDeltas([]Delta) error ApplyDeltas([]Delta, bool, bool) error
// Given an opaque token, returns the quad for that token from the store. // Given an opaque token, returns the quad for that token from the store.
Quad(Value) quad.Quad Quad(Value) quad.Quad
@ -194,8 +193,3 @@ func QuadStores() []string {
} }
return t return t
} }
var (
NoErrorDup = flag.Bool("noerrdup", false, "Don't stop loading on duplicated key on add")
NoErrorDel = flag.Bool("noerrdel", false, "Don't stop loading on missing key on delete")
)

View file

@ -24,6 +24,7 @@ package graph
import ( import (
"errors" "errors"
"time" "time"
"flag"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
@ -58,6 +59,11 @@ var (
ErrQuadNotExist = errors.New("quad does not exist") ErrQuadNotExist = errors.New("quad does not exist")
) )
var (
IgnoreDup = flag.Bool("ignoredup", false, "Don't stop loading on duplicated key on add")
IgnoreMissing = flag.Bool("ignoremissing", false, "Don't stop loading on missing key on delete")
)
type QuadWriter interface { type QuadWriter interface {
// Add a quad to the store. // Add a quad to the store.
AddQuad(quad.Quad) error AddQuad(quad.Quad) error

View file

@ -26,12 +26,28 @@ func init() {
} }
type Single struct { type Single struct {
currentID graph.PrimaryKey currentID graph.PrimaryKey
qs graph.QuadStore qs graph.QuadStore
ignoreMissing bool
ignoreDuplicate bool
} }
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
return &Single{currentID: qs.Horizon(), qs: qs}, nil ignoreMissing, imset := opts.BoolKey("ignore_missing")
if !imset {
ignoreMissing = *graph.IgnoreMissing
}
ignoreDuplicate, idset := opts.BoolKey("ignore_duplicate")
if !idset {
ignoreDuplicate = *graph.IgnoreDup
}
return &Single{
currentID: qs.Horizon(),
qs: qs,
ignoreMissing: ignoreMissing,
ignoreDuplicate: ignoreDuplicate,
}, nil
} }
func (s *Single) AddQuad(q quad.Quad) error { func (s *Single) AddQuad(q quad.Quad) error {
@ -42,7 +58,7 @@ func (s *Single) AddQuad(q quad.Quad) error {
Action: graph.Add, Action: graph.Add,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.qs.ApplyDeltas(deltas) return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
} }
func (s *Single) AddQuadSet(set []quad.Quad) error { func (s *Single) AddQuadSet(set []quad.Quad) error {
@ -55,7 +71,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
Timestamp: time.Now(), Timestamp: time.Now(),
} }
} }
return s.qs.ApplyDeltas(deltas)
return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
} }
func (s *Single) RemoveQuad(q quad.Quad) error { func (s *Single) RemoveQuad(q quad.Quad) error {
@ -66,7 +83,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error {
Action: graph.Delete, Action: graph.Delete,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.qs.ApplyDeltas(deltas) return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
} }
func (s *Single) Close() error { func (s *Single) Close() error {