diff --git a/docs/Configuration.md b/docs/Configuration.md index 516b23d..e1ac677 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -123,3 +123,23 @@ Optionally disable syncing to disk per transaction. Nosync being true means much * Default: "cayley" The name of the database within MongoDB to connect to. Manages its own collections and indicies therein. + +## Per-Replication Options + +The `replication_options` object in the main configuration file contains any of these following options that change the behavior of the replication manager. + +### All + +#### **`ignore_missing`** + + * Type: Boolean + * Default: false + +Optionally ignore missing quad on delete. + +#### **`ignore_duplicate`** + + * Type: Boolean + * Default: false + +Optionally ignore duplicated quad on add. \ No newline at end of file diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go index ed11237..265f948 100644 --- a/graph/bolt/quadstore.go +++ b/graph/bolt/quadstore.go @@ -43,6 +43,7 @@ var ( } hashSize = sha1.Size localFillPercent = 0.7 + ) type Token struct { @@ -184,7 +185,7 @@ var ( metaBucket = []byte("meta") ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { oldSize := qs.size oldHorizon := qs.horizon err := qs.db.Update(func(tx *bolt.Tx) error { @@ -208,6 +209,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { + if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup{ + continue + } + if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing{ + continue + } return err } delta := int64(1) diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 2a1407b..6e2b00d 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -35,7 +35,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) ApplyDeltas([]graph.Delta) error { return nil } +func (qs *store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go index d6b7611..0a7fe99 100644 --- a/graph/leveldb/quadstore.go +++ b/graph/leveldb/quadstore.go @@ -49,6 +49,7 @@ var ( New: func() interface{} { return sha1.New() }, } hashSize = sha1.Size + ) type Token []byte @@ -180,7 +181,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) sizeChange := int64(0) @@ -195,6 +196,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { batch.Put(keyFor(d), bytes) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) if err != nil { + if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup { + continue + } + if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing { + continue + } return err } delta := int64(1) @@ -250,12 +257,17 @@ func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, } else { entry.Quad = q } - entry.History = append(entry.History, id) - if isAdd && len(entry.History)%2 == 0 { - glog.Error("Entry History is out of sync for", entry) - return errors.New("odd index history") + if isAdd && len(entry.History)%2 == 1 { + glog.Errorf("attempt to add existing quad %v: %#v", entry, q) + return graph.ErrQuadExists } + if !isAdd && len(entry.History)%2 == 0 { + glog.Error("attempt to delete non-existent quad %v: %#c", entry, q) + return graph.ErrQuadNotExist + } + + entry.History = append(entry.History, id) bytes, err := json.Marshal(entry) if err != nil { diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 6b11e4e..4a62ec5 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -100,14 +100,20 @@ func newQuadStore() *QuadStore { } } -func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { for _, d := range deltas { var err error switch d.Action { case graph.Add: err = qs.AddDelta(d) + if err != nil && ignoreOpts.IgnoreDup{ + err = nil + } case graph.Delete: err = qs.RemoveDelta(d) + if err != nil && ignoreOpts.IgnoreMissing{ + err = nil + } default: err = errors.New("memstore: invalid action") } diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go index 25c7758..98a5cc3 100644 --- a/graph/mongo/quadstore.go +++ b/graph/mongo/quadstore.go @@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error { return err } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. @@ -226,11 +226,19 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { switch d.Action { case graph.Add: if qs.checkValid(key) { - return graph.ErrQuadExists + if ignoreOpts.IgnoreDup { + continue + }else{ + return graph.ErrQuadExists + } } case graph.Delete: if !qs.checkValid(key) { - return graph.ErrQuadNotExist + if ignoreOpts.IgnoreMissing { + continue + }else{ + return graph.ErrQuadNotExist + } } } } diff --git a/graph/quadstore.go b/graph/quadstore.go index 79f1df7..cb7eca0 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -44,7 +44,7 @@ type Value interface{} type QuadStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]Delta) error + ApplyDeltas([]Delta, IgnoreOpts) error // Given an opaque token, returns the quad for that token from the store. Quad(Value) quad.Quad diff --git a/graph/quadwriter.go b/graph/quadwriter.go index 5124d60..fdf65f3 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -24,6 +24,7 @@ package graph import ( "errors" "time" + "flag" "github.com/google/cayley/quad" ) @@ -48,6 +49,10 @@ type Handle struct { QuadWriter QuadWriter } +type IgnoreOpts struct { + IgnoreDup, IgnoreMissing bool +} + func (h *Handle) Close() { h.QuadStore.Close() h.QuadWriter.Close() @@ -58,6 +63,11 @@ var ( ErrQuadNotExist = errors.New("quad does not exist") ) +var ( + IgnoreDup = flag.Bool("ignoredup", false, "Don't stop loading on duplicated key on add") + IgnoreMissing = flag.Bool("ignoremissing", false, "Don't stop loading on missing key on delete") +) + type QuadWriter interface { // Add a quad to the store. AddQuad(quad.Quad) error diff --git a/writer/single.go b/writer/single.go index d6770a5..4c386b8 100644 --- a/writer/single.go +++ b/writer/single.go @@ -26,12 +26,34 @@ func init() { } type Single struct { - currentID graph.PrimaryKey - qs graph.QuadStore + currentID graph.PrimaryKey + qs graph.QuadStore + ignoreOpts graph.IgnoreOpts } func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { - return &Single{currentID: qs.Horizon(), qs: qs}, nil + var ignoreMissing, ignoreDuplicate bool + + if *graph.IgnoreMissing{ + ignoreMissing = true + }else{ + ignoreMissing,_ = opts.BoolKey("ignore_missing") + } + + if *graph.IgnoreDup{ + ignoreDuplicate = true + }else{ + ignoreDuplicate,_ = opts.BoolKey("ignore_duplicate") + } + + return &Single{ + currentID: qs.Horizon(), + qs: qs, + ignoreOpts: graph.IgnoreOpts{ + IgnoreDup: ignoreDuplicate, + IgnoreMissing:ignoreMissing, + }, + }, nil } func (s *Single) AddQuad(q quad.Quad) error { @@ -42,7 +64,7 @@ func (s *Single) AddQuad(q quad.Quad) error { Action: graph.Add, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) AddQuadSet(set []quad.Quad) error { @@ -55,7 +77,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { Timestamp: time.Now(), } } - return s.qs.ApplyDeltas(deltas) + + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) RemoveQuad(q quad.Quad) error { @@ -66,7 +89,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error { Action: graph.Delete, Timestamp: time.Now(), } - return s.qs.ApplyDeltas(deltas) + return s.qs.ApplyDeltas(deltas, s.ignoreOpts) } func (s *Single) Close() error {