Merge pull request #207 from LAlbertalli/master

Add options to ignore duplicate or missing quad
This commit is contained in:
Barak Michener 2015-02-11 15:45:58 -05:00
commit 969aa1a6c3
9 changed files with 104 additions and 18 deletions

View file

@ -123,3 +123,23 @@ Optionally disable syncing to disk per transaction. Nosync being true means much
* Default: "cayley" * Default: "cayley"
The name of the database within MongoDB to connect to. Manages its own collections and indices therein. The name of the database within MongoDB to connect to. Manages its own collections and indices therein.
## Per-Replication Options
The `replication_options` object in the main configuration file may contain any of the following options, which change the behavior of the replication manager.
### All
#### **`ignore_missing`**
* Type: Boolean
* Default: false
Optionally ignore a missing quad on delete: when true, deleting a quad that does not exist is skipped instead of returning an error.
#### **`ignore_duplicate`**
* Type: Boolean
* Default: false
Optionally ignore a duplicate quad on add: when true, adding a quad that already exists is skipped instead of returning an error.

View file

@ -43,6 +43,7 @@ var (
} }
hashSize = sha1.Size hashSize = sha1.Size
localFillPercent = 0.7 localFillPercent = 0.7
) )
type Token struct { type Token struct {
@ -184,7 +185,7 @@ var (
metaBucket = []byte("meta") metaBucket = []byte("meta")
) )
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
oldSize := qs.size oldSize := qs.size
oldHorizon := qs.horizon oldHorizon := qs.horizon
err := qs.db.Update(func(tx *bolt.Tx) error { err := qs.db.Update(func(tx *bolt.Tx) error {
@ -208,6 +209,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
for _, d := range deltas { for _, d := range deltas {
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add) err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil { if err != nil {
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup{
continue
}
if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing{
continue
}
return err return err
} }
delta := int64(1) delta := int64(1)

View file

@ -35,7 +35,7 @@ func (qs *store) ValueOf(s string) graph.Value {
return nil return nil
} }
func (qs *store) ApplyDeltas([]graph.Delta) error { return nil } func (qs *store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil }
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }

View file

@ -49,6 +49,7 @@ var (
New: func() interface{} { return sha1.New() }, New: func() interface{} { return sha1.New() },
} }
hashSize = sha1.Size hashSize = sha1.Size
) )
type Token []byte type Token []byte
@ -180,7 +181,7 @@ var (
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
) )
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
resizeMap := make(map[string]int64) resizeMap := make(map[string]int64)
sizeChange := int64(0) sizeChange := int64(0)
@ -195,6 +196,12 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
batch.Put(keyFor(d), bytes) batch.Put(keyFor(d), bytes)
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add) err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil { if err != nil {
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup {
continue
}
if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing {
continue
}
return err return err
} }
delta := int64(1) delta := int64(1)
@ -250,12 +257,17 @@ func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64,
} else { } else {
entry.Quad = q entry.Quad = q
} }
entry.History = append(entry.History, id)
if isAdd && len(entry.History)%2 == 0 { if isAdd && len(entry.History)%2 == 1 {
glog.Error("Entry History is out of sync for", entry) glog.Errorf("attempt to add existing quad %v: %#v", entry, q)
return errors.New("odd index history") return graph.ErrQuadExists
} }
if !isAdd && len(entry.History)%2 == 0 {
glog.Error("attempt to delete non-existent quad %v: %#c", entry, q)
return graph.ErrQuadNotExist
}
entry.History = append(entry.History, id)
bytes, err := json.Marshal(entry) bytes, err := json.Marshal(entry)
if err != nil { if err != nil {

View file

@ -100,14 +100,20 @@ func newQuadStore() *QuadStore {
} }
} }
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
for _, d := range deltas { for _, d := range deltas {
var err error var err error
switch d.Action { switch d.Action {
case graph.Add: case graph.Add:
err = qs.AddDelta(d) err = qs.AddDelta(d)
if err != nil && ignoreOpts.IgnoreDup{
err = nil
}
case graph.Delete: case graph.Delete:
err = qs.RemoveDelta(d) err = qs.RemoveDelta(d)
if err != nil && ignoreOpts.IgnoreMissing{
err = nil
}
default: default:
err = errors.New("memstore: invalid action") err = errors.New("memstore: invalid action")
} }

View file

@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error {
return err return err
} }
func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error { func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
qs.session.SetSafe(nil) qs.session.SetSafe(nil)
ids := make(map[string]int) ids := make(map[string]int)
// Pre-check the existence condition. // Pre-check the existence condition.
@ -226,11 +226,19 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta) error {
switch d.Action { switch d.Action {
case graph.Add: case graph.Add:
if qs.checkValid(key) { if qs.checkValid(key) {
return graph.ErrQuadExists if ignoreOpts.IgnoreDup {
continue
}else{
return graph.ErrQuadExists
}
} }
case graph.Delete: case graph.Delete:
if !qs.checkValid(key) { if !qs.checkValid(key) {
return graph.ErrQuadNotExist if ignoreOpts.IgnoreMissing {
continue
}else{
return graph.ErrQuadNotExist
}
} }
} }
} }

View file

@ -44,7 +44,7 @@ type Value interface{}
type QuadStore interface { type QuadStore interface {
// The only way in is through building a transaction, which // The only way in is through building a transaction, which
// is done by a replication strategy. // is done by a replication strategy.
ApplyDeltas([]Delta) error ApplyDeltas([]Delta, IgnoreOpts) error
// Given an opaque token, returns the quad for that token from the store. // Given an opaque token, returns the quad for that token from the store.
Quad(Value) quad.Quad Quad(Value) quad.Quad

View file

@ -24,6 +24,7 @@ package graph
import ( import (
"errors" "errors"
"time" "time"
"flag"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
@ -48,6 +49,10 @@ type Handle struct {
QuadWriter QuadWriter QuadWriter QuadWriter
} }
// IgnoreOpts is the second argument to QuadStore.ApplyDeltas. It selects
// which per-delta failures are skipped rather than returned to the caller.
type IgnoreOpts struct {
// IgnoreDup: skip an Add whose quad already exists (ErrQuadExists).
// IgnoreMissing: skip a Delete whose quad does not exist (ErrQuadNotExist).
IgnoreDup, IgnoreMissing bool
}
func (h *Handle) Close() { func (h *Handle) Close() {
h.QuadStore.Close() h.QuadStore.Close()
h.QuadWriter.Close() h.QuadWriter.Close()
@ -58,6 +63,11 @@ var (
ErrQuadNotExist = errors.New("quad does not exist") ErrQuadNotExist = errors.New("quad does not exist")
) )
// Command-line flags controlling how duplicate and missing quads are handled.
// Both default to false. NewSingleReplication reads them: a flag set to true
// takes effect directly; otherwise the "ignore_duplicate" / "ignore_missing"
// replication options are consulted instead.
var (
IgnoreDup = flag.Bool("ignoredup", false, "Don't stop loading on duplicated key on add")
IgnoreMissing = flag.Bool("ignoremissing", false, "Don't stop loading on missing key on delete")
)
type QuadWriter interface { type QuadWriter interface {
// Add a quad to the store. // Add a quad to the store.
AddQuad(quad.Quad) error AddQuad(quad.Quad) error

View file

@ -26,12 +26,34 @@ func init() {
} }
type Single struct { type Single struct {
currentID graph.PrimaryKey currentID graph.PrimaryKey
qs graph.QuadStore qs graph.QuadStore
ignoreOpts graph.IgnoreOpts
} }
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) { func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
return &Single{currentID: qs.Horizon(), qs: qs}, nil var ignoreMissing, ignoreDuplicate bool
if *graph.IgnoreMissing{
ignoreMissing = true
}else{
ignoreMissing,_ = opts.BoolKey("ignore_missing")
}
if *graph.IgnoreDup{
ignoreDuplicate = true
}else{
ignoreDuplicate,_ = opts.BoolKey("ignore_duplicate")
}
return &Single{
currentID: qs.Horizon(),
qs: qs,
ignoreOpts: graph.IgnoreOpts{
IgnoreDup: ignoreDuplicate,
IgnoreMissing:ignoreMissing,
},
}, nil
} }
func (s *Single) AddQuad(q quad.Quad) error { func (s *Single) AddQuad(q quad.Quad) error {
@ -42,7 +64,7 @@ func (s *Single) AddQuad(q quad.Quad) error {
Action: graph.Add, Action: graph.Add,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.qs.ApplyDeltas(deltas) return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
} }
func (s *Single) AddQuadSet(set []quad.Quad) error { func (s *Single) AddQuadSet(set []quad.Quad) error {
@ -55,7 +77,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
Timestamp: time.Now(), Timestamp: time.Now(),
} }
} }
return s.qs.ApplyDeltas(deltas)
return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
} }
func (s *Single) RemoveQuad(q quad.Quad) error { func (s *Single) RemoveQuad(q quad.Quad) error {
@ -66,7 +89,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error {
Action: graph.Delete, Action: graph.Delete,
Timestamp: time.Now(), Timestamp: time.Now(),
} }
return s.qs.ApplyDeltas(deltas) return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
} }
func (s *Single) Close() error { func (s *Single) Close() error {