Reorganize to go get will work
This makes almost no changes to source, but touches every almost file. Also fixes error in gremlin test code.
This commit is contained in:
parent
e46a5bbe4a
commit
e0df752618
130 changed files with 8766 additions and 10167 deletions
62
graph/mongo/lru.go
Normal file
62
graph/mongo/lru.go
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
)
|
||||
|
||||
type IDLru struct {
|
||||
cache map[string]*list.Element
|
||||
priority *list.List
|
||||
maxSize int
|
||||
}
|
||||
|
||||
type KV struct {
|
||||
key string
|
||||
value string
|
||||
}
|
||||
|
||||
func NewIDLru(size int) *IDLru {
|
||||
var lru IDLru
|
||||
lru.maxSize = size
|
||||
lru.priority = list.New()
|
||||
lru.cache = make(map[string]*list.Element)
|
||||
return &lru
|
||||
}
|
||||
|
||||
func (lru *IDLru) Put(key string, value string) {
|
||||
if _, ok := lru.Get(key); ok {
|
||||
return
|
||||
}
|
||||
if len(lru.cache) == lru.maxSize {
|
||||
lru.removeOldest()
|
||||
}
|
||||
lru.priority.PushFront(KV{key: key, value: value})
|
||||
lru.cache[key] = lru.priority.Front()
|
||||
}
|
||||
|
||||
func (lru *IDLru) Get(key string) (string, bool) {
|
||||
if element, ok := lru.cache[key]; ok {
|
||||
lru.priority.MoveToFront(element)
|
||||
return element.Value.(KV).value, true
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (lru *IDLru) removeOldest() {
|
||||
last := lru.priority.Remove(lru.priority.Back())
|
||||
delete(lru.cache, last.(KV).key)
|
||||
}
|
||||
181
graph/mongo/mongo-iterator.go
Normal file
181
graph/mongo/mongo-iterator.go
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/barakmich/glog"
|
||||
"labix.org/v2/mgo"
|
||||
"labix.org/v2/mgo/bson"
|
||||
|
||||
"github.com/google/cayley/graph"
|
||||
)
|
||||
|
||||
type MongoIterator struct {
|
||||
graph.BaseIterator
|
||||
ts *MongoTripleStore
|
||||
dir string
|
||||
iter *mgo.Iter
|
||||
hash string
|
||||
name string
|
||||
size int64
|
||||
isAll bool
|
||||
constraint bson.M
|
||||
collection string
|
||||
}
|
||||
|
||||
func NewMongoIterator(ts *MongoTripleStore, collection string, dir string, val graph.TSVal) *MongoIterator {
|
||||
var m MongoIterator
|
||||
graph.BaseIteratorInit(&m.BaseIterator)
|
||||
|
||||
m.name = ts.GetNameFor(val)
|
||||
m.collection = collection
|
||||
switch dir {
|
||||
|
||||
case "s":
|
||||
m.constraint = bson.M{"Sub": m.name}
|
||||
case "p":
|
||||
m.constraint = bson.M{"Pred": m.name}
|
||||
case "o":
|
||||
m.constraint = bson.M{"Obj": m.name}
|
||||
case "c":
|
||||
m.constraint = bson.M{"Provenance": m.name}
|
||||
}
|
||||
|
||||
m.ts = ts
|
||||
m.dir = dir
|
||||
m.iter = ts.db.C(collection).Find(m.constraint).Iter()
|
||||
size, err := ts.db.C(collection).Find(m.constraint).Count()
|
||||
if err != nil {
|
||||
glog.Errorln("Trouble getting size for iterator! ", err)
|
||||
return nil
|
||||
}
|
||||
m.size = int64(size)
|
||||
m.hash = val.(string)
|
||||
m.isAll = false
|
||||
return &m
|
||||
}
|
||||
|
||||
func NewMongoAllIterator(ts *MongoTripleStore, collection string) *MongoIterator {
|
||||
var m MongoIterator
|
||||
m.ts = ts
|
||||
m.dir = "all"
|
||||
m.constraint = nil
|
||||
m.collection = collection
|
||||
m.iter = ts.db.C(collection).Find(nil).Iter()
|
||||
size, err := ts.db.C(collection).Count()
|
||||
if err != nil {
|
||||
glog.Errorln("Trouble getting size for iterator! ", err)
|
||||
return nil
|
||||
}
|
||||
m.size = int64(size)
|
||||
m.hash = ""
|
||||
m.isAll = true
|
||||
return &m
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Reset() {
|
||||
m.iter.Close()
|
||||
m.iter = m.ts.db.C(m.collection).Find(m.constraint).Iter()
|
||||
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Close() {
|
||||
m.iter.Close()
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Clone() graph.Iterator {
|
||||
var newM graph.Iterator
|
||||
if m.isAll {
|
||||
newM = NewMongoAllIterator(m.ts, m.collection)
|
||||
} else {
|
||||
newM = NewMongoIterator(m.ts, m.collection, m.dir, m.hash)
|
||||
}
|
||||
newM.CopyTagsFrom(m)
|
||||
return newM
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Next() (graph.TSVal, bool) {
|
||||
var result struct {
|
||||
Id string "_id"
|
||||
//Sub string "Sub"
|
||||
//Pred string "Pred"
|
||||
//Obj string "Obj"
|
||||
}
|
||||
found := m.iter.Next(&result)
|
||||
if !found {
|
||||
err := m.iter.Err()
|
||||
if err != nil {
|
||||
glog.Errorln("Error Nexting MongoIterator: ", err)
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
m.Last = result.Id
|
||||
return result.Id, true
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Check(v graph.TSVal) bool {
|
||||
graph.CheckLogIn(m, v)
|
||||
if m.isAll {
|
||||
m.Last = v
|
||||
return graph.CheckLogOut(m, v, true)
|
||||
}
|
||||
var offset int
|
||||
switch m.dir {
|
||||
case "s":
|
||||
offset = 0
|
||||
case "p":
|
||||
offset = (m.ts.hasher.Size() * 2)
|
||||
case "o":
|
||||
offset = (m.ts.hasher.Size() * 2) * 2
|
||||
case "c":
|
||||
offset = (m.ts.hasher.Size() * 2) * 3
|
||||
}
|
||||
val := v.(string)[offset : m.ts.hasher.Size()*2+offset]
|
||||
if val == m.hash {
|
||||
m.Last = v
|
||||
return graph.CheckLogOut(m, v, true)
|
||||
}
|
||||
return graph.CheckLogOut(m, v, false)
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Size() (int64, bool) {
|
||||
return m.size, true
|
||||
}
|
||||
|
||||
func (m *MongoIterator) Type() string {
|
||||
if m.isAll {
|
||||
return "all"
|
||||
}
|
||||
return "mongo"
|
||||
}
|
||||
func (m *MongoIterator) Sorted() bool { return true }
|
||||
func (m *MongoIterator) Optimize() (graph.Iterator, bool) { return m, false }
|
||||
|
||||
func (m *MongoIterator) DebugString(indent int) string {
|
||||
size, _ := m.Size()
|
||||
return fmt.Sprintf("%s(%s size:%d %s %s)", strings.Repeat(" ", indent), m.Type(), size, m.hash, m.name)
|
||||
}
|
||||
|
||||
func (m *MongoIterator) GetStats() *graph.IteratorStats {
|
||||
size, _ := m.Size()
|
||||
return &graph.IteratorStats{
|
||||
CheckCost: 1,
|
||||
NextCost: 5,
|
||||
Size: size,
|
||||
}
|
||||
}
|
||||
53
graph/mongo/mongo-triplestore-iterator-optimize.go
Normal file
53
graph/mongo/mongo-triplestore-iterator-optimize.go
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"github.com/google/cayley/graph"
|
||||
)
|
||||
|
||||
func (ts *MongoTripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
|
||||
switch it.Type() {
|
||||
case "linksto":
|
||||
return ts.optimizeLinksTo(it.(*graph.LinksToIterator))
|
||||
|
||||
}
|
||||
return it, false
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) optimizeLinksTo(it *graph.LinksToIterator) (graph.Iterator, bool) {
|
||||
l := it.GetSubIterators()
|
||||
if l.Len() != 1 {
|
||||
return it, false
|
||||
}
|
||||
primaryIt := l.Front().Value.(graph.Iterator)
|
||||
if primaryIt.Type() == "fixed" {
|
||||
size, _ := primaryIt.Size()
|
||||
if size == 1 {
|
||||
val, ok := primaryIt.Next()
|
||||
if !ok {
|
||||
panic("Sizes lie")
|
||||
}
|
||||
newIt := ts.GetTripleIterator(it.Direction(), val)
|
||||
newIt.CopyTagsFrom(it)
|
||||
for _, tag := range primaryIt.Tags() {
|
||||
newIt.AddFixedTag(tag, val)
|
||||
}
|
||||
it.Close()
|
||||
return newIt, true
|
||||
}
|
||||
}
|
||||
return it, false
|
||||
}
|
||||
329
graph/mongo/mongo-triplestore.go
Normal file
329
graph/mongo/mongo-triplestore.go
Normal file
|
|
@ -0,0 +1,329 @@
|
|||
// Copyright 2014 The Cayley Authors. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"hash"
|
||||
"log"
|
||||
|
||||
"labix.org/v2/mgo"
|
||||
"labix.org/v2/mgo/bson"
|
||||
|
||||
"github.com/barakmich/glog"
|
||||
"github.com/google/cayley/graph"
|
||||
)
|
||||
|
||||
const DefaultDBName = "cayley"
|
||||
|
||||
type MongoTripleStore struct {
|
||||
session *mgo.Session
|
||||
db *mgo.Database
|
||||
hasher hash.Hash
|
||||
idCache *IDLru
|
||||
}
|
||||
|
||||
func CreateNewMongoGraph(addr string, options graph.OptionsDict) bool {
|
||||
conn, err := mgo.Dial(addr)
|
||||
if err != nil {
|
||||
glog.Fatal("Error connecting: ", err)
|
||||
return false
|
||||
}
|
||||
conn.SetSafe(&mgo.Safe{})
|
||||
dbName := DefaultDBName
|
||||
if val, ok := options.GetStringKey("database_name"); ok {
|
||||
dbName = val
|
||||
}
|
||||
db := conn.DB(dbName)
|
||||
indexOpts := mgo.Index{
|
||||
Key: []string{"Sub"},
|
||||
Unique: false,
|
||||
DropDups: false,
|
||||
Background: true,
|
||||
Sparse: true,
|
||||
}
|
||||
db.C("triples").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Pred"}
|
||||
db.C("triples").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Obj"}
|
||||
db.C("triples").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Provenance"}
|
||||
db.C("triples").EnsureIndex(indexOpts)
|
||||
return true
|
||||
}
|
||||
|
||||
func NewMongoTripleStore(addr string, options graph.OptionsDict) *MongoTripleStore {
|
||||
var ts MongoTripleStore
|
||||
conn, err := mgo.Dial(addr)
|
||||
if err != nil {
|
||||
glog.Fatal("Error connecting: ", err)
|
||||
}
|
||||
conn.SetSafe(&mgo.Safe{})
|
||||
dbName := DefaultDBName
|
||||
if val, ok := options.GetStringKey("database_name"); ok {
|
||||
dbName = val
|
||||
}
|
||||
ts.db = conn.DB(dbName)
|
||||
ts.session = conn
|
||||
ts.hasher = sha1.New()
|
||||
ts.idCache = NewIDLru(1 << 16)
|
||||
return &ts
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) getIdForTriple(t *graph.Triple) string {
|
||||
id := ts.ConvertStringToByteHash(t.Sub)
|
||||
id += ts.ConvertStringToByteHash(t.Pred)
|
||||
id += ts.ConvertStringToByteHash(t.Obj)
|
||||
id += ts.ConvertStringToByteHash(t.Provenance)
|
||||
return id
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) ConvertStringToByteHash(s string) string {
|
||||
ts.hasher.Reset()
|
||||
key := make([]byte, 0, ts.hasher.Size())
|
||||
ts.hasher.Write([]byte(s))
|
||||
key = ts.hasher.Sum(key)
|
||||
return hex.EncodeToString(key)
|
||||
}
|
||||
|
||||
type MongoNode struct {
|
||||
Id string "_id"
|
||||
Name string "Name"
|
||||
Size int "Size"
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) updateNodeBy(node_name string, inc int) {
|
||||
var size MongoNode
|
||||
node := ts.GetIdFor(node_name)
|
||||
err := ts.db.C("nodes").FindId(node).One(&size)
|
||||
if err != nil {
|
||||
if err.Error() == "not found" {
|
||||
// Not found. Okay.
|
||||
size.Id = node.(string)
|
||||
size.Name = node_name
|
||||
size.Size = inc
|
||||
} else {
|
||||
glog.Error("Error:", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
size.Id = node.(string)
|
||||
size.Name = node_name
|
||||
size.Size += inc
|
||||
}
|
||||
|
||||
// Removing something...
|
||||
if inc < 0 {
|
||||
if size.Size <= 0 {
|
||||
err := ts.db.C("nodes").RemoveId(node)
|
||||
if err != nil {
|
||||
glog.Error("Error: ", err, " while removing node ", node_name)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err2 := ts.db.C("nodes").UpsertId(node, size)
|
||||
if err2 != nil {
|
||||
glog.Error("Error: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) writeTriple(t *graph.Triple) bool {
|
||||
tripledoc := bson.M{"_id": ts.getIdForTriple(t), "Sub": t.Sub, "Pred": t.Pred, "Obj": t.Obj, "Provenance": t.Provenance}
|
||||
err := ts.db.C("triples").Insert(tripledoc)
|
||||
if err != nil {
|
||||
// Among the reasons I hate MongoDB. "Errors don't happen! Right guys?"
|
||||
if err.(*mgo.LastError).Code == 11000 {
|
||||
return false
|
||||
}
|
||||
glog.Error("Error: ", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) AddTriple(t *graph.Triple) {
|
||||
_ = ts.writeTriple(t)
|
||||
ts.updateNodeBy(t.Sub, 1)
|
||||
ts.updateNodeBy(t.Pred, 1)
|
||||
ts.updateNodeBy(t.Obj, 1)
|
||||
if t.Provenance != "" {
|
||||
ts.updateNodeBy(t.Provenance, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) AddTripleSet(in []*graph.Triple) {
|
||||
ts.session.SetSafe(nil)
|
||||
idMap := make(map[string]int)
|
||||
for _, t := range in {
|
||||
wrote := ts.writeTriple(t)
|
||||
if wrote {
|
||||
idMap[t.Sub]++
|
||||
idMap[t.Obj]++
|
||||
idMap[t.Pred]++
|
||||
if t.Provenance != "" {
|
||||
idMap[t.Provenance]++
|
||||
}
|
||||
}
|
||||
}
|
||||
for k, v := range idMap {
|
||||
ts.updateNodeBy(k, v)
|
||||
}
|
||||
ts.session.SetSafe(&mgo.Safe{})
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) RemoveTriple(t *graph.Triple) {
|
||||
err := ts.db.C("triples").RemoveId(ts.getIdForTriple(t))
|
||||
if err == mgo.ErrNotFound {
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Println("Error: ", err, " while removing triple ", t)
|
||||
return
|
||||
}
|
||||
ts.updateNodeBy(t.Sub, -1)
|
||||
ts.updateNodeBy(t.Pred, -1)
|
||||
ts.updateNodeBy(t.Obj, -1)
|
||||
if t.Provenance != "" {
|
||||
ts.updateNodeBy(t.Provenance, -1)
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetTriple(val graph.TSVal) *graph.Triple {
|
||||
var bsonDoc bson.M
|
||||
err := ts.db.C("triples").FindId(val.(string)).One(&bsonDoc)
|
||||
if err != nil {
|
||||
log.Println("Error: Couldn't retrieve triple", val.(string), err)
|
||||
}
|
||||
return graph.MakeTriple(
|
||||
bsonDoc["Sub"].(string),
|
||||
bsonDoc["Pred"].(string),
|
||||
bsonDoc["Obj"].(string),
|
||||
bsonDoc["Provenance"].(string))
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetTripleIterator(dir string, val graph.TSVal) graph.Iterator {
|
||||
return NewMongoIterator(ts, "triples", dir, val)
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetNodesAllIterator() graph.Iterator {
|
||||
return NewMongoAllIterator(ts, "nodes")
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetTriplesAllIterator() graph.Iterator {
|
||||
return NewMongoAllIterator(ts, "triples")
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetIdFor(s string) graph.TSVal {
|
||||
return ts.ConvertStringToByteHash(s)
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetNameFor(v graph.TSVal) string {
|
||||
val, ok := ts.idCache.Get(v.(string))
|
||||
if ok {
|
||||
return val
|
||||
}
|
||||
var node MongoNode
|
||||
err := ts.db.C("nodes").FindId(v.(string)).One(&node)
|
||||
if err != nil {
|
||||
log.Println("Error: Couldn't retrieve node", v.(string), err)
|
||||
}
|
||||
ts.idCache.Put(v.(string), node.Name)
|
||||
return node.Name
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) Size() int64 {
|
||||
count, err := ts.db.C("triples").Count()
|
||||
if err != nil {
|
||||
glog.Error("Error: ", err)
|
||||
return 0
|
||||
}
|
||||
return int64(count)
|
||||
}
|
||||
|
||||
func compareStrings(a, b graph.TSVal) bool {
|
||||
return a.(string) == b.(string)
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) MakeFixed() *graph.FixedIterator {
|
||||
return graph.NewFixedIteratorWithCompare(compareStrings)
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) Close() {
|
||||
ts.db.Session.Close()
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) GetTripleDirection(in graph.TSVal, dir string) graph.TSVal {
|
||||
// Maybe do the trick here
|
||||
var offset int
|
||||
switch dir {
|
||||
case "s":
|
||||
offset = 0
|
||||
case "p":
|
||||
offset = (ts.hasher.Size() * 2)
|
||||
case "o":
|
||||
offset = (ts.hasher.Size() * 2) * 2
|
||||
case "c":
|
||||
offset = (ts.hasher.Size() * 2) * 3
|
||||
}
|
||||
val := in.(string)[offset : ts.hasher.Size()*2+offset]
|
||||
return val
|
||||
}
|
||||
|
||||
func (ts *MongoTripleStore) BulkLoad(t_chan chan *graph.Triple) {
|
||||
ts.session.SetSafe(nil)
|
||||
for triple := range t_chan {
|
||||
ts.writeTriple(triple)
|
||||
}
|
||||
outputTo := bson.M{"replace": "nodes", "sharded": true}
|
||||
glog.Infoln("Mapreducing")
|
||||
job := mgo.MapReduce{
|
||||
Map: `function() {
|
||||
var len = this["_id"].length
|
||||
var s_key = this["_id"].slice(0, len / 4)
|
||||
var p_key = this["_id"].slice(len / 4, 2 * len / 4)
|
||||
var o_key = this["_id"].slice(2 * len / 4, 3 * len / 4)
|
||||
var c_key = this["_id"].slice(3 * len / 4)
|
||||
emit(s_key, {"_id": s_key, "Name" : this.Sub, "Size" : 1})
|
||||
emit(p_key, {"_id": p_key, "Name" : this.Pred, "Size" : 1})
|
||||
emit(o_key, {"_id": o_key, "Name" : this.Obj, "Size" : 1})
|
||||
if (this.Provenance != "") {
|
||||
emit(c_key, {"_id": c_key, "Name" : this.Provenance, "Size" : 1})
|
||||
}
|
||||
}
|
||||
`,
|
||||
Reduce: `
|
||||
function(key, value_list) {
|
||||
out = {"_id": key, "Name": value_list[0].Name}
|
||||
count = 0
|
||||
for (var i = 0; i < value_list.length; i++) {
|
||||
count = count + value_list[i].Size
|
||||
|
||||
}
|
||||
out["Size"] = count
|
||||
return out
|
||||
}
|
||||
`,
|
||||
Out: outputTo,
|
||||
}
|
||||
ts.db.C("triples").Find(nil).MapReduce(&job, nil)
|
||||
glog.Infoln("Fixing")
|
||||
ts.db.Run(bson.D{{"eval", `function() { db.nodes.find().forEach(function (result) {
|
||||
db.nodes.update({"_id": result._id}, result.value)
|
||||
}) }`}, {"args", bson.D{}}}, nil)
|
||||
|
||||
ts.session.SetSafe(&mgo.Safe{})
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue