From 6134bc8bdd0ad3732a69b1b6bbb40d55ebc5096f Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Mon, 9 Feb 2015 18:43:26 -0800 Subject: [PATCH] Moved configuration to Quadwriter and added to config file --- graph/bolt/quadstore.go | 6 +++--- graph/leveldb/quadstore.go | 6 +++--- graph/memstore/quadstore.go | 20 +++++++++----------- graph/mongo/quadstore.go | 6 +++--- graph/quadstore.go | 8 +------- graph/quadwriter.go | 6 ++++++ writer/single.go | 29 +++++++++++++++++++++++------ 7 files changed, 48 insertions(+), 33 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 40d89f7..112fd84 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -185,7 +185,7 @@ var ( metaBucket = []byte("meta") ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { oldSize := qs.size oldHorizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { @@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && *graph.NoErrorDup{ + if err == graph.ErrQuadExists && ignoreDuplicate{ continue } - if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + if err == graph.ErrQuadNotExist && ignoreMissing{ continue } return err diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index e0bff4c..f8fb2d4 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -181,7 +181,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) sizeChange := int64(0) @@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && *graph.NoErrorDup{ + if err == graph.ErrQuadExists && ignoreDuplicate{ continue } - if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + if err == graph.ErrQuadNotExist && ignoreMissing{ continue } return err diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 1d435a0..5734255 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -100,14 +100,20 @@ func newQuadStore() *QuadStore { } } -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { for _, d := range deltas { var err error switch d.Action { case graph.Add: err = qs.AddDelta(d) + if err != nil && ignoreDuplicate{ + err = nil + } case graph.Delete: err = qs.RemoveDelta(d) + if err != nil && ignoreMissing{ + err = nil + } default: err = errors.New("memstore: invalid action") } @@ -155,11 +161,7 @@ func (qs *QuadStore) indexOf(t quad.Quad) (int64, bool) { func (qs *QuadStore) AddDelta(d graph.Delta) error { if _, exists := qs.indexOf(d.Quad); exists { - if *graph.NoErrorDup { - return nil - }else{ - return graph.ErrQuadExists - } + return graph.ErrQuadExists } qid := qs.nextQuadID qs.log = append(qs.log, LogEntry{ @@ -198,11 +200,7 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error { func (qs *QuadStore) RemoveDelta(d graph.Delta) error { prevQuadID, exists := qs.indexOf(d.Quad) if !exists { - if *graph.NoErrorDel { - return nil - }else{ - return graph.ErrQuadNotExist - } + return graph.ErrQuadNotExist } quadID := qs.nextQuadID diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index 605a7cd..f7b4ef9 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { return err } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. @@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { switch d.Action { case graph.Add: if qs.checkValid(key) { - if *graph.NoErrorDup { + if ignoreDuplicate { continue }else{ return graph.ErrQuadExists @@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { } case graph.Delete: if !qs.checkValid(key) { - if *graph.NoErrorDel { + if ignoreMissing { continue }else{ return graph.ErrQuadNotExist diff --git a/graph/quadstore.go b/graph/quadstore.go index 5408c94..2194d25 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -23,7 +23,6 @@ package graph import ( "errors" - "flag" "github.com/barakmich/glog" "github.com/google/cayley/quad" @@ -45,7 +44,7 @@ type Value interface{} type QuadStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]Delta) error + ApplyDeltas([]Delta, bool, bool) error // Given an opaque token, returns the quad for that token from the store. Quad(Value) quad.Quad @@ -194,8 +193,3 @@ func QuadStores() []string { } return t } - -var ( - NoErrorDup = flag.Bool("noerrdup", false, "Don't stop loading on duplicated key on add") - NoErrorDel = flag.Bool("noerrdel", false, "Don't stop loading on missing key on delete") -) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 5124d60..097b133 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -24,6 +24,7 @@ package graph import ( "errors" "time" + "flag" "github.com/google/cayley/quad" ) @@ -58,6 +59,11 @@ var ( ErrQuadNotExist = errors.New("quad does not exist") ) +var ( + IgnoreDup = flag.Bool("ignoredup", false, "Don't stop loading on duplicated key on add") + IgnoreMissing = flag.Bool("ignoremissing", false, "Don't stop loading on missing key on delete") +) + type QuadWriter interface { // Add a quad to the store. AddQuad(quad.Quad) error diff --git a/writer/single.go b/writer/single.go index d6770a5..96a0ea3 100644 --- a/writer/single.go +++ b/writer/single.go @@ -26,12 +26,28 @@ func init() { } type Single struct { - currentID graph.PrimaryKey - qs graph.QuadStore + currentID graph.PrimaryKey + qs graph.QuadStore + ignoreMissing bool + ignoreDuplicate bool } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { - return &Single{currentID: qs.Horizon(), qs: qs}, nil + ignoreMissing, imset := opts.BoolKey("ignore_missing") + if !imset { + ignoreMissing = *graph.IgnoreMissing + } + ignoreDuplicate, idset := opts.BoolKey("ignore_duplicate") + if !idset { + ignoreDuplicate = *graph.IgnoreDup + } + + return &Single{ + currentID: qs.Horizon(), + qs: qs, + ignoreMissing: ignoreMissing, + ignoreDuplicate: ignoreDuplicate, + }, nil } func (s *Single) AddQuad(q quad.Quad) error { @@ -42,7 +58,7 @@ func (s *Single) AddQuad(q quad.Quad) error { Action: graph.Add, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) AddQuadSet(set []quad.Quad) error { @@ -55,7 +71,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { Timestamp: time.Now(), } } - return s.qs.ApplyDeltas(deltas) + + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) RemoveQuad(q quad.Quad) error { @@ -66,7 +83,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error { Action: graph.Delete, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) Close() error {