first swing at mongo indexing (iterator todo)
This commit is contained in:
parent
8821c1968d
commit
ff148f58f8
2 changed files with 117 additions and 153 deletions
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"crypto/sha1"
|
"crypto/sha1"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"hash"
|
"hash"
|
||||||
"io"
|
|
||||||
|
|
||||||
"gopkg.in/mgo.v2"
|
"gopkg.in/mgo.v2"
|
||||||
"gopkg.in/mgo.v2/bson"
|
"gopkg.in/mgo.v2/bson"
|
||||||
|
|
@ -33,9 +32,6 @@ func init() {
|
||||||
graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph)
|
graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Guarantee we satisfy graph.Bulkloader.
|
|
||||||
var _ graph.BulkLoader = (*TripleStore)(nil)
|
|
||||||
|
|
||||||
const DefaultDBName = "cayley"
|
const DefaultDBName = "cayley"
|
||||||
|
|
||||||
type TripleStore struct {
|
type TripleStore struct {
|
||||||
|
|
@ -63,13 +59,21 @@ func createNewMongoGraph(addr string, options graph.Options) error {
|
||||||
Background: true,
|
Background: true,
|
||||||
Sparse: true,
|
Sparse: true,
|
||||||
}
|
}
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("quads").EnsureIndex(indexOpts)
|
||||||
indexOpts.Key = []string{"Pred"}
|
indexOpts.Key = []string{"Pred"}
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("quads").EnsureIndex(indexOpts)
|
||||||
indexOpts.Key = []string{"Obj"}
|
indexOpts.Key = []string{"Obj"}
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("quads").EnsureIndex(indexOpts)
|
||||||
indexOpts.Key = []string{"Label"}
|
indexOpts.Key = []string{"Label"}
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("quads").EnsureIndex(indexOpts)
|
||||||
|
logOpts := mgo.Index{
|
||||||
|
Key: []string{"LogID"},
|
||||||
|
Unique: true,
|
||||||
|
DropDups: false,
|
||||||
|
Background: true,
|
||||||
|
Sparse: true,
|
||||||
|
}
|
||||||
|
db.C("log").EnsureIndex(logOpts)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -113,114 +117,121 @@ type MongoNode struct {
|
||||||
Size int "Size"
|
Size int "Size"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) updateNodeBy(node_name string, inc int) {
|
type MongoLogEntry struct {
|
||||||
var size MongoNode
|
LogID int64 "LogID"
|
||||||
node := qs.ValueOf(node_name)
|
Action string "Action"
|
||||||
err := qs.db.C("nodes").FindId(node).One(&size)
|
Key string "Key"
|
||||||
if err != nil {
|
Timestamp int64
|
||||||
if err.Error() == "not found" {
|
|
||||||
// Not found. Okay.
|
|
||||||
size.Id = node.(string)
|
|
||||||
size.Name = node_name
|
|
||||||
size.Size = inc
|
|
||||||
} else {
|
|
||||||
glog.Errorf("Error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
size.Id = node.(string)
|
|
||||||
size.Name = node_name
|
|
||||||
size.Size += inc
|
|
||||||
}
|
|
||||||
|
|
||||||
// Removing something...
|
|
||||||
if inc < 0 {
|
|
||||||
if size.Size <= 0 {
|
|
||||||
err := qs.db.C("nodes").RemoveId(node)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error: %v while removing node %s", err, node_name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err2 := qs.db.C("nodes").UpsertId(node, size)
|
|
||||||
if err2 != nil {
|
|
||||||
glog.Errorf("Error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) writeTriple(t quad.Quad) bool {
|
func (qs *TripleStore) updateNodeBy(node_name string, inc int) error {
|
||||||
|
node := qs.ValueOf(node_name)
|
||||||
|
doc := bson.M{
|
||||||
|
"_id": node.(string),
|
||||||
|
"Name": node_name,
|
||||||
|
}
|
||||||
|
upsert := bson.M{
|
||||||
|
"$setOnInsert": doc,
|
||||||
|
"$inc": bson.M{
|
||||||
|
"Size": inc,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := qs.db.C("nodes").UpsertId(node, upsert)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error updating node: %v", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error {
|
||||||
|
var setname string
|
||||||
|
if proc == graph.Add {
|
||||||
|
setname = "Added"
|
||||||
|
} else if proc == graph.Delete {
|
||||||
|
setname = "Deleted"
|
||||||
|
}
|
||||||
tripledoc := bson.M{
|
tripledoc := bson.M{
|
||||||
"_id": qs.getIdForTriple(t),
|
|
||||||
"Subject": t.Subject,
|
"Subject": t.Subject,
|
||||||
"Predicate": t.Predicate,
|
"Predicate": t.Predicate,
|
||||||
"Object": t.Object,
|
"Object": t.Object,
|
||||||
"Label": t.Label,
|
"Label": t.Label,
|
||||||
}
|
}
|
||||||
err := qs.db.C("triples").Insert(tripledoc)
|
upsert := bson.M{
|
||||||
|
"$setOnInsert": tripledoc,
|
||||||
|
"$push": bson.M{
|
||||||
|
setname: id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Among the reasons I hate MongoDB. "Errors don't happen! Right guys?"
|
|
||||||
if err.(*mgo.LastError).Code == 11000 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
glog.Errorf("Error: %v", err)
|
glog.Errorf("Error: %v", err)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
return true
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) AddTriple(t quad.Quad) {
|
func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
||||||
_ = qs.writeTriple(t)
|
var action string
|
||||||
qs.updateNodeBy(t.Subject, 1)
|
if d.Action == graph.Add {
|
||||||
qs.updateNodeBy(t.Predicate, 1)
|
action = "Add"
|
||||||
qs.updateNodeBy(t.Object, 1)
|
} else {
|
||||||
if t.Label != "" {
|
action = "Delete"
|
||||||
qs.updateNodeBy(t.Label, 1)
|
|
||||||
}
|
}
|
||||||
|
entry := MongoLogEntry{
|
||||||
|
LogID: d.ID,
|
||||||
|
Action: action,
|
||||||
|
Key: qs.getIdForTriple(d.Quad),
|
||||||
|
Timestamp: d.Timestamp.UnixNano(),
|
||||||
|
}
|
||||||
|
err := qs.db.C("log").Insert(entry)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error updating log: %v", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) AddTripleSet(in []quad.Quad) {
|
func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
||||||
qs.session.SetSafe(nil)
|
qs.session.SetSafe(nil)
|
||||||
ids := make(map[string]int)
|
ids := make(map[string]int)
|
||||||
for _, t := range in {
|
for _, d := range in {
|
||||||
wrote := qs.writeTriple(t)
|
err := qs.updateLog(d)
|
||||||
if wrote {
|
if err != nil {
|
||||||
ids[t.Subject]++
|
return err
|
||||||
ids[t.Object]++
|
}
|
||||||
ids[t.Predicate]++
|
}
|
||||||
if t.Label != "" {
|
for _, d := range in {
|
||||||
ids[t.Label]++
|
err := qs.updateTriple(d.Quad, d.ID, d.Action)
|
||||||
}
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var countdelta int
|
||||||
|
if d.Action == graph.Add {
|
||||||
|
countdelta = 1
|
||||||
|
} else {
|
||||||
|
countdelta = -1
|
||||||
|
}
|
||||||
|
ids[d.Quad.Subject] += countdelta
|
||||||
|
ids[d.Quad.Object] += countdelta
|
||||||
|
ids[d.Quad.Predicate] += countdelta
|
||||||
|
if d.Quad.Label != "" {
|
||||||
|
ids[d.Quad.Label] += countdelta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for k, v := range ids {
|
for k, v := range ids {
|
||||||
qs.updateNodeBy(k, v)
|
err := qs.updateNodeBy(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
qs.session.SetSafe(&mgo.Safe{})
|
qs.session.SetSafe(&mgo.Safe{})
|
||||||
}
|
return nil
|
||||||
|
|
||||||
func (qs *TripleStore) RemoveTriple(t quad.Quad) {
|
|
||||||
err := qs.db.C("triples").RemoveId(qs.getIdForTriple(t))
|
|
||||||
if err == mgo.ErrNotFound {
|
|
||||||
return
|
|
||||||
} else if err != nil {
|
|
||||||
glog.Errorf("Error: %v while removing triple %v", err, t)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
qs.updateNodeBy(t.Subject, -1)
|
|
||||||
qs.updateNodeBy(t.Predicate, -1)
|
|
||||||
qs.updateNodeBy(t.Object, -1)
|
|
||||||
if t.Label != "" {
|
|
||||||
qs.updateNodeBy(t.Label, -1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
||||||
var bsonDoc bson.M
|
var bsonDoc bson.M
|
||||||
err := qs.db.C("triples").FindId(val.(string)).One(&bsonDoc)
|
err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error: Couldn't retrieve triple %s %v", val, err)
|
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
|
||||||
}
|
}
|
||||||
return quad.Quad{
|
return quad.Quad{
|
||||||
bsonDoc["Subject"].(string),
|
bsonDoc["Subject"].(string),
|
||||||
|
|
@ -231,7 +242,7 @@ func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
|
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
|
||||||
return NewIterator(qs, "triples", d, val)
|
return NewIterator(qs, "quads", d, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) NodesAllIterator() graph.Iterator {
|
func (qs *TripleStore) NodesAllIterator() graph.Iterator {
|
||||||
|
|
@ -239,7 +250,7 @@ func (qs *TripleStore) NodesAllIterator() graph.Iterator {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
|
func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
|
||||||
return NewAllIterator(qs, "triples")
|
return NewAllIterator(qs, "quads")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) ValueOf(s string) graph.Value {
|
func (qs *TripleStore) ValueOf(s string) graph.Value {
|
||||||
|
|
@ -261,7 +272,8 @@ func (qs *TripleStore) NameOf(v graph.Value) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) Size() int64 {
|
func (qs *TripleStore) Size() int64 {
|
||||||
count, err := qs.db.C("triples").Count()
|
// TODO(barakmich): Make size real; store it in the log, and retrieve it.
|
||||||
|
count, err := qs.db.C("quads").Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error: %v", err)
|
glog.Errorf("Error: %v", err)
|
||||||
return 0
|
return 0
|
||||||
|
|
@ -269,6 +281,15 @@ func (qs *TripleStore) Size() int64 {
|
||||||
return int64(count)
|
return int64(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (qs *TripleStore) Horizon() int64 {
|
||||||
|
var log MongoLogEntry
|
||||||
|
err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Could not get Horizon from Mongo: %v", err)
|
||||||
|
}
|
||||||
|
return log.LogID
|
||||||
|
}
|
||||||
|
|
||||||
func compareStrings(a, b graph.Value) bool {
|
func compareStrings(a, b graph.Value) bool {
|
||||||
return a.(string) == b.(string)
|
return a.(string) == b.(string)
|
||||||
}
|
}
|
||||||
|
|
@ -298,61 +319,4 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V
|
||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) BulkLoad(dec quad.Unmarshaler) error {
|
// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it.
|
||||||
if qs.Size() != 0 {
|
|
||||||
return graph.ErrCannotBulkLoad
|
|
||||||
}
|
|
||||||
|
|
||||||
qs.session.SetSafe(nil)
|
|
||||||
for {
|
|
||||||
q, err := dec.Unmarshal()
|
|
||||||
if err != nil {
|
|
||||||
if err != io.EOF {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
qs.writeTriple(q)
|
|
||||||
}
|
|
||||||
|
|
||||||
outputTo := bson.M{"replace": "nodes", "sharded": true}
|
|
||||||
glog.Infoln("Mapreducing")
|
|
||||||
job := mgo.MapReduce{
|
|
||||||
Map: `function() {
|
|
||||||
var len = this["_id"].length
|
|
||||||
var s_key = this["_id"].slice(0, len / 4)
|
|
||||||
var p_key = this["_id"].slice(len / 4, 2 * len / 4)
|
|
||||||
var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4)
|
|
||||||
var c_key = this["_id"].slice(3 * len / 4)
|
|
||||||
emit(s_key, {"_id": s_key, "Name" : this.Subject, "Size" : 1})
|
|
||||||
emit(p_key, {"_id": p_key, "Name" : this.Predicate, "Size" : 1})
|
|
||||||
emit(o_key, {"_id": o_key, "Name" : this.Object, "Size" : 1})
|
|
||||||
if (this.Label != "") {
|
|
||||||
emit(c_key, {"_id": c_key, "Name" : this.Label, "Size" : 1})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
`,
|
|
||||||
Reduce: `
|
|
||||||
function(key, value_list) {
|
|
||||||
out = {"_id": key, "Name": value_list[0].Name}
|
|
||||||
count = 0
|
|
||||||
for (var i = 0; i < value_list.length; i++) {
|
|
||||||
count = count + value_list[i].Size
|
|
||||||
|
|
||||||
}
|
|
||||||
out["Size"] = count
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
`,
|
|
||||||
Out: outputTo,
|
|
||||||
}
|
|
||||||
qs.db.C("triples").Find(nil).MapReduce(&job, nil)
|
|
||||||
glog.Infoln("Fixing")
|
|
||||||
qs.db.Run(bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) {
|
|
||||||
db.nodes.update({"_id": result._id}, result.value)
|
|
||||||
}) }`}, {"args", bson.D{}}}, nil)
|
|
||||||
|
|
||||||
qs.session.SetSafe(&mgo.Safe{})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,10 @@ import (
|
||||||
"github.com/google/cayley/quad"
|
"github.com/google/cayley/quad"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
graph.RegisterWriter("single", NewSingleReplication)
|
||||||
|
}
|
||||||
|
|
||||||
type Single struct {
|
type Single struct {
|
||||||
nextID int64
|
nextID int64
|
||||||
ts graph.TripleStore
|
ts graph.TripleStore
|
||||||
|
|
@ -79,7 +83,3 @@ func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||||
}
|
}
|
||||||
return s.ts.ApplyDeltas(deltas)
|
return s.ts.ApplyDeltas(deltas)
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
graph.RegisterWriter("single", NewSingleReplication)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue