Changed the ApplyDeltas signature
This commit is contained in:
parent
50c3e5f93c
commit
472d86223e
8 changed files with 26 additions and 21 deletions
|
|
@ -185,7 +185,7 @@ var (
|
||||||
metaBucket = []byte("meta")
|
metaBucket = []byte("meta")
|
||||||
)
|
)
|
||||||
|
|
||||||
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error {
|
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
|
||||||
oldSize := qs.size
|
oldSize := qs.size
|
||||||
oldHorizon := qs.horizon
|
oldHorizon := qs.horizon
|
||||||
err := qs.db.Update(func(tx *bolt.Tx) error {
|
err := qs.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
|
@ -209,10 +209,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss boo
|
||||||
for _, d := range deltas {
|
for _, d := range deltas {
|
||||||
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == graph.ErrQuadExists && ignoreDup{
|
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup{
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == graph.ErrQuadNotExist && ignoreMiss{
|
if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing{
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ func (qs *store) ValueOf(s string) graph.Value {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *store) ApplyDeltas([]graph.Delta, bool, bool) error { return nil }
|
func (qs *store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil }
|
||||||
|
|
||||||
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
|
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -181,7 +181,7 @@ var (
|
||||||
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
|
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss bool) error {
|
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
|
||||||
batch := &leveldb.Batch{}
|
batch := &leveldb.Batch{}
|
||||||
resizeMap := make(map[string]int64)
|
resizeMap := make(map[string]int64)
|
||||||
sizeChange := int64(0)
|
sizeChange := int64(0)
|
||||||
|
|
@ -196,10 +196,10 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDup, ignoreMiss boo
|
||||||
batch.Put(keyFor(d), bytes)
|
batch.Put(keyFor(d), bytes)
|
||||||
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == graph.ErrQuadExists && ignoreDup{
|
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == graph.ErrQuadNotExist && ignoreMiss{
|
if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -100,18 +100,18 @@ func newQuadStore() *QuadStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreDupl, ignoreMiss bool) error {
|
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
|
||||||
for _, d := range deltas {
|
for _, d := range deltas {
|
||||||
var err error
|
var err error
|
||||||
switch d.Action {
|
switch d.Action {
|
||||||
case graph.Add:
|
case graph.Add:
|
||||||
err = qs.AddDelta(d)
|
err = qs.AddDelta(d)
|
||||||
if err != nil && ignoreDupl{
|
if err != nil && ignoreOpts.IgnoreDup{
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
case graph.Delete:
|
case graph.Delete:
|
||||||
err = qs.RemoveDelta(d)
|
err = qs.RemoveDelta(d)
|
||||||
if err != nil && ignoreMiss{
|
if err != nil && ignoreOpts.IgnoreMissing{
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|
|
||||||
|
|
@ -214,7 +214,7 @@ func (qs *QuadStore) updateLog(d graph.Delta) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) error {
|
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
|
||||||
qs.session.SetSafe(nil)
|
qs.session.SetSafe(nil)
|
||||||
ids := make(map[string]int)
|
ids := make(map[string]int)
|
||||||
// Pre-check the existence condition.
|
// Pre-check the existence condition.
|
||||||
|
|
@ -226,7 +226,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) e
|
||||||
switch d.Action {
|
switch d.Action {
|
||||||
case graph.Add:
|
case graph.Add:
|
||||||
if qs.checkValid(key) {
|
if qs.checkValid(key) {
|
||||||
if ignoreDup {
|
if ignoreOpts.IgnoreDup {
|
||||||
continue
|
continue
|
||||||
}else{
|
}else{
|
||||||
return graph.ErrQuadExists
|
return graph.ErrQuadExists
|
||||||
|
|
@ -234,7 +234,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreDup, ignoreMiss bool) e
|
||||||
}
|
}
|
||||||
case graph.Delete:
|
case graph.Delete:
|
||||||
if !qs.checkValid(key) {
|
if !qs.checkValid(key) {
|
||||||
if ignoreMiss {
|
if ignoreOpts.IgnoreMissing {
|
||||||
continue
|
continue
|
||||||
}else{
|
}else{
|
||||||
return graph.ErrQuadNotExist
|
return graph.ErrQuadNotExist
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ type Value interface{}
|
||||||
type QuadStore interface {
|
type QuadStore interface {
|
||||||
// The only way in is through building a transaction, which
|
// The only way in is through building a transaction, which
|
||||||
// is done by a replication strategy.
|
// is done by a replication strategy.
|
||||||
ApplyDeltas([]Delta, bool, bool) error
|
ApplyDeltas([]Delta, IgnoreOpts) error
|
||||||
|
|
||||||
// Given an opaque token, returns the quad for that token from the store.
|
// Given an opaque token, returns the quad for that token from the store.
|
||||||
Quad(Value) quad.Quad
|
Quad(Value) quad.Quad
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,10 @@ type Handle struct {
|
||||||
QuadWriter QuadWriter
|
QuadWriter QuadWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IgnoreOpts struct {
|
||||||
|
IgnoreDup, IgnoreMissing bool
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handle) Close() {
|
func (h *Handle) Close() {
|
||||||
h.QuadStore.Close()
|
h.QuadStore.Close()
|
||||||
h.QuadWriter.Close()
|
h.QuadWriter.Close()
|
||||||
|
|
|
||||||
|
|
@ -28,8 +28,7 @@ func init() {
|
||||||
type Single struct {
|
type Single struct {
|
||||||
currentID graph.PrimaryKey
|
currentID graph.PrimaryKey
|
||||||
qs graph.QuadStore
|
qs graph.QuadStore
|
||||||
ignoreMissing bool
|
ignoreOpts graph.IgnoreOpts
|
||||||
ignoreDuplicate bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
|
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
|
||||||
|
|
@ -50,8 +49,10 @@ func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWri
|
||||||
return &Single{
|
return &Single{
|
||||||
currentID: qs.Horizon(),
|
currentID: qs.Horizon(),
|
||||||
qs: qs,
|
qs: qs,
|
||||||
ignoreMissing: ignoreMissing,
|
ignoreOpts: graph.IgnoreOpts{
|
||||||
ignoreDuplicate: ignoreDuplicate,
|
IgnoreDup: ignoreDuplicate,
|
||||||
|
IgnoreMissing:ignoreMissing,
|
||||||
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,7 +64,7 @@ func (s *Single) AddQuad(q quad.Quad) error {
|
||||||
Action: graph.Add,
|
Action: graph.Add,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
}
|
}
|
||||||
return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
|
return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) AddQuadSet(set []quad.Quad) error {
|
func (s *Single) AddQuadSet(set []quad.Quad) error {
|
||||||
|
|
@ -77,7 +78,7 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
|
return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) RemoveQuad(q quad.Quad) error {
|
func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||||
|
|
@ -88,7 +89,7 @@ func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||||
Action: graph.Delete,
|
Action: graph.Delete,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
}
|
}
|
||||||
return s.qs.ApplyDeltas(deltas, s.ignoreDuplicate, s.ignoreMissing)
|
return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) Close() error {
|
func (s *Single) Close() error {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue