cayley/graph/mongo/triplestore.go
kortschak 6acfdcc5d6 Use concrete value for quad.Quad
Comparison of -short benchmarks in cayley.

$ benchcmp pointer.bench concrete.bench
benchmark                                   old ns/op     new ns/op	delta
BenchmarkNamePredicate                      1673276       1655093	-1.09%
BenchmarkLargeSetsNoIntersection            318985907     261499984	-18.02%
BenchmarkNetAndSpeed                        104403743     41516981	-60.23%
BenchmarkKeanuAndNet                        17309258      16857513	-2.61%
BenchmarkKeanuAndSpeed                      20159161      19282833	-4.35%

Comparison of pathological cases is not so happy.

benchmark                                   old ns/op       new ns/op		delta
BenchmarkVeryLargeSetsSmallIntersection     55269775527     246084606672	+345.24%
BenchmarkHelplessContainsChecker            23436501319     24308906949		+3.72%

Profiling the worst case:

Pointer:
Total: 6121 samples
    1973  32.2%  32.2%     1973  32.2% runtime.findfunc
     773  12.6%  44.9%      773  12.6% readvarint
     510   8.3%  53.2%      511   8.3% step
     409   6.7%  59.9%      410   6.7% runtime.gentraceback
     390   6.4%  66.2%      391   6.4% pcvalue
     215   3.5%  69.8%      215   3.5% runtime.funcdata
     181   3.0%  72.7%      181   3.0% checkframecopy
     118   1.9%  74.6%      119   1.9% runtime.funcspdelta
      96   1.6%  76.2%       96   1.6% runtime.topofstack
      76   1.2%  77.5%       76   1.2% scanblock

Concrete:
Total: 25027 samples
    9437  37.7%  37.7%     9437  37.7% runtime.findfunc
    3853  15.4%  53.1%     3853  15.4% readvarint
    2366   9.5%  62.6%     2366   9.5% step
    2186   8.7%  71.3%     2186   8.7% runtime.gentraceback
    1816   7.3%  78.5%     1816   7.3% pcvalue
    1016   4.1%  82.6%     1016   4.1% runtime.funcdata
     859   3.4%  86.0%      859   3.4% checkframecopy
     506   2.0%  88.1%      506   2.0% runtime.funcspdelta
     410   1.6%  89.7%      410   1.6% runtime.topofstack
     303   1.2%  90.9%      303   1.2% runtime.newstack
2014-08-05 23:25:02 +09:30

358 lines
8.5 KiB
Go

// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mongo
import (
"crypto/sha1"
"encoding/hex"
"hash"
"io"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
// init registers this backend with the graph package under the name "mongo",
// passing newTripleStore and createNewMongoGraph as its callbacks.
func init() {
graph.RegisterTripleStore("mongo", newTripleStore, createNewMongoGraph)
}
// Guarantee we satisfy graph.BulkLoader.
var _ graph.BulkLoader = (*TripleStore)(nil)
// DefaultDBName is the database name used when the "database_name" option
// is not supplied.
const DefaultDBName = "cayley"
// TripleStore is the MongoDB-backed store registered under the name "mongo".
// It keeps quads in the "triples" collection and per-value bookkeeping in the
// "nodes" collection.
type TripleStore struct {
// session is the connection returned by mgo.Dial.
session *mgo.Session
// db is the database chosen by the "database_name" option, or DefaultDBName.
db *mgo.Database
// hasher is a SHA-1 hash that is reset and reused for every value hashed.
hasher hash.Hash
// idCache is consulted by NameOf before it queries the "nodes" collection.
idCache *IDLru
}
// createNewMongoGraph prepares the MongoDB database at addr for use as a
// triple store by ensuring the per-direction indexes on the "triples"
// collection exist.
//
// The database name is read from the "database_name" option and falls back
// to DefaultDBName. The first error met while dialing or while building an
// index is returned.
func createNewMongoGraph(addr string, options graph.Options) error {
	conn, err := mgo.Dial(addr)
	if err != nil {
		return err
	}
	// The session is only needed while the indexes are being ensured.
	defer conn.Close()
	conn.SetSafe(&mgo.Safe{})

	dbName := DefaultDBName
	if val, ok := options.StringKey("database_name"); ok {
		dbName = val
	}
	triples := conn.DB(dbName).C("triples")

	// Index the field names that writeTriple actually stores in each
	// document; an index on a field that is never written is useless.
	for _, field := range []string{"Subject", "Predicate", "Object", "Label"} {
		index := mgo.Index{
			Key:        []string{field},
			Unique:     false,
			DropDups:   false,
			Background: true,
			Sparse:     true,
		}
		if err := triples.EnsureIndex(index); err != nil {
			return err
		}
	}
	return nil
}
// newTripleStore dials the MongoDB server at addr and returns a TripleStore
// bound to the database named by the "database_name" option (DefaultDBName
// when the option is absent). A dial failure is returned unchanged.
func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) {
	session, err := mgo.Dial(addr)
	if err != nil {
		return nil, err
	}
	session.SetSafe(&mgo.Safe{})

	name, ok := options.StringKey("database_name")
	if !ok {
		name = DefaultDBName
	}

	store := &TripleStore{
		session: session,
		db:      session.DB(name),
		hasher:  sha1.New(),
		idCache: NewIDLru(1 << 16),
	}
	return store, nil
}
// getIdForTriple builds the document id of a quad: the hex hashes of its
// subject, predicate, object and label, concatenated in that order.
func (qs *TripleStore) getIdForTriple(t quad.Quad) string {
	parts := [...]string{t.Subject, t.Predicate, t.Object, t.Label}
	var id string
	for _, part := range parts {
		id += qs.ConvertStringToByteHash(part)
	}
	return id
}
// ConvertStringToByteHash returns the hex-encoded digest of s, computed with
// the store's hasher. The hasher is reset before every use.
func (qs *TripleStore) ConvertStringToByteHash(s string) string {
	h := qs.hasher
	h.Reset()
	h.Write([]byte(s))
	digest := h.Sum(make([]byte, 0, h.Size()))
	return hex.EncodeToString(digest)
}
// MongoNode is the document stored in the "nodes" collection for one value.
//
// The tags use the conventional key:"value" form so that go vet accepts
// them; mgo's bson package maps them to the same document keys as the bare
// form did.
type MongoNode struct {
	// Id is the document id: the hex hash of Name.
	Id string `bson:"_id"`
	// Name is the original string value of the node.
	Name string `bson:"Name"`
	// Size is the running count maintained by updateNodeBy.
	Size int `bson:"Size"`
}
// updateNodeBy adjusts the Size counter of the node named node_name by inc
// in the "nodes" collection.
//
// A node that does not exist yet is created with Size equal to inc. When a
// decrement brings Size to zero or below, the node document is removed
// instead of being written back. Failures are logged and otherwise ignored.
func (qs *TripleStore) updateNodeBy(node_name string, inc int) {
	var size MongoNode
	node := qs.ValueOf(node_name)
	nodes := qs.db.C("nodes")

	err := nodes.FindId(node).One(&size)
	if err != nil && err != mgo.ErrNotFound {
		glog.Errorf("Error: %v", err)
		return
	}
	size.Id = node.(string)
	size.Name = node_name
	if err == mgo.ErrNotFound {
		// Not found. Okay: start counting from this increment.
		size.Size = inc
	} else {
		size.Size += inc
	}

	// Removing something: drop the node once nothing refers to it. Return
	// afterwards so the upsert below does not immediately recreate it.
	if inc < 0 && size.Size <= 0 {
		if err := nodes.RemoveId(node); err != nil {
			glog.Errorf("Error: %v while removing node %s", err, node_name)
		}
		return
	}

	if _, err := nodes.UpsertId(node, size); err != nil {
		glog.Errorf("Error: %v", err)
	}
}
// writeTriple inserts t into the "triples" collection, keyed by
// getIdForTriple(t). It reports whether the insert succeeded.
//
// A duplicate-key failure (the quad is already stored) returns false
// silently; any other failure is logged and also returns false.
func (qs *TripleStore) writeTriple(t quad.Quad) bool {
	tripledoc := bson.M{
		"_id":       qs.getIdForTriple(t),
		"Subject":   t.Subject,
		"Predicate": t.Predicate,
		"Object":    t.Object,
		"Label":     t.Label,
	}
	if err := qs.db.C("triples").Insert(tripledoc); err != nil {
		// Code 11000 is MongoDB's duplicate-key error. Insert can also fail
		// with errors that are not *mgo.LastError (network failures, for
		// example), so the assertion must be checked rather than assumed.
		if lerr, ok := err.(*mgo.LastError); ok && lerr.Code == 11000 {
			return false
		}
		glog.Errorf("Error: %v", err)
		return false
	}
	return true
}
// AddTriple stores t and, when the write succeeds, increments the node
// counters of its subject, predicate, object and (if non-empty) label.
//
// Counters are left untouched when the quad was not written (for example
// because it already exists), matching AddTripleSet; otherwise re-adding a
// quad would inflate the counts.
func (qs *TripleStore) AddTriple(t quad.Quad) {
	if !qs.writeTriple(t) {
		return
	}
	qs.updateNodeBy(t.Subject, 1)
	qs.updateNodeBy(t.Predicate, 1)
	qs.updateNodeBy(t.Object, 1)
	if t.Label != "" {
		qs.updateNodeBy(t.Label, 1)
	}
}
// AddTripleSet writes every quad of in and then applies the accumulated
// per-node count increments in one pass. The session's safe mode is cleared
// for the duration of the call and reset to the default afterwards.
func (qs *TripleStore) AddTripleSet(in []quad.Quad) {
	qs.session.SetSafe(nil)

	counts := make(map[string]int)
	for _, q := range in {
		if !qs.writeTriple(q) {
			continue
		}
		counts[q.Subject]++
		counts[q.Object]++
		counts[q.Predicate]++
		if q.Label != "" {
			counts[q.Label]++
		}
	}

	for name, delta := range counts {
		qs.updateNodeBy(name, delta)
	}

	qs.session.SetSafe(&mgo.Safe{})
}
// RemoveTriple deletes t from the "triples" collection and decrements the
// node counters of its parts. A quad that is not stored is ignored; any
// other removal failure is logged and leaves the counters unchanged.
func (qs *TripleStore) RemoveTriple(t quad.Quad) {
	switch err := qs.db.C("triples").RemoveId(qs.getIdForTriple(t)); {
	case err == mgo.ErrNotFound:
		return
	case err != nil:
		glog.Errorf("Error: %v while removing triple %v", err, t)
		return
	}

	names := []string{t.Subject, t.Predicate, t.Object}
	if t.Label != "" {
		names = append(names, t.Label)
	}
	for _, name := range names {
		qs.updateNodeBy(name, -1)
	}
}
// Quad loads the quad whose document id is val (a string) from the
// "triples" collection.
//
// If the lookup fails the error is logged and the zero quad.Quad is
// returned. A field that is missing from the document, or is not a string,
// is returned as the empty string.
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
	var bsonDoc bson.M
	err := qs.db.C("triples").FindId(val.(string)).One(&bsonDoc)
	if err != nil {
		glog.Errorf("Error: Couldn't retrieve triple %s %v", val, err)
		// bsonDoc is nil here; asserting on its entries would panic.
		return quad.Quad{}
	}
	field := func(key string) string {
		s, _ := bsonDoc[key].(string)
		return s
	}
	return quad.Quad{
		Subject:   field("Subject"),
		Predicate: field("Predicate"),
		Object:    field("Object"),
		Label:     field("Label"),
	}
}
// TripleIterator returns an iterator built by NewIterator over the
// "triples" collection for direction d and value val.
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
	const collection = "triples"
	return NewIterator(qs, collection, d, val)
}
// NodesAllIterator returns an iterator built by NewAllIterator over the
// "nodes" collection.
func (qs *TripleStore) NodesAllIterator() graph.Iterator {
	const collection = "nodes"
	return NewAllIterator(qs, collection)
}
// TriplesAllIterator returns an iterator built by NewAllIterator over the
// "triples" collection.
func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
	const collection = "triples"
	return NewAllIterator(qs, collection)
}
// ValueOf returns the store's value for the name s: its hex-encoded hash,
// as produced by ConvertStringToByteHash.
func (qs *TripleStore) ValueOf(s string) graph.Value {
	hashed := qs.ConvertStringToByteHash(s)
	return hashed
}
// NameOf returns the name stored for the node value v (a string id).
//
// The id cache is consulted first; on a miss the node is read from the
// "nodes" collection and the result cached. If the lookup fails the error is
// logged and "" is returned without caching, so that a transient failure or
// a node added later is not masked by a stale empty entry.
func (qs *TripleStore) NameOf(v graph.Value) string {
	id := v.(string)
	if name, ok := qs.idCache.Get(id); ok {
		return name
	}
	var node MongoNode
	if err := qs.db.C("nodes").FindId(id).One(&node); err != nil {
		glog.Errorf("Error: Couldn't retrieve node %s %v", v, err)
		return ""
	}
	qs.idCache.Put(id, node.Name)
	return node.Name
}
// Size reports the number of documents in the "triples" collection. If the
// count fails, the error is logged and 0 is returned.
func (qs *TripleStore) Size() int64 {
	n, err := qs.db.C("triples").Count()
	if err == nil {
		return int64(n)
	}
	glog.Errorf("Error: %v", err)
	return 0
}
// compareStrings reports whether a and b, both of which must hold strings,
// are equal. It panics if either value is not a string.
func compareStrings(a, b graph.Value) bool {
	left, right := a.(string), b.(string)
	return left == right
}
// FixedIterator returns a fixed iterator that compares its values with
// compareStrings.
func (qs *TripleStore) FixedIterator() graph.FixedIterator {
	cmp := compareStrings
	return iterator.NewFixedIteratorWithCompare(cmp)
}
// Close closes the session attached to the store's database handle.
func (qs *TripleStore) Close() {
	session := qs.db.Session
	session.Close()
}
// TripleDirection extracts the node id for direction d from the quad id in
// (a string). A quad id is four hex-encoded hashes laid end to end in the
// order subject, predicate, object, label; the matching segment is sliced
// out. Any other direction yields the first segment.
func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.Value {
	// Each hash occupies two hex characters per digest byte.
	width := qs.hasher.Size() * 2

	var slot int
	switch d {
	case quad.Subject:
		slot = 0
	case quad.Predicate:
		slot = 1
	case quad.Object:
		slot = 2
	case quad.Label:
		slot = 3
	}

	start := slot * width
	return in.(string)[start : start+width]
}
// BulkLoad reads quads from dec until io.EOF, writes them to the "triples"
// collection, and then rebuilds the "nodes" collection from the stored
// triples with a server-side map-reduce job.
//
// It returns graph.ErrCannotBulkLoad when Size reports a non-empty store,
// the decoder's error when decoding fails with anything other than io.EOF,
// and any error from the map-reduce or the follow-up fix command. The
// session's safe mode is cleared while loading and is reset to the default
// on every return path.
func (qs *TripleStore) BulkLoad(dec quad.Unmarshaler) error {
	if qs.Size() != 0 {
		return graph.ErrCannotBulkLoad
	}

	qs.session.SetSafe(nil)
	// Restore the safe mode on early error returns as well, so a failed
	// load does not leave the session with unacknowledged writes.
	defer qs.session.SetSafe(&mgo.Safe{})

	for {
		q, err := dec.Unmarshal()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		qs.writeTriple(q)
	}

	outputTo := bson.M{"replace": "nodes", "sharded": true}
	glog.Infoln("Mapreducing")
	// Map splits each quad id into its four equal hash segments and emits
	// one node record per part; Reduce sums the Size of the records that
	// share a key.
	job := mgo.MapReduce{
		Map: `function() {
var len = this["_id"].length
var s_key = this["_id"].slice(0, len / 4)
var p_key = this["_id"].slice(len / 4, 2 * len / 4)
var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4)
var c_key = this["_id"].slice(3 * len / 4)
emit(s_key, {"_id": s_key, "Name" : this.Subject, "Size" : 1})
emit(p_key, {"_id": p_key, "Name" : this.Predicate, "Size" : 1})
emit(o_key, {"_id": o_key, "Name" : this.Object, "Size" : 1})
if (this.Label != "") {
emit(c_key, {"_id": c_key, "Name" : this.Label, "Size" : 1})
}
}
`,
		Reduce: `
function(key, value_list) {
out = {"_id": key, "Name": value_list[0].Name}
count = 0
for (var i = 0; i < value_list.length; i++) {
count = count + value_list[i].Size
}
out["Size"] = count
return out
}
`,
		Out: outputTo,
	}
	if _, err := qs.db.C("triples").Find(nil).MapReduce(&job, nil); err != nil {
		return err
	}

	glog.Infoln("Fixing")
	// Replace every document in "nodes" with its own "value" field.
	fix := bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) {
db.nodes.update({"_id": result._id}, result.value)
}) }`}, {"args", bson.D{}}}
	return qs.db.Run(fix, nil)
}