diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 364d195..79fe38d 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -18,7 +18,6 @@ import ( "crypto/sha1" "encoding/hex" "hash" - "io" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" @@ -33,9 +32,6 @@ func init() { graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph) } -// Guarantee we satisfy graph.Bulkloader. -var _ graph.BulkLoader = (*TripleStore)(nil) - const DefaultDBName = "cayley" type TripleStore struct { @@ -63,13 +59,21 @@ func createNewMongoGraph(addr string, options graph.Options) error { Background: true, Sparse: true, } - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Pred"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Obj"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) indexOpts.Key = []string{"Label"} - db.C("triples").EnsureIndex(indexOpts) + db.C("quads").EnsureIndex(indexOpts) + logOpts := mgo.Index{ + Key: []string{"LogID"}, + Unique: true, + DropDups: false, + Background: true, + Sparse: true, + } + db.C("log").EnsureIndex(logOpts) return nil } @@ -113,114 +117,121 @@ type MongoNode struct { Size int "Size" } -func (qs *TripleStore) updateNodeBy(node_name string, inc int) { - var size MongoNode - node := qs.ValueOf(node_name) - err := qs.db.C("nodes").FindId(node).One(&size) - if err != nil { - if err.Error() == "not found" { - // Not found. Okay. - size.Id = node.(string) - size.Name = node_name - size.Size = inc - } else { - glog.Errorf("Error: %v", err) - return - } - } else { - size.Id = node.(string) - size.Name = node_name - size.Size += inc - } - - // Removing something... - if inc < 0 { - if size.Size <= 0 { - err := qs.db.C("nodes").RemoveId(node) - if err != nil { - glog.Errorf("Error: %v while removing node %s", err, node_name) - return - } - } - } - - _, err2 := qs.db.C("nodes").UpsertId(node, size) - if err2 != nil { - glog.Errorf("Error: %v", err) - } +type MongoLogEntry struct { + LogID int64 "LogID" + Action string "Action" + Key string "Key" + Timestamp int64 } -func (qs *TripleStore) writeTriple(t quad.Quad) bool { +func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { + node := qs.ValueOf(node_name) + doc := bson.M{ + "_id": node.(string), + "Name": node_name, + } + upsert := bson.M{ + "$setOnInsert": doc, + "$inc": bson.M{ + "Size": inc, + }, + } + + _, err := qs.db.C("nodes").UpsertId(node, upsert) + if err != nil { + glog.Errorf("Error updating node: %v", err) + } + return err +} + +func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error { + var setname string + if proc == graph.Add { + setname = "Added" + } else if proc == graph.Delete { + setname = "Deleted" + } tripledoc := bson.M{ - "_id": qs.getIdForTriple(t), "Subject": t.Subject, "Predicate": t.Predicate, "Object": t.Object, "Label": t.Label, } - err := qs.db.C("triples").Insert(tripledoc) + upsert := bson.M{ + "$setOnInsert": tripledoc, + "$push": bson.M{ + setname: id, + }, + } + _, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert) if err != nil { - // Among the reasons I hate MongoDB. "Errors don't happen! Right guys?" - if err.(*mgo.LastError).Code == 11000 { - return false - } glog.Errorf("Error: %v", err) - return false } - return true + return err } -func (qs *TripleStore) AddTriple(t quad.Quad) { - _ = qs.writeTriple(t) - qs.updateNodeBy(t.Subject, 1) - qs.updateNodeBy(t.Predicate, 1) - qs.updateNodeBy(t.Object, 1) - if t.Label != "" { - qs.updateNodeBy(t.Label, 1) +func (qs *TripleStore) updateLog(d *graph.Delta) error { + var action string + if d.Action == graph.Add { + action = "Add" + } else { + action = "Delete" } + entry := MongoLogEntry{ + LogID: d.ID, + Action: action, + Key: qs.getIdForTriple(d.Quad), + Timestamp: d.Timestamp.UnixNano(), + } + err := qs.db.C("log").Insert(entry) + if err != nil { + glog.Errorf("Error updating log: %v", err) + } + return err } -func (qs *TripleStore) AddTripleSet(in []quad.Quad) { +func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) - for _, t := range in { - wrote := qs.writeTriple(t) - if wrote { - ids[t.Subject]++ - ids[t.Object]++ - ids[t.Predicate]++ - if t.Label != "" { - ids[t.Label]++ - } + for _, d := range in { + err := qs.updateLog(d) + if err != nil { + return err + } + } + for _, d := range in { + err := qs.updateTriple(d.Quad, d.ID, d.Action) + if err != nil { + return err + } + var countdelta int + if d.Action == graph.Add { + countdelta = 1 + } else { + countdelta = -1 + } + ids[d.Quad.Subject] += countdelta + ids[d.Quad.Object] += countdelta + ids[d.Quad.Predicate] += countdelta + if d.Quad.Label != "" { + ids[d.Quad.Label] += countdelta } } for k, v := range ids { - qs.updateNodeBy(k, v) + err := qs.updateNodeBy(k, v) + if err != nil { + return err + } } qs.session.SetSafe(&mgo.Safe{}) -} - -func (qs *TripleStore) RemoveTriple(t quad.Quad) { - err := qs.db.C("triples").RemoveId(qs.getIdForTriple(t)) - if err == mgo.ErrNotFound { - return - } else if err != nil { - glog.Errorf("Error: %v while removing triple %v", err, t) - return - } - qs.updateNodeBy(t.Subject, -1) - qs.updateNodeBy(t.Predicate, -1) - qs.updateNodeBy(t.Object, -1) - if t.Label != "" { - qs.updateNodeBy(t.Label, -1) - } + return nil } func (qs *TripleStore) Quad(val graph.Value) quad.Quad { var bsonDoc bson.M - err := qs.db.C("triples").FindId(val.(string)).One(&bsonDoc) + err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc) if err != nil { - glog.Errorf("Error: Couldn't retrieve triple %s %v", val, err) + glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) } return quad.Quad{ bsonDoc["Subject"].(string), @@ -231,7 +242,7 @@ func (qs *TripleStore) Quad(val graph.Value) quad.Quad { } func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { - return NewIterator(qs, "triples", d, val) + return NewIterator(qs, "quads", d, val) } func (qs *TripleStore) NodesAllIterator() graph.Iterator { @@ -239,7 +250,7 @@ func (qs *TripleStore) NodesAllIterator() graph.Iterator { } func (qs *TripleStore) TriplesAllIterator() graph.Iterator { - return NewAllIterator(qs, "triples") + return NewAllIterator(qs, "quads") } func (qs *TripleStore) ValueOf(s string) graph.Value { @@ -261,7 +272,8 @@ func (qs *TripleStore) NameOf(v graph.Value) string { } func (qs *TripleStore) Size() int64 { - count, err := qs.db.C("triples").Count() + // TODO(barakmich): Make size real; store it in the log, and retrieve it. + count, err := qs.db.C("quads").Count() if err != nil { glog.Errorf("Error: %v", err) return 0 @@ -269,6 +281,15 @@ func (qs *TripleStore) Size() int64 { return int64(count) } +func (qs *TripleStore) Horizon() int64 { + var log MongoLogEntry + err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) + if err != nil { + glog.Errorf("Could not get Horizon from Mongo: %v", err) + } + return log.LogID +} + func compareStrings(a, b graph.Value) bool { return a.(string) == b.(string) } @@ -298,61 +319,4 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V return val } -func (qs *TripleStore) BulkLoad(dec quad.Unmarshaler) error { - if qs.Size() != 0 { - return graph.ErrCannotBulkLoad - } - - qs.session.SetSafe(nil) - for { - q, err := dec.Unmarshal() - if err != nil { - if err != io.EOF { - return err - } - break - } - qs.writeTriple(q) - } - - outputTo := bson.M{"replace": "nodes", "sharded": true} - glog.Infoln("Mapreducing") - job := mgo.MapReduce{ - Map: `function() { - var len = this["_id"].length - var s_key = this["_id"].slice(0, len / 4) - var p_key = this["_id"].slice(len / 4, 2 * len / 4) - var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4) - var c_key = this["_id"].slice(3 * len / 4) - emit(s_key, {"_id": s_key, "Name" : this.Subject, "Size" : 1}) - emit(p_key, {"_id": p_key, "Name" : this.Predicate, "Size" : 1}) - emit(o_key, {"_id": o_key, "Name" : this.Object, "Size" : 1}) - if (this.Label != "") { - emit(c_key, {"_id": c_key, "Name" : this.Label, "Size" : 1}) - } - } - `, - Reduce: ` - function(key, value_list) { - out = {"_id": key, "Name": value_list[0].Name} - count = 0 - for (var i = 0; i < value_list.length; i++) { - count = count + value_list[i].Size - - } - out["Size"] = count - return out - } - `, - Out: outputTo, - } - qs.db.C("triples").Find(nil).MapReduce(&job, nil) - glog.Infoln("Fixing") - qs.db.Run(bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) { - db.nodes.update({"_id": result._id}, result.value) - }) }`}, {"args", bson.D{}}}, nil) - - qs.session.SetSafe(&mgo.Safe{}) - - return nil -} +// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it. diff --git a/writer/single.go b/writer/single.go index 51641ca..0dc4882 100644 --- a/writer/single.go +++ b/writer/single.go @@ -22,6 +22,10 @@ import ( "github.com/google/cayley/quad" ) +func init() { + graph.RegisterWriter("single", NewSingleReplication) +} + type Single struct { nextID int64 ts graph.TripleStore @@ -79,7 +83,3 @@ func (s *Single) RemoveQuad(q quad.Quad) error { } return s.ts.ApplyDeltas(deltas) } - -func init() { - graph.RegisterWriter("single", NewSingleReplication) -}