Merge branch 'log_database' into b
Comparison of b against GoLLRB (as at d5f020). $ benchcmp gollrb.bench b-gen.bench benchmark old ns/op new ns/op delta BenchmarkNamePredicate 1631932 1409531 -13.63% BenchmarkLargeSetsNoIntersection 190792654 63748682 -66.59% BenchmarkVeryLargeSetsSmallIntersection 896154437 373475843 -58.32% BenchmarkHelplessContainsChecker 20719182678 14078301640 -32.05% BenchmarkNetAndSpeed 32519019 20188665 -37.92% BenchmarkKeanuAndNet 18319247 15224988 -16.89% BenchmarkKeanuAndSpeed 30849568 18744134 -39.24% BenchmarkKeanuOther 105552525 107620648 +1.96% BenchmarkKeanuBullockOther 295395338 115193002 -61.00% benchmark old allocs new allocs delta BenchmarkNamePredicate 1339 1341 +0.15% BenchmarkLargeSetsNoIntersection 22585 23632 +4.64% BenchmarkVeryLargeSetsSmallIntersection 65776 69396 +5.50% BenchmarkHelplessContainsChecker 1713541 2036316 +18.84% BenchmarkNetAndSpeed 17104 17240 +0.80% BenchmarkKeanuAndNet 15816 15855 +0.25% BenchmarkKeanuAndSpeed 16368 16493 +0.76% BenchmarkKeanuOther 30134 30634 +1.66% BenchmarkKeanuBullockOther 35510 36454 +2.66% benchmark old bytes new bytes delta BenchmarkNamePredicate 96162 96294 +0.14% BenchmarkLargeSetsNoIntersection 1172356 1249872 +6.61% BenchmarkVeryLargeSetsSmallIntersection 2810080 2992409 +6.49% BenchmarkHelplessContainsChecker 89233264 104999088 +17.67% BenchmarkNetAndSpeed 1388793 1428110 +2.83% BenchmarkKeanuAndNet 1263145 1250079 -1.03% BenchmarkKeanuAndSpeed 1246956 1281546 +2.77% BenchmarkKeanuOther 2021312 2024727 +0.17% BenchmarkKeanuBullockOther 2671448 2742968 +2.68% Conflicts: graph/memstore/triplestore.go
This commit is contained in:
commit
c618e556f4
12 changed files with 113 additions and 125 deletions
|
|
@ -1,8 +1,8 @@
|
|||
language: go
|
||||
|
||||
go:
|
||||
- 1.2
|
||||
- 1.3
|
||||
- 1.3.1
|
||||
- tip
|
||||
|
||||
install:
|
||||
|
|
|
|||
|
|
@ -202,8 +202,7 @@ func (it *HasA) Next() bool {
|
|||
return graph.NextLogOut(it, 0, false)
|
||||
}
|
||||
tID := it.primaryIt.Result()
|
||||
name := it.ts.Quad(tID).Get(it.dir)
|
||||
val := it.ts.ValueOf(name)
|
||||
val := it.ts.TripleDirection(tID, it.dir)
|
||||
it.result = val
|
||||
return graph.NextLogOut(it, val, true)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil }
|
||||
func (qs *store) ApplyDeltas([]graph.Delta) error { return nil }
|
||||
|
||||
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
|
||||
|
||||
|
|
|
|||
|
|
@ -43,9 +43,9 @@ type Iterator struct {
|
|||
result graph.Value
|
||||
}
|
||||
|
||||
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator {
|
||||
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator {
|
||||
vb := value.(Token)
|
||||
p := make([]byte, 0, 2+qs.hasher.Size())
|
||||
p := make([]byte, 0, 2+qs.hasherSize)
|
||||
p = append(p, []byte(prefix)...)
|
||||
p = append(p, []byte(vb[1:])...)
|
||||
|
||||
|
|
@ -70,7 +70,6 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS
|
|||
it.open = false
|
||||
it.iter.Release()
|
||||
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
|
||||
return &iterator.Null{}
|
||||
}
|
||||
|
||||
return &it
|
||||
|
|
@ -108,7 +107,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
|
|||
|
||||
func (it *Iterator) Clone() graph.Iterator {
|
||||
out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
|
||||
out.Tagger().CopyFrom(it)
|
||||
out.tags.CopyFrom(it)
|
||||
return out
|
||||
}
|
||||
|
||||
|
|
@ -180,45 +179,45 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
|
|||
case quad.Subject:
|
||||
return 2
|
||||
case quad.Predicate:
|
||||
return qs.hasher.Size() + 2
|
||||
return qs.hasherSize + 2
|
||||
case quad.Object:
|
||||
return 2*qs.hasher.Size() + 2
|
||||
return 2*qs.hasherSize + 2
|
||||
case quad.Label:
|
||||
return 3*qs.hasher.Size() + 2
|
||||
return 3*qs.hasherSize + 2
|
||||
}
|
||||
}
|
||||
if bytes.Equal(prefix, []byte("po")) {
|
||||
switch d {
|
||||
case quad.Subject:
|
||||
return 2*qs.hasher.Size() + 2
|
||||
return 2*qs.hasherSize + 2
|
||||
case quad.Predicate:
|
||||
return 2
|
||||
case quad.Object:
|
||||
return qs.hasher.Size() + 2
|
||||
return qs.hasherSize + 2
|
||||
case quad.Label:
|
||||
return 3*qs.hasher.Size() + 2
|
||||
return 3*qs.hasherSize + 2
|
||||
}
|
||||
}
|
||||
if bytes.Equal(prefix, []byte("os")) {
|
||||
switch d {
|
||||
case quad.Subject:
|
||||
return qs.hasher.Size() + 2
|
||||
return qs.hasherSize + 2
|
||||
case quad.Predicate:
|
||||
return 2*qs.hasher.Size() + 2
|
||||
return 2*qs.hasherSize + 2
|
||||
case quad.Object:
|
||||
return 2
|
||||
case quad.Label:
|
||||
return 3*qs.hasher.Size() + 2
|
||||
return 3*qs.hasherSize + 2
|
||||
}
|
||||
}
|
||||
if bytes.Equal(prefix, []byte("cp")) {
|
||||
switch d {
|
||||
case quad.Subject:
|
||||
return 2*qs.hasher.Size() + 2
|
||||
return 2*qs.hasherSize + 2
|
||||
case quad.Predicate:
|
||||
return qs.hasher.Size() + 2
|
||||
return qs.hasherSize + 2
|
||||
case quad.Object:
|
||||
return 3*qs.hasher.Size() + 2
|
||||
return 3*qs.hasherSize + 2
|
||||
case quad.Label:
|
||||
return 2
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,15 +50,17 @@ func (t Token) Key() interface{} {
|
|||
}
|
||||
|
||||
type TripleStore struct {
|
||||
dbOpts *opt.Options
|
||||
db *leveldb.DB
|
||||
path string
|
||||
open bool
|
||||
size int64
|
||||
horizon int64
|
||||
hasher hash.Hash
|
||||
writeopts *opt.WriteOptions
|
||||
readopts *opt.ReadOptions
|
||||
dbOpts *opt.Options
|
||||
db *leveldb.DB
|
||||
path string
|
||||
open bool
|
||||
size int64
|
||||
horizon int64
|
||||
hasher hash.Hash
|
||||
hasherSize int
|
||||
makeHasher func() hash.Hash
|
||||
writeopts *opt.WriteOptions
|
||||
readopts *opt.ReadOptions
|
||||
}
|
||||
|
||||
func createNewLevelDB(path string, _ graph.Options) error {
|
||||
|
|
@ -96,7 +98,8 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro
|
|||
write_buffer_mb = val
|
||||
}
|
||||
qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB
|
||||
qs.hasher = sha1.New()
|
||||
qs.hasherSize = sha1.Size
|
||||
qs.makeHasher = sha1.New
|
||||
qs.writeopts = &opt.WriteOptions{
|
||||
Sync: false,
|
||||
}
|
||||
|
|
@ -133,7 +136,7 @@ func (qs *TripleStore) Horizon() int64 {
|
|||
return qs.horizon
|
||||
}
|
||||
|
||||
func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte {
|
||||
func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte {
|
||||
key := make([]byte, 0, 19)
|
||||
key = append(key, 'd')
|
||||
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
|
||||
|
|
@ -141,20 +144,22 @@ func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte {
|
|||
}
|
||||
|
||||
func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte {
|
||||
key := make([]byte, 0, 2+(qs.hasher.Size()*4))
|
||||
hasher := qs.makeHasher()
|
||||
key := make([]byte, 0, 2+(qs.hasherSize*3))
|
||||
// TODO(kortschak) Remove dependence on String() method.
|
||||
key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...)
|
||||
key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...)
|
||||
return key
|
||||
}
|
||||
|
||||
func (qs *TripleStore) createValueKeyFor(s string) []byte {
|
||||
key := make([]byte, 0, 1+qs.hasher.Size())
|
||||
hasher := qs.makeHasher()
|
||||
key := make([]byte, 0, 1+qs.hasherSize)
|
||||
key = append(key, []byte("z")...)
|
||||
key = append(key, qs.convertStringToByteHash(s)...)
|
||||
key = append(key, qs.convertStringToByteHash(s, hasher)...)
|
||||
return key
|
||||
}
|
||||
|
||||
|
|
@ -171,7 +176,7 @@ var (
|
|||
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
|
||||
)
|
||||
|
||||
func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
|
||||
func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
|
||||
batch := &leveldb.Batch{}
|
||||
resizeMap := make(map[string]int64)
|
||||
size_change := int64(0)
|
||||
|
|
@ -341,11 +346,11 @@ func (qs *TripleStore) Quad(k graph.Value) quad.Quad {
|
|||
return triple
|
||||
}
|
||||
|
||||
func (qs *TripleStore) convertStringToByteHash(s string) []byte {
|
||||
qs.hasher.Reset()
|
||||
key := make([]byte, 0, qs.hasher.Size())
|
||||
qs.hasher.Write([]byte(s))
|
||||
key = qs.hasher.Sum(key)
|
||||
func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) []byte {
|
||||
hasher.Reset()
|
||||
key := make([]byte, 0, qs.hasherSize)
|
||||
hasher.Write([]byte(s))
|
||||
key = hasher.Sum(key)
|
||||
return key
|
||||
}
|
||||
|
||||
|
|
@ -462,7 +467,7 @@ func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.
|
|||
v := val.(Token)
|
||||
offset := PositionOf(v[0:2], d, qs)
|
||||
if offset != -1 {
|
||||
return Token(append([]byte("z"), v[offset:offset+qs.hasher.Size()]...))
|
||||
return Token(append([]byte("z"), v[offset:offset+qs.hasherSize]...))
|
||||
} else {
|
||||
return Token(qs.Quad(val).Get(d))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,17 +35,17 @@ func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator {
|
|||
}
|
||||
|
||||
// No subiterators.
|
||||
func (nit *NodesAllIterator) SubIterators() []graph.Iterator {
|
||||
func (it *NodesAllIterator) SubIterators() []graph.Iterator {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nit *NodesAllIterator) Next() bool {
|
||||
if !nit.Int64.Next() {
|
||||
func (it *NodesAllIterator) Next() bool {
|
||||
if !it.Int64.Next() {
|
||||
return false
|
||||
}
|
||||
_, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)]
|
||||
_, ok := it.ts.revIdMap[it.Int64.Result().(int64)]
|
||||
if !ok {
|
||||
return nit.Next()
|
||||
return it.Next()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ func newTripleStore() *TripleStore {
|
|||
}
|
||||
}
|
||||
|
||||
func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
|
||||
func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
|
||||
for _, d := range deltas {
|
||||
var err error
|
||||
if d.Action == graph.Add {
|
||||
|
|
@ -144,12 +144,12 @@ func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) {
|
|||
return 0, false
|
||||
}
|
||||
|
||||
func (ts *TripleStore) AddDelta(d *graph.Delta) error {
|
||||
func (ts *TripleStore) AddDelta(d graph.Delta) error {
|
||||
if _, exists := ts.indexOf(d.Quad); exists {
|
||||
return graph.ErrQuadExists
|
||||
}
|
||||
qid := ts.quadIdCounter
|
||||
ts.log = append(ts.log, LogEntry{Delta: *d})
|
||||
ts.log = append(ts.log, LogEntry{Delta: d})
|
||||
ts.size++
|
||||
ts.quadIdCounter++
|
||||
|
||||
|
|
@ -178,14 +178,14 @@ func (ts *TripleStore) AddDelta(d *graph.Delta) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ts *TripleStore) RemoveDelta(d *graph.Delta) error {
|
||||
func (ts *TripleStore) RemoveDelta(d graph.Delta) error {
|
||||
prevQuadID, exists := ts.indexOf(d.Quad)
|
||||
if !exists {
|
||||
return graph.ErrQuadNotExist
|
||||
}
|
||||
|
||||
quadID := ts.quadIdCounter
|
||||
ts.log = append(ts.log, LogEntry{Delta: *d})
|
||||
ts.log = append(ts.log, LogEntry{Delta: d})
|
||||
ts.log[prevQuadID].DeletedBy = quadID
|
||||
ts.size--
|
||||
ts.quadIdCounter++
|
||||
|
|
|
|||
|
|
@ -45,17 +45,7 @@ type Iterator struct {
|
|||
func NewIterator(qs *TripleStore, collection string, d quad.Direction, val graph.Value) *Iterator {
|
||||
name := qs.NameOf(val)
|
||||
|
||||
var constraint bson.M
|
||||
switch d {
|
||||
case quad.Subject:
|
||||
constraint = bson.M{"Subject": name}
|
||||
case quad.Predicate:
|
||||
constraint = bson.M{"Predicate": name}
|
||||
case quad.Object:
|
||||
constraint = bson.M{"Object": name}
|
||||
case quad.Label:
|
||||
constraint = bson.M{"Label": name}
|
||||
}
|
||||
constraint := bson.M{d.String(): name}
|
||||
|
||||
size, err := qs.db.C(collection).Find(constraint).Count()
|
||||
if err != nil {
|
||||
|
|
@ -187,13 +177,13 @@ func (it *Iterator) Contains(v graph.Value) bool {
|
|||
case quad.Subject:
|
||||
offset = 0
|
||||
case quad.Predicate:
|
||||
offset = (it.qs.hasher.Size() * 2)
|
||||
offset = (it.qs.hasherSize * 2)
|
||||
case quad.Object:
|
||||
offset = (it.qs.hasher.Size() * 2) * 2
|
||||
offset = (it.qs.hasherSize * 2) * 2
|
||||
case quad.Label:
|
||||
offset = (it.qs.hasher.Size() * 2) * 3
|
||||
offset = (it.qs.hasherSize * 2) * 3
|
||||
}
|
||||
val := v.(string)[offset : it.qs.hasher.Size()*2+offset]
|
||||
val := v.(string)[offset : it.qs.hasherSize*2+offset]
|
||||
if val == it.hash {
|
||||
it.result = v
|
||||
return graph.ContainsLogOut(it, v, true)
|
||||
|
|
|
|||
|
|
@ -35,10 +35,11 @@ func init() {
|
|||
const DefaultDBName = "cayley"
|
||||
|
||||
type TripleStore struct {
|
||||
session *mgo.Session
|
||||
db *mgo.Database
|
||||
hasher hash.Hash
|
||||
idCache *IDLru
|
||||
session *mgo.Session
|
||||
db *mgo.Database
|
||||
hasherSize int
|
||||
makeHasher func() hash.Hash
|
||||
idCache *IDLru
|
||||
}
|
||||
|
||||
func createNewMongoGraph(addr string, options graph.Options) error {
|
||||
|
|
@ -53,18 +54,18 @@ func createNewMongoGraph(addr string, options graph.Options) error {
|
|||
}
|
||||
db := conn.DB(dbName)
|
||||
indexOpts := mgo.Index{
|
||||
Key: []string{"Sub"},
|
||||
Key: []string{"subject"},
|
||||
Unique: false,
|
||||
DropDups: false,
|
||||
Background: true,
|
||||
Sparse: true,
|
||||
}
|
||||
db.C("quads").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Pred"}
|
||||
indexOpts.Key = []string{"predicate"}
|
||||
db.C("quads").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Obj"}
|
||||
indexOpts.Key = []string{"object"}
|
||||
db.C("quads").EnsureIndex(indexOpts)
|
||||
indexOpts.Key = []string{"Label"}
|
||||
indexOpts.Key = []string{"label"}
|
||||
db.C("quads").EnsureIndex(indexOpts)
|
||||
logOpts := mgo.Index{
|
||||
Key: []string{"LogID"},
|
||||
|
|
@ -90,24 +91,26 @@ func newTripleStore(addr string, options graph.Options) (graph.TripleStore, erro
|
|||
}
|
||||
qs.db = conn.DB(dbName)
|
||||
qs.session = conn
|
||||
qs.hasher = sha1.New()
|
||||
qs.hasherSize = sha1.Size
|
||||
qs.makeHasher = sha1.New
|
||||
qs.idCache = NewIDLru(1 << 16)
|
||||
return &qs, nil
|
||||
}
|
||||
|
||||
func (qs *TripleStore) getIdForTriple(t quad.Quad) string {
|
||||
id := qs.ConvertStringToByteHash(t.Subject)
|
||||
id += qs.ConvertStringToByteHash(t.Predicate)
|
||||
id += qs.ConvertStringToByteHash(t.Object)
|
||||
id += qs.ConvertStringToByteHash(t.Label)
|
||||
func (qs *TripleStore) getIdForQuad(t quad.Quad) string {
|
||||
hasher := qs.makeHasher()
|
||||
id := qs.convertStringToByteHash(t.Subject, hasher)
|
||||
id += qs.convertStringToByteHash(t.Predicate, hasher)
|
||||
id += qs.convertStringToByteHash(t.Object, hasher)
|
||||
id += qs.convertStringToByteHash(t.Label, hasher)
|
||||
return id
|
||||
}
|
||||
|
||||
func (qs *TripleStore) ConvertStringToByteHash(s string) string {
|
||||
qs.hasher.Reset()
|
||||
key := make([]byte, 0, qs.hasher.Size())
|
||||
qs.hasher.Write([]byte(s))
|
||||
key = qs.hasher.Sum(key)
|
||||
func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) string {
|
||||
hasher.Reset()
|
||||
key := make([]byte, 0, qs.hasherSize)
|
||||
hasher.Write([]byte(s))
|
||||
key = hasher.Sum(key)
|
||||
return hex.EncodeToString(key)
|
||||
}
|
||||
|
||||
|
|
@ -144,26 +147,20 @@ func (qs *TripleStore) updateNodeBy(node_name string, inc int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error {
|
||||
func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error {
|
||||
var setname string
|
||||
if proc == graph.Add {
|
||||
setname = "Added"
|
||||
} else if proc == graph.Delete {
|
||||
setname = "Deleted"
|
||||
}
|
||||
tripledoc := bson.M{
|
||||
"Subject": t.Subject,
|
||||
"Predicate": t.Predicate,
|
||||
"Object": t.Object,
|
||||
"Label": t.Label,
|
||||
}
|
||||
upsert := bson.M{
|
||||
"$setOnInsert": tripledoc,
|
||||
"$setOnInsert": q,
|
||||
"$push": bson.M{
|
||||
setname: id,
|
||||
},
|
||||
}
|
||||
_, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert)
|
||||
_, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert)
|
||||
if err != nil {
|
||||
glog.Errorf("Error: %v", err)
|
||||
}
|
||||
|
|
@ -189,7 +186,7 @@ func (qs *TripleStore) checkValid(key string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
||||
func (qs *TripleStore) updateLog(d graph.Delta) error {
|
||||
var action string
|
||||
if d.Action == graph.Add {
|
||||
action = "Add"
|
||||
|
|
@ -199,7 +196,7 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
|||
entry := MongoLogEntry{
|
||||
LogID: d.ID,
|
||||
Action: action,
|
||||
Key: qs.getIdForTriple(d.Quad),
|
||||
Key: qs.getIdForQuad(d.Quad),
|
||||
Timestamp: d.Timestamp.UnixNano(),
|
||||
}
|
||||
err := qs.db.C("log").Insert(entry)
|
||||
|
|
@ -209,12 +206,12 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
||||
func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error {
|
||||
qs.session.SetSafe(nil)
|
||||
ids := make(map[string]int)
|
||||
// Pre-check the existence condition.
|
||||
for _, d := range in {
|
||||
key := qs.getIdForTriple(d.Quad)
|
||||
key := qs.getIdForQuad(d.Quad)
|
||||
switch d.Action {
|
||||
case graph.Add:
|
||||
if qs.checkValid(key) {
|
||||
|
|
@ -236,7 +233,7 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
|||
}
|
||||
}
|
||||
for _, d := range in {
|
||||
err := qs.updateTriple(d.Quad, d.ID, d.Action)
|
||||
err := qs.updateQuad(d.Quad, d.ID, d.Action)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -264,17 +261,12 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
|||
}
|
||||
|
||||
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
||||
var bsonDoc bson.M
|
||||
err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc)
|
||||
var q quad.Quad
|
||||
err := qs.db.C("quads").FindId(val.(string)).One(&q)
|
||||
if err != nil {
|
||||
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
|
||||
}
|
||||
return quad.Quad{
|
||||
bsonDoc["Subject"].(string),
|
||||
bsonDoc["Predicate"].(string),
|
||||
bsonDoc["Object"].(string),
|
||||
bsonDoc["Label"].(string),
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
||||
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
|
||||
|
|
@ -290,7 +282,8 @@ func (qs *TripleStore) TriplesAllIterator() graph.Iterator {
|
|||
}
|
||||
|
||||
func (qs *TripleStore) ValueOf(s string) graph.Value {
|
||||
return qs.ConvertStringToByteHash(s)
|
||||
h := qs.makeHasher()
|
||||
return qs.convertStringToByteHash(s, h)
|
||||
}
|
||||
|
||||
func (qs *TripleStore) NameOf(v graph.Value) string {
|
||||
|
|
@ -348,13 +341,13 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V
|
|||
case quad.Subject:
|
||||
offset = 0
|
||||
case quad.Predicate:
|
||||
offset = (qs.hasher.Size() * 2)
|
||||
offset = (qs.hasherSize * 2)
|
||||
case quad.Object:
|
||||
offset = (qs.hasher.Size() * 2) * 2
|
||||
offset = (qs.hasherSize * 2) * 2
|
||||
case quad.Label:
|
||||
offset = (qs.hasher.Size() * 2) * 3
|
||||
offset = (qs.hasherSize * 2) * 3
|
||||
}
|
||||
val := in.(string)[offset : qs.hasher.Size()*2+offset]
|
||||
val := in.(string)[offset : qs.hasherSize*2+offset]
|
||||
return val
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,8 +53,10 @@ func (h *Handle) Close() {
|
|||
h.QuadWriter.Close()
|
||||
}
|
||||
|
||||
var ErrQuadExists = errors.New("Quad exists")
|
||||
var ErrQuadNotExist = errors.New("Quad doesn't exist")
|
||||
var (
|
||||
ErrQuadExists = errors.New("Quad exists")
|
||||
ErrQuadNotExist = errors.New("Quad doesn't exist")
|
||||
)
|
||||
|
||||
type QuadWriter interface {
|
||||
// Add a quad to the store.
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ type Value interface{}
|
|||
type TripleStore interface {
|
||||
// The only way in is through building a transaction, which
|
||||
// is done by a replication strategy.
|
||||
ApplyDeltas([]*Delta) error
|
||||
ApplyDeltas([]Delta) error
|
||||
|
||||
// Given an opaque token, returns the triple for that token from the store.
|
||||
Quad(Value) quad.Quad
|
||||
|
|
|
|||
|
|
@ -50,8 +50,8 @@ func (s *Single) AcquireNextID() int64 {
|
|||
}
|
||||
|
||||
func (s *Single) AddQuad(q quad.Quad) error {
|
||||
deltas := make([]*graph.Delta, 1)
|
||||
deltas[0] = &graph.Delta{
|
||||
deltas := make([]graph.Delta, 1)
|
||||
deltas[0] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
Quad: q,
|
||||
Action: graph.Add,
|
||||
|
|
@ -61,9 +61,9 @@ func (s *Single) AddQuad(q quad.Quad) error {
|
|||
}
|
||||
|
||||
func (s *Single) AddQuadSet(set []quad.Quad) error {
|
||||
deltas := make([]*graph.Delta, len(set))
|
||||
deltas := make([]graph.Delta, len(set))
|
||||
for i, q := range set {
|
||||
deltas[i] = &graph.Delta{
|
||||
deltas[i] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
Quad: q,
|
||||
Action: graph.Add,
|
||||
|
|
@ -75,8 +75,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
|
|||
}
|
||||
|
||||
func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||
deltas := make([]*graph.Delta, 1)
|
||||
deltas[0] = &graph.Delta{
|
||||
deltas := make([]graph.Delta, 1)
|
||||
deltas[0] = graph.Delta{
|
||||
ID: s.AcquireNextID(),
|
||||
Quad: q,
|
||||
Action: graph.Delete,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue