From ce1cce5a01c8e599959b36bffe0e91f65f457cf5 Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Fri, 6 Feb 2015 17:49:16 -0800 Subject: [PATCH 1/7] Added command line options to ignore duplicate quad in add or missing quad in delete --- graph/bolt/quadstore.go | 7 +++++++ graph/leveldb/quadstore.go | 21 +++++++++++++++++---- graph/memstore/quadstore.go | 12 ++++++++++-- graph/mongo/quadstore.go | 12 ++++++++++-- graph/quadstore.go | 6 ++++++ 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index ed11237..40d89f7 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -43,6 +43,7 @@ var ( } hashSize = sha1.Size localFillPercent = 0.7 + ) type Token struct { @@ -208,6 +209,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { + if err == graph.ErrQuadExists && *graph.NoErrorDup{ + continue + } + if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + continue + } return err } delta := int64(1) diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index d6b7611..78e5e88 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -49,6 +49,7 @@ var ( New: func() interface{} { return sha1.New() }, } hashSize = sha1.Size + ) type Token []byte @@ -195,6 +196,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { + if err == graph.ErrQuadExists && *graph.NoErrorDup{ + continue + } + if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + continue + } return err } delta := int64(1) @@ -243,6 +250,7 @@ func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, } if err == nil { // We got something. + fmt.Printf("Got something") err = json.Unmarshal(data, &entry) if err != nil { return err @@ -250,12 +258,17 @@ func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, } else { entry.Quad = q } - entry.History = append(entry.History, id) - if isAdd && len(entry.History)%2 == 0 { - glog.Error("Entry History is out of sync for", entry) - return errors.New("odd index history") + if isAdd && len(entry.History)%2 == 1 { + glog.Errorf("attempt to add existing quad %v: %#v", entry, q) + return graph.ErrQuadExists } + if !isAdd && len(entry.History)%2 == 0 { + glog.Error("attempt to delete non-existent quad %v: %#c", entry, q) + return graph.ErrQuadNotExist + } + + entry.History = append(entry.History, id) bytes, err := json.Marshal(entry) if err != nil { diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 6b11e4e..1d435a0 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -155,7 +155,11 @@ func (qs *QuadStore) indexOf(t quad.Quad) (int64, bool) { func (qs *QuadStore) AddDelta(d graph.Delta) error { if _, exists := qs.indexOf(d.Quad); exists { - return graph.ErrQuadExists + if *graph.NoErrorDup { + return nil + }else{ + return graph.ErrQuadExists + } } qid := qs.nextQuadID qs.log = append(qs.log, LogEntry{ @@ -194,7 +198,11 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error { func (qs *QuadStore) RemoveDelta(d graph.Delta) error { prevQuadID, exists := qs.indexOf(d.Quad) if !exists { - return graph.ErrQuadNotExist + if *graph.NoErrorDel { + return nil + }else{ + return graph.ErrQuadNotExist + } } quadID := qs.nextQuadID diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index 25c7758..605a7cd 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -226,11 +226,19 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { switch d.Action { case graph.Add: if qs.checkValid(key) { - return graph.ErrQuadExists + if *graph.NoErrorDup { + continue + }else{ + return graph.ErrQuadExists + } } case graph.Delete: if !qs.checkValid(key) { - return graph.ErrQuadNotExist + if *graph.NoErrorDel { + continue + }else{ + return graph.ErrQuadNotExist + } } } } diff --git a/graph/quadstore.go b/graph/quadstore.go index 79f1df7..5408c94 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -23,6 +23,7 @@ package graph import ( "errors" + "flag" "github.com/barakmich/glog" "github.com/google/cayley/quad" @@ -193,3 +194,8 @@ func QuadStores() []string { } return t } + +var ( + NoErrorDup = flag.Bool("noerrdup", false, "Don't stop loading on duplicated key on add") + NoErrorDel = flag.Bool("noerrdel", false, "Don't stop loading on missing key on delete") +) From 68cd44b986fc584e40af1e73428c9da8069ed8b1 Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Fri, 6 Feb 2015 17:52:09 -0800 Subject: [PATCH 2/7] Removed a debug line --- graph/leveldb/quadstore.go | 1 - 1 file changed, 1 deletion(-) diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index 78e5e88..e0bff4c 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -250,7 +250,6 @@ func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, } if err == nil { // We got something. - fmt.Printf("Got something") err = json.Unmarshal(data, &entry) if err != nil { return err From 6134bc8bdd0ad3732a69b1b6bbb40d55ebc5096f Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Mon, 9 Feb 2015 18:43:26 -0800 Subject: [PATCH 3/7] Moved configuration to Quadwriter and added to config file --- graph/bolt/quadstore.go | 6 +++--- graph/leveldb/quadstore.go | 6 +++--- graph/memstore/quadstore.go | 20 +++++++++----------- graph/mongo/quadstore.go | 6 +++--- graph/quadstore.go | 8 +------- graph/quadwriter.go | 6 ++++++ writer/single.go | 29 +++++++++++++++++++++++------ 7 files changed, 48 insertions(+), 33 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 40d89f7..112fd84 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -185,7 +185,7 @@ var ( metaBucket = []byte("meta") ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { oldSize := qs.size oldHorizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { @@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && *graph.NoErrorDup{ + if err == graph.ErrQuadExists && ignoreDuplicate{ continue } - if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + if err == graph.ErrQuadNotExist && ignoreMissing{ continue } return err diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index e0bff4c..f8fb2d4 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -181,7 +181,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) sizeChange := int64(0) @@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && *graph.NoErrorDup{ + if err == graph.ErrQuadExists && ignoreDuplicate{ continue } - if err == graph.ErrQuadNotExist && *graph.NoErrorDel{ + if err == graph.ErrQuadNotExist && ignoreMissing{ continue } return err diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 1d435a0..5734255 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -100,14 +100,20 @@ func newQuadStore() *QuadStore { } } -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { for _, d := range deltas { var err error switch d.Action { case graph.Add: err = qs.AddDelta(d) + if err != nil && ignoreDuplicate{ + err = nil + } case graph.Delete: err = qs.RemoveDelta(d) + if err != nil && ignoreMissing{ + err = nil + } default: err = errors.New("memstore: invalid action") } @@ -155,11 +161,7 @@ func (qs *QuadStore) indexOf(t quad.Quad) (int64, bool) { func (qs *QuadStore) AddDelta(d graph.Delta) error { if _, exists := qs.indexOf(d.Quad); exists { - if *graph.NoErrorDup { - return nil - }else{ - return graph.ErrQuadExists - } + return graph.ErrQuadExists } qid := qs.nextQuadID qs.log = append(qs.log, LogEntry{ @@ -198,11 +200,7 @@ func (qs *QuadStore) AddDelta(d graph.Delta) error { func (qs *QuadStore) RemoveDelta(d graph.Delta) error { prevQuadID, exists := qs.indexOf(d.Quad) if !exists { - if *graph.NoErrorDel { - return nil - }else{ - return graph.ErrQuadNotExist - } + return graph.ErrQuadNotExist } quadID := qs.nextQuadID diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index 605a7cd..f7b4ef9 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { return err } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. @@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { switch d.Action { case graph.Add: if qs.checkValid(key) { - if *graph.NoErrorDup { + if ignoreDuplicate { continue }else{ return graph.ErrQuadExists @@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { } case graph.Delete: if !qs.checkValid(key) { - if *graph.NoErrorDel { + if ignoreMissing { continue }else{ return graph.ErrQuadNotExist diff --git a/graph/quadstore.go b/graph/quadstore.go index 5408c94..2194d25 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -23,7 +23,6 @@ package graph import ( "errors" - "flag" "github.com/barakmich/glog" "github.com/google/cayley/quad" @@ -45,7 +44,7 @@ type Value interface{} type QuadStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]Delta) error + ApplyDeltas([]Delta, bool, bool) error // Given an opaque token, returns the quad for that token from the store. Quad(Value) quad.Quad @@ -194,8 +193,3 @@ func QuadStores() []string { } return t } - -var ( - NoErrorDup = flag.Bool("noerrdup", false, "Don't stop loading on duplicated key on add") - NoErrorDel = flag.Bool("noerrdel", false, "Don't stop loading on missing key on delete") -) diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 5124d60..097b133 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -24,6 +24,7 @@ package graph import ( "errors" "time" + "flag" "github.com/google/cayley/quad" ) @@ -58,6 +59,11 @@ var ( ErrQuadNotExist = errors.New("quad does not exist") ) +var ( + IgnoreDup = flag.Bool("ignoredup", false, "Don't stop loading on duplicated key on add") + IgnoreMissing = flag.Bool("ignoremissing", false, "Don't stop loading on missing key on delete") +) + type QuadWriter interface { // Add a quad to the store. AddQuad(quad.Quad) error diff --git a/writer/single.go b/writer/single.go index d6770a5..96a0ea3 100644 --- a/writer/single.go +++ b/writer/single.go @@ -26,12 +26,28 @@ func init() { } type Single struct { - currentID graph.PrimaryKey - qs graph.QuadStore + currentID graph.PrimaryKey + qs graph.QuadStore + ignoreMissing bool + ignoreDuplicate bool } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { - return &Single{currentID: qs.Horizon(), qs: qs}, nil + ignoreMissing, imset := opts.BoolKey("ignore_missing") + if !imset { + ignoreMissing = *graph.IgnoreMissing + } + ignoreDuplicate, idset := opts.BoolKey("ignore_duplicate") + if !idset { + ignoreDuplicate = *graph.IgnoreDup + } + + return &Single{ + currentID: qs.Horizon(), + qs: qs, + ignoreMissing: ignoreMissing, + ignoreDuplicate: ignoreDuplicate, + }, nil } func (s *Single) AddQuad(q quad.Quad) error { @@ -42,7 +58,7 @@ func (s *Single) AddQuad(q quad.Quad) error { Action: graph.Add, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) AddQuadSet(set []quad.Quad) error { @@ -55,7 +71,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { Timestamp: time.Now(), } } - return s.qs.ApplyDeltas(deltas) + + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) RemoveQuad(q quad.Quad) error { @@ -66,7 +83,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error { Action: graph.Delete, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) } func (s *Single) Close() error { From a7b1eb74b21c62345c7a05e16afced05450b3bef Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Mon, 9 Feb 2015 18:48:49 -0800 Subject: [PATCH 4/7] Updated the config file --- docs/Configuration.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/Configuration.md b/docs/Configuration.md index 516b23d..e1ac677 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -123,3 +123,23 @@ Optionally disable syncing to disk per transaction. Nosync being true means much * Default: "cayley" The name of the database within MongoDB to connect to. Manages its own collections and indicies therein. + +## Per-Replication Options + +The `replication_options` object in the main configuration file contains any of these following options that change the behavior of the replication manager. + +### All + +#### **`ignore_missing`** + + * Type: Boolean + * Default: false + +Optionally ignore missing quad on delete. + +#### **`ignore_duplicate`** + + * Type: Boolean + * Default: false + +Optionally ignore duplicated quad on add. \ No newline at end of file From cca6d536238e55faff0eed8bfaf6fb0198d1cb3f Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Mon, 9 Feb 2015 19:02:58 -0800 Subject: [PATCH 5/7] Modified mock iterator for the new interface of ApplyDeltas --- graph/iterator/mock_ts_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 2a1407b..8923a69 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -35,7 +35,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) ApplyDeltas([]graph.Delta) error { return nil } +func (qs *store) ApplyDeltas([]graph.Delta, bool, bool) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } From 50c3e5f93c4fc05e605afb9251f6578101b77e11 Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Tue, 10 Feb 2015 10:48:02 -0800 Subject: [PATCH 6/7] Shortended function signature and changed flag priority --- graph/bolt/quadstore.go | 6 +++--- graph/leveldb/quadstore.go | 6 +++--- graph/memstore/quadstore.go | 6 +++--- graph/mongo/quadstore.go | 6 +++--- writer/single.go | 17 +++++++++++------ 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index 112fd84..bd5fa9e 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -185,7 +185,7 @@ var ( metaBucket = []byte("meta") ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error { oldSize := qs.size oldHorizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { @@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ign for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && ignoreDuplicate{ + if err == graph.ErrQuadExists && ignoreDup{ continue } - if err == graph.ErrQuadNotExist && ignoreMissing{ + if err == graph.ErrQuadNotExist && ignoreMiss{ continue } return err diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index f8fb2d4..b20f62c 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -181,7 +181,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) sizeChange := int64(0) @@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ign batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && ignoreDuplicate{ + if err == graph.ErrQuadExists && ignoreDup{ continue } - if err == graph.ErrQuadNotExist && ignoreMissing{ + if err == graph.ErrQuadNotExist && ignoreMiss{ continue } return err diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 5734255..52cb298 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -100,18 +100,18 @@ func newQuadStore() *QuadStore { } } -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDupl, ignoreMiss bool) error { for _, d := range deltas { var err error switch d.Action { case graph.Add: err = qs.AddDelta(d) - if err != nil && ignoreDuplicate{ + if err != nil && ignoreDupl{ err = nil } case graph.Delete: err = qs.RemoveDelta(d) - if err != nil && ignoreMissing{ + if err != nil && ignoreMiss{ err = nil } default: diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index f7b4ef9..cb9331a 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { return err } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreMissing bool) error { +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. @@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreM switch d.Action { case graph.Add: if qs.checkValid(key) { - if ignoreDuplicate { + if ignoreDup { continue }else{ return graph.ErrQuadExists @@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDuplicate bool, ignoreM } case graph.Delete: if !qs.checkValid(key) { - if ignoreMissing { + if ignoreMiss { continue }else{ return graph.ErrQuadNotExist diff --git a/writer/single.go b/writer/single.go index 96a0ea3..f0f7cb0 100644 --- a/writer/single.go +++ b/writer/single.go @@ -33,13 +33,18 @@ type Single struct { } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { - ignoreMissing, imset := opts.BoolKey("ignore_missing") - if !imset { - ignoreMissing = *graph.IgnoreMissing + var ignoreMissing, ignoreDuplicate bool + + if *graph.IgnoreMissing{ + ignoreMissing = true + }else{ + ignoreMissing,_ = opts.BoolKey("ignore_missing") } - ignoreDuplicate, idset := opts.BoolKey("ignore_duplicate") - if !idset { - ignoreDuplicate = *graph.IgnoreDup + + if *graph.IgnoreDup{ + ignoreDuplicate = true + }else{ + ignoreDuplicate,_ = opts.BoolKey("ignore_duplicate") } return &Single{ From 472d86223e90f1e110efb60023a9192bb8f8d240 Mon Sep 17 00:00:00 2001 From: "l.albertalli" Date: Tue, 10 Feb 2015 18:17:54 -0800 Subject: [PATCH 7/7] Changed the ApplyDeltas signature --- graph/bolt/quadstore.go | 6 +++--- graph/iterator/mock_ts_test.go | 2 +- graph/leveldb/quadstore.go | 6 +++--- graph/memstore/quadstore.go | 6 +++--- graph/mongo/quadstore.go | 6 +++--- graph/quadstore.go | 2 +- graph/quadwriter.go | 4 ++++ writer/single.go | 15 ++++++++------- 8 files changed, 26 insertions(+), 21 deletions(-) diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index bd5fa9e..265f948 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -185,7 +185,7 @@ var ( metaBucket = []byte("meta") ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { oldSize := qs.size oldHorizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { @@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss boo for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && ignoreDup{ + if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup{ continue } - if err == graph.ErrQuadNotExist && ignoreMiss{ + if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing{ continue } return err diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 8923a69..6e2b00d 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -35,7 +35,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) ApplyDeltas([]graph.Delta, bool, bool) error { return nil } +func (qs *store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index b20f62c..0a7fe99 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -181,7 +181,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) sizeChange := int64(0) @@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss boo batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { - if err == graph.ErrQuadExists && ignoreDup{ + if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup { continue } - if err == graph.ErrQuadNotExist && ignoreMiss{ + if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing { continue } return err diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 52cb298..4a62ec5 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -100,18 +100,18 @@ func newQuadStore() *QuadStore { } } -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDupl, ignoreMiss bool) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { for _, d := range deltas { var err error switch d.Action { case graph.Add: err = qs.AddDelta(d) - if err != nil && ignoreDupl{ + if err != nil && ignoreOpts.IgnoreDup{ err = nil } case graph.Delete: err = qs.RemoveDelta(d) - if err != nil && ignoreMiss{ + if err != nil && ignoreOpts.IgnoreMissing{ err = nil } default: diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index cb9331a..98a5cc3 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { return err } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) error { +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. @@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) e switch d.Action { case graph.Add: if qs.checkValid(key) { - if ignoreDup { + if ignoreOpts.IgnoreDup { continue }else{ return graph.ErrQuadExists @@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) e } case graph.Delete: if !qs.checkValid(key) { - if ignoreMiss { + if ignoreOpts.IgnoreMissing { continue }else{ return graph.ErrQuadNotExist diff --git a/graph/quadstore.go b/graph/quadstore.go index 2194d25..cb7eca0 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -44,7 +44,7 @@ type Value interface{} type QuadStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]Delta, bool, bool) error + ApplyDeltas([]Delta, IgnoreOpts) error // Given an opaque token, returns the quad for that token from the store. Quad(Value) quad.Quad diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 097b133..fdf65f3 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -49,6 +49,10 @@ type Handle struct { QuadWriter QuadWriter } +type IgnoreOpts struct { + IgnoreDup, IgnoreMissing bool +} + func (h *Handle) Close() { h.QuadStore.Close() h.QuadWriter.Close() diff --git a/writer/single.go b/writer/single.go index f0f7cb0..4c386b8 100644 --- a/writer/single.go +++ b/writer/single.go @@ -28,8 +28,7 @@ func init() { type Single struct { currentID graph.PrimaryKey qs graph.QuadStore - ignoreMissing bool - ignoreDuplicate bool + ignoreOpts graph.IgnoreOpts } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { @@ -50,8 +49,10 @@ func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWri return &Single{ currentID: qs.Horizon(), qs: qs, - ignoreMissing: ignoreMissing, - ignoreDuplicate: ignoreDuplicate, + ignoreOpts: graph.IgnoreOpts{ + IgnoreDup: ignoreDuplicate, + IgnoreMissing:ignoreMissing, + }, }, nil } @@ -63,7 +64,7 @@ func (s *Single) AddQuad(q quad.Quad) error { Action: graph.Add, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) AddQuadSet(set []quad.Quad) error { @@ -77,7 +78,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { } } - return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) RemoveQuad(q quad.Quad) error { @@ -88,7 +89,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error { Action: graph.Delete, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing) + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) Close() error {