diff --git a/graph/mongo/iterator.go b/graph/mongo/iterator.go index 8485ca9..1495eca 100644 --- a/graph/mongo/iterator.go +++ b/graph/mongo/iterator.go @@ -140,9 +140,9 @@ func (it *Iterator) Clone() graph.Iterator { func (it *Iterator) Next() bool { var result struct { - Id string "_id" - Added []int64 - Deleted []int64 + Id string "_id" + Added []int64 "Added" + Deleted []int64 "Deleted" } found := it.iter.Next(&result) if !found { diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 79fe38d..7165900 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -170,6 +170,25 @@ func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) return err } +func (qs *TripleStore) checkValid(key string) bool { + var indexEntry struct { + Added []int64 "Added" + Deleted []int64 "Deleted" + } + err := qs.db.C("quads").FindId(key).One(&indexEntry) + if err == mgo.ErrNotFound { + return false + } + if err != nil { + glog.Errorln("Other error checking valid quad: %s %v.", key, err) + return false + } + if len(indexEntry.Added) <= len(indexEntry.Deleted) { + return false + } + return true +} + func (qs *TripleStore) updateLog(d *graph.Delta) error { var action string if d.Action == graph.Add { @@ -193,6 +212,23 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) + // Pre-check the existence condition. + for _, d := range in { + key := qs.getIdForTriple(d.Quad) + switch d.Action { + case graph.Add: + if qs.checkValid(key) { + return graph.ErrQuadExists + } + case graph.Delete: + if !qs.checkValid(key) { + return graph.ErrQuadNotExist + } + } + } + if glog.V(2) { + glog.Infoln("Existence verified. Proceeding.") + } for _, d := range in { err := qs.updateLog(d) if err != nil { @@ -285,6 +321,9 @@ func (qs *TripleStore) Horizon() int64 { var log MongoLogEntry err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) if err != nil { + if err == mgo.ErrNotFound { + return 0 + } glog.Errorf("Could not get Horizon from Mongo: %v", err) } return log.LogID diff --git a/writer/single.go b/writer/single.go index 794f1a8..4a3a787 100644 --- a/writer/single.go +++ b/writer/single.go @@ -33,8 +33,9 @@ type Single struct { } func NewSingleReplication(ts graph.TripleStore, opts graph.Options) (graph.QuadWriter, error) { - rep := &Single{nextID: ts.Horizon(), ts: ts} - if rep.nextID <= 0 { + horizon := ts.Horizon() + rep := &Single{nextID: horizon + 1, ts: ts} + if horizon <= 0 { rep.nextID = 1 } return rep, nil