From 12859a69a5a554866d5aa0af021284ca429904a2 Mon Sep 17 00:00:00 2001 From: kortschak Date: Wed, 27 Aug 2014 19:20:03 +0930 Subject: [PATCH] Rename triple* -> quad* files --- graph/leveldb/quadstore.go | 485 ++++++++++++++++++++++++ graph/leveldb/quadstore_iterator_optimize.go | 55 +++ graph/leveldb/triplestore.go | 485 ------------------------ graph/leveldb/triplestore_iterator_optimize.go | 55 --- graph/memstore/quadstore.go | 250 ++++++++++++ graph/memstore/quadstore_iterator_optimize.go | 55 +++ graph/memstore/quadstore_test.go | 198 ++++++++++ graph/memstore/triplestore.go | 250 ------------ graph/memstore/triplestore_iterator_optimize.go | 55 --- graph/memstore/triplestore_test.go | 198 ---------- graph/mongo/quadstore.go | 359 ++++++++++++++++++ graph/mongo/quadstore_iterator_optimize.go | 55 +++ graph/mongo/triplestore.go | 359 ------------------ graph/mongo/triplestore_iterator_optimize.go | 55 --- graph/quadstore.go | 193 ++++++++++ graph/triplestore.go | 193 ---------- 16 files changed, 1650 insertions(+), 1650 deletions(-) create mode 100644 graph/leveldb/quadstore.go create mode 100644 graph/leveldb/quadstore_iterator_optimize.go delete mode 100644 graph/leveldb/triplestore.go delete mode 100644 graph/leveldb/triplestore_iterator_optimize.go create mode 100644 graph/memstore/quadstore.go create mode 100644 graph/memstore/quadstore_iterator_optimize.go create mode 100644 graph/memstore/quadstore_test.go delete mode 100644 graph/memstore/triplestore.go delete mode 100644 graph/memstore/triplestore_iterator_optimize.go delete mode 100644 graph/memstore/triplestore_test.go create mode 100644 graph/mongo/quadstore.go create mode 100644 graph/mongo/quadstore_iterator_optimize.go delete mode 100644 graph/mongo/triplestore.go delete mode 100644 graph/mongo/triplestore_iterator_optimize.go create mode 100644 graph/quadstore.go delete mode 100644 graph/triplestore.go diff --git a/graph/leveldb/quadstore.go b/graph/leveldb/quadstore.go new file mode 100644 index 0000000..dee63f8 --- /dev/null +++ b/graph/leveldb/quadstore.go @@ -0,0 +1,485 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "bytes" + "crypto/sha1" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "hash" + "sync" + + "github.com/barakmich/glog" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/cache" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +func init() { + graph.RegisterTripleStore("leveldb", true, newTripleStore, createNewLevelDB) +} + +const ( + DefaultCacheSize = 2 + DefaultWriteBufferSize = 20 +) + +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + +type Token []byte + +func (t Token) Key() interface{} { + return string(t) +} + +type TripleStore struct { + dbOpts *opt.Options + db *leveldb.DB + path string + open bool + size int64 + horizon int64 + writeopts *opt.WriteOptions + readopts *opt.ReadOptions +} + +func createNewLevelDB(path string, _ graph.Options) error { + opts := &opt.Options{} + db, err := leveldb.OpenFile(path, opts) + if err != nil { + glog.Errorf("Error: couldn't create database: %v", err) + return err + } + defer db.Close() + qs := &TripleStore{} + qs.db = db + qs.writeopts = &opt.WriteOptions{ + Sync: true, + } + qs.Close() + return nil +} + +func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) { + var qs TripleStore + var err error + qs.path = path + cache_size := DefaultCacheSize + if val, ok := options.IntKey("cache_size_mb"); ok { + cache_size = val + } + qs.dbOpts = &opt.Options{ + BlockCache: cache.NewLRUCache(cache_size * opt.MiB), + } + qs.dbOpts.ErrorIfMissing = true + + write_buffer_mb := DefaultWriteBufferSize + if val, ok := options.IntKey("write_buffer_mb"); ok { + write_buffer_mb = val + } + qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB + qs.writeopts = &opt.WriteOptions{ + Sync: false, + } + qs.readopts = &opt.ReadOptions{} + db, err := leveldb.OpenFile(qs.path, qs.dbOpts) + if err != nil { + glog.Errorln("Error, couldn't open! ", err) + return nil, err + } + qs.db = db + glog.Infoln(qs.GetStats()) + err = qs.getMetadata() + if err != nil { + return nil, err + } + return &qs, nil +} + +func (qs *TripleStore) GetStats() string { + out := "" + stats, err := qs.db.GetProperty("leveldb.stats") + if err == nil { + out += fmt.Sprintln("Stats: ", stats) + } + out += fmt.Sprintln("Size: ", qs.size) + return out +} + +func (qs *TripleStore) Size() int64 { + return qs.size +} + +func (qs *TripleStore) Horizon() int64 { + return qs.horizon +} + +func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte { + key := make([]byte, 0, 19) + key = append(key, 'd') + key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) + return key +} + +func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { + key := make([]byte, 0, 2+(hashSize*3)) + // TODO(kortschak) Remove dependence on String() method. + key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) + return key +} + +func (qs *TripleStore) createValueKeyFor(s string) []byte { + key := make([]byte, 0, 1+hashSize) + key = append(key, []byte("z")...) + key = append(key, qs.convertStringToByteHash(s)...) + return key +} + +type IndexEntry struct { + quad.Quad + History []int64 +} + +// Short hand for direction permutations. +var ( + spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} + osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} + pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} + cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} +) + +func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error { + batch := &leveldb.Batch{} + resizeMap := make(map[string]int64) + size_change := int64(0) + for _, d := range deltas { + bytes, err := json.Marshal(d) + if err != nil { + return err + } + batch.Put(qs.createDeltaKeyFor(d), bytes) + err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add) + if err != nil { + return err + } + delta := int64(1) + if d.Action == graph.Delete { + delta = int64(-1) + } + resizeMap[d.Quad.Subject] += delta + resizeMap[d.Quad.Predicate] += delta + resizeMap[d.Quad.Object] += delta + if d.Quad.Label != "" { + resizeMap[d.Quad.Label] += delta + } + size_change += delta + qs.horizon = d.ID + } + for k, v := range resizeMap { + if v != 0 { + err := qs.UpdateValueKeyBy(k, v, batch) + if err != nil { + return err + } + } + } + err := qs.db.Write(batch, qs.writeopts) + if err != nil { + glog.Error("Couldn't write to DB for tripleset.") + return err + } + qs.size += size_change + return nil +} + +func (qs *TripleStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, isAdd bool) error { + var entry IndexEntry + data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Error("Couldn't access DB to prepare index: ", err) + return err + } + if err == nil { + // We got something. + err = json.Unmarshal(data, &entry) + if err != nil { + return err + } + } else { + entry.Quad = q + } + entry.History = append(entry.History, id) + + if isAdd && len(entry.History)%2 == 0 { + glog.Error("Entry History is out of sync for", entry) + return errors.New("Odd index history") + } + + bytes, err := json.Marshal(entry) + if err != nil { + glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) + return err + } + batch.Put(qs.createKeyFor(spo, q), bytes) + batch.Put(qs.createKeyFor(osp, q), bytes) + batch.Put(qs.createKeyFor(pos, q), bytes) + if q.Get(quad.Label) != "" { + batch.Put(qs.createKeyFor(cps, q), bytes) + } + return nil +} + +type ValueData struct { + Name string + Size int64 +} + +func (qs *TripleStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error { + value := &ValueData{name, amount} + key := qs.createValueKeyFor(name) + b, err := qs.db.Get(key, qs.readopts) + + // Error getting the node from the database. + if err != nil && err != leveldb.ErrNotFound { + glog.Errorf("Error reading Value %s from the DB.", name) + return err + } + + // Node exists in the database -- unmarshal and update. + if b != nil && err != leveldb.ErrNotFound { + err = json.Unmarshal(b, value) + if err != nil { + glog.Errorf("Error: couldn't reconstruct value: %v", err) + return err + } + value.Size += amount + } + + // Are we deleting something? + if value.Size <= 0 { + value.Size = 0 + } + + // Repackage and rewrite. + bytes, err := json.Marshal(&value) + if err != nil { + glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) + return err + } + if batch == nil { + qs.db.Put(key, bytes, qs.writeopts) + } else { + batch.Put(key, bytes) + } + return nil +} + +func (qs *TripleStore) Close() { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, qs.size) + if err == nil { + werr := qs.db.Put([]byte("__size"), buf.Bytes(), qs.writeopts) + if werr != nil { + glog.Error("Couldn't write size before closing!") + } + } else { + glog.Errorf("Couldn't convert size before closing!") + } + buf.Reset() + err = binary.Write(buf, binary.LittleEndian, qs.horizon) + if err == nil { + werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts) + if werr != nil { + glog.Error("Couldn't write horizon before closing!") + } + } else { + glog.Errorf("Couldn't convert horizon before closing!") + } + qs.db.Close() + qs.open = false +} + +func (qs *TripleStore) Quad(k graph.Value) quad.Quad { + var triple quad.Quad + b, err := qs.db.Get(k.(Token), qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Error("Error: couldn't get triple from DB.") + return quad.Quad{} + } + if err == leveldb.ErrNotFound { + // No harm, no foul. + return quad.Quad{} + } + err = json.Unmarshal(b, &triple) + if err != nil { + glog.Error("Error: couldn't reconstruct triple.") + return quad.Quad{} + } + return triple +} + +func (qs *TripleStore) convertStringToByteHash(s string) []byte { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) + return key +} + +func (qs *TripleStore) ValueOf(s string) graph.Value { + return Token(qs.createValueKeyFor(s)) +} + +func (qs *TripleStore) valueData(value_key []byte) ValueData { + var out ValueData + if glog.V(3) { + glog.V(3).Infof("%s %v", string(value_key[0]), value_key) + } + b, err := qs.db.Get(value_key, qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Errorln("Error: couldn't get value from DB") + return out + } + if b != nil && err != leveldb.ErrNotFound { + err = json.Unmarshal(b, &out) + if err != nil { + glog.Errorln("Error: couldn't reconstruct value") + return ValueData{} + } + } + return out +} + +func (qs *TripleStore) NameOf(k graph.Value) string { + if k == nil { + glog.V(2).Info("k was nil") + return "" + } + return qs.valueData(k.(Token)).Name +} + +func (qs *TripleStore) SizeOf(k graph.Value) int64 { + if k == nil { + return 0 + } + return int64(qs.valueData(k.(Token)).Size) +} + +func (qs *TripleStore) getInt64ForKey(key string, empty int64) (int64, error) { + var out int64 + b, err := qs.db.Get([]byte(key), qs.readopts) + if err != nil && err != leveldb.ErrNotFound { + glog.Errorln("Couldn't read " + key + ": " + err.Error()) + return 0, err + } + if err == leveldb.ErrNotFound { + // Must be a new database. Cool + return empty, nil + } + buf := bytes.NewBuffer(b) + err = binary.Read(buf, binary.LittleEndian, &out) + if err != nil { + glog.Errorln("Error: couldn't parse", key) + return 0, err + } + return out, nil +} + +func (qs *TripleStore) getMetadata() error { + var err error + qs.size, err = qs.getInt64ForKey("__size", 0) + if err != nil { + return err + } + qs.horizon, err = qs.getInt64ForKey("__horizon", 0) + return err +} + +func (qs *TripleStore) SizeOfPrefix(pre []byte) (int64, error) { + limit := make([]byte, len(pre)) + copy(limit, pre) + end := len(limit) - 1 + limit[end]++ + ranges := make([]util.Range, 1) + ranges[0].Start = pre + ranges[0].Limit = limit + sizes, err := qs.db.SizeOf(ranges) + if err == nil { + return (int64(sizes[0]) >> 6) + 1, nil + } + return 0, nil +} + +func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { + var prefix string + switch d { + case quad.Subject: + prefix = "sp" + case quad.Predicate: + prefix = "po" + case quad.Object: + prefix = "os" + case quad.Label: + prefix = "cp" + default: + panic("unreachable " + d.String()) + } + return NewIterator(prefix, d, val, qs) +} + +func (qs *TripleStore) NodesAllIterator() graph.Iterator { + return NewAllIterator("z", quad.Any, qs) +} + +func (qs *TripleStore) TriplesAllIterator() graph.Iterator { + return NewAllIterator("po", quad.Predicate, qs) +} + +func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { + v := val.(Token) + offset := PositionOf(v[0:2], d, qs) + if offset != -1 { + return Token(append([]byte("z"), v[offset:offset+hashSize]...)) + } else { + return Token(qs.Quad(val).Get(d)) + } +} + +func compareBytes(a, b graph.Value) bool { + return bytes.Equal(a.(Token), b.(Token)) +} + +func (qs *TripleStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixedIteratorWithCompare(compareBytes) +} diff --git a/graph/leveldb/quadstore_iterator_optimize.go b/graph/leveldb/quadstore_iterator_optimize.go new file mode 100644 index 0000000..31b7f7d --- /dev/null +++ b/graph/leveldb/quadstore_iterator_optimize.go @@ -0,0 +1,55 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" +) + +func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case graph.LinksTo: + return ts.optimizeLinksTo(it.(*iterator.LinksTo)) + + } + return it, false +} + +func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == graph.Fixed { + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := ts.TripleIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + it.Close() + return newIt, true + } + } + return it, false +} diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go deleted file mode 100644 index dee63f8..0000000 --- a/graph/leveldb/triplestore.go +++ /dev/null @@ -1,485 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "bytes" - "crypto/sha1" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "hash" - "sync" - - "github.com/barakmich/glog" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/cache" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" - - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" - "github.com/google/cayley/quad" -) - -func init() { - graph.RegisterTripleStore("leveldb", true, newTripleStore, createNewLevelDB) -} - -const ( - DefaultCacheSize = 2 - DefaultWriteBufferSize = 20 -) - -var ( - hashPool = sync.Pool{ - New: func() interface{} { return sha1.New() }, - } - hashSize = sha1.Size -) - -type Token []byte - -func (t Token) Key() interface{} { - return string(t) -} - -type TripleStore struct { - dbOpts *opt.Options - db *leveldb.DB - path string - open bool - size int64 - horizon int64 - writeopts *opt.WriteOptions - readopts *opt.ReadOptions -} - -func createNewLevelDB(path string, _ graph.Options) error { - opts := &opt.Options{} - db, err := leveldb.OpenFile(path, opts) - if err != nil { - glog.Errorf("Error: couldn't create database: %v", err) - return err - } - defer db.Close() - qs := &TripleStore{} - qs.db = db - qs.writeopts = &opt.WriteOptions{ - Sync: true, - } - qs.Close() - return nil -} - -func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) { - var qs TripleStore - var err error - qs.path = path - cache_size := DefaultCacheSize - if val, ok := options.IntKey("cache_size_mb"); ok { - cache_size = val - } - qs.dbOpts = &opt.Options{ - BlockCache: cache.NewLRUCache(cache_size * opt.MiB), - } - qs.dbOpts.ErrorIfMissing = true - - write_buffer_mb := DefaultWriteBufferSize - if val, ok := options.IntKey("write_buffer_mb"); ok { - write_buffer_mb = val - } - qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB - qs.writeopts = &opt.WriteOptions{ - Sync: false, - } - qs.readopts = &opt.ReadOptions{} - db, err := leveldb.OpenFile(qs.path, qs.dbOpts) - if err != nil { - glog.Errorln("Error, couldn't open! ", err) - return nil, err - } - qs.db = db - glog.Infoln(qs.GetStats()) - err = qs.getMetadata() - if err != nil { - return nil, err - } - return &qs, nil -} - -func (qs *TripleStore) GetStats() string { - out := "" - stats, err := qs.db.GetProperty("leveldb.stats") - if err == nil { - out += fmt.Sprintln("Stats: ", stats) - } - out += fmt.Sprintln("Size: ", qs.size) - return out -} - -func (qs *TripleStore) Size() int64 { - return qs.size -} - -func (qs *TripleStore) Horizon() int64 { - return qs.horizon -} - -func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte { - key := make([]byte, 0, 19) - key = append(key, 'd') - key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) - return key -} - -func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - key := make([]byte, 0, 2+(hashSize*3)) - // TODO(kortschak) Remove dependence on String() method. - key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) - return key -} - -func (qs *TripleStore) createValueKeyFor(s string) []byte { - key := make([]byte, 0, 1+hashSize) - key = append(key, []byte("z")...) - key = append(key, qs.convertStringToByteHash(s)...) - return key -} - -type IndexEntry struct { - quad.Quad - History []int64 -} - -// Short hand for direction permutations. -var ( - spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} - osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label} - pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label} - cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} -) - -func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error { - batch := &leveldb.Batch{} - resizeMap := make(map[string]int64) - size_change := int64(0) - for _, d := range deltas { - bytes, err := json.Marshal(d) - if err != nil { - return err - } - batch.Put(qs.createDeltaKeyFor(d), bytes) - err = qs.buildQuadWrite(batch, d.Quad, d.ID, d.Action == graph.Add) - if err != nil { - return err - } - delta := int64(1) - if d.Action == graph.Delete { - delta = int64(-1) - } - resizeMap[d.Quad.Subject] += delta - resizeMap[d.Quad.Predicate] += delta - resizeMap[d.Quad.Object] += delta - if d.Quad.Label != "" { - resizeMap[d.Quad.Label] += delta - } - size_change += delta - qs.horizon = d.ID - } - for k, v := range resizeMap { - if v != 0 { - err := qs.UpdateValueKeyBy(k, v, batch) - if err != nil { - return err - } - } - } - err := qs.db.Write(batch, qs.writeopts) - if err != nil { - glog.Error("Couldn't write to DB for tripleset.") - return err - } - qs.size += size_change - return nil -} - -func (qs *TripleStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, isAdd bool) error { - var entry IndexEntry - data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Error("Couldn't access DB to prepare index: ", err) - return err - } - if err == nil { - // We got something. - err = json.Unmarshal(data, &entry) - if err != nil { - return err - } - } else { - entry.Quad = q - } - entry.History = append(entry.History, id) - - if isAdd && len(entry.History)%2 == 0 { - glog.Error("Entry History is out of sync for", entry) - return errors.New("Odd index history") - } - - bytes, err := json.Marshal(entry) - if err != nil { - glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err) - return err - } - batch.Put(qs.createKeyFor(spo, q), bytes) - batch.Put(qs.createKeyFor(osp, q), bytes) - batch.Put(qs.createKeyFor(pos, q), bytes) - if q.Get(quad.Label) != "" { - batch.Put(qs.createKeyFor(cps, q), bytes) - } - return nil -} - -type ValueData struct { - Name string - Size int64 -} - -func (qs *TripleStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error { - value := &ValueData{name, amount} - key := qs.createValueKeyFor(name) - b, err := qs.db.Get(key, qs.readopts) - - // Error getting the node from the database. - if err != nil && err != leveldb.ErrNotFound { - glog.Errorf("Error reading Value %s from the DB.", name) - return err - } - - // Node exists in the database -- unmarshal and update. - if b != nil && err != leveldb.ErrNotFound { - err = json.Unmarshal(b, value) - if err != nil { - glog.Errorf("Error: couldn't reconstruct value: %v", err) - return err - } - value.Size += amount - } - - // Are we deleting something? - if value.Size <= 0 { - value.Size = 0 - } - - // Repackage and rewrite. - bytes, err := json.Marshal(&value) - if err != nil { - glog.Errorf("Couldn't write to buffer for value %s: %s", name, err) - return err - } - if batch == nil { - qs.db.Put(key, bytes, qs.writeopts) - } else { - batch.Put(key, bytes) - } - return nil -} - -func (qs *TripleStore) Close() { - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.LittleEndian, qs.size) - if err == nil { - werr := qs.db.Put([]byte("__size"), buf.Bytes(), qs.writeopts) - if werr != nil { - glog.Error("Couldn't write size before closing!") - } - } else { - glog.Errorf("Couldn't convert size before closing!") - } - buf.Reset() - err = binary.Write(buf, binary.LittleEndian, qs.horizon) - if err == nil { - werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts) - if werr != nil { - glog.Error("Couldn't write horizon before closing!") - } - } else { - glog.Errorf("Couldn't convert horizon before closing!") - } - qs.db.Close() - qs.open = false -} - -func (qs *TripleStore) Quad(k graph.Value) quad.Quad { - var triple quad.Quad - b, err := qs.db.Get(k.(Token), qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Error("Error: couldn't get triple from DB.") - return quad.Quad{} - } - if err == leveldb.ErrNotFound { - // No harm, no foul. - return quad.Quad{} - } - err = json.Unmarshal(b, &triple) - if err != nil { - glog.Error("Error: couldn't reconstruct triple.") - return quad.Quad{} - } - return triple -} - -func (qs *TripleStore) convertStringToByteHash(s string) []byte { - h := hashPool.Get().(hash.Hash) - h.Reset() - defer hashPool.Put(h) - key := make([]byte, 0, hashSize) - h.Write([]byte(s)) - key = h.Sum(key) - return key -} - -func (qs *TripleStore) ValueOf(s string) graph.Value { - return Token(qs.createValueKeyFor(s)) -} - -func (qs *TripleStore) valueData(value_key []byte) ValueData { - var out ValueData - if glog.V(3) { - glog.V(3).Infof("%s %v", string(value_key[0]), value_key) - } - b, err := qs.db.Get(value_key, qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Errorln("Error: couldn't get value from DB") - return out - } - if b != nil && err != leveldb.ErrNotFound { - err = json.Unmarshal(b, &out) - if err != nil { - glog.Errorln("Error: couldn't reconstruct value") - return ValueData{} - } - } - return out -} - -func (qs *TripleStore) NameOf(k graph.Value) string { - if k == nil { - glog.V(2).Info("k was nil") - return "" - } - return qs.valueData(k.(Token)).Name -} - -func (qs *TripleStore) SizeOf(k graph.Value) int64 { - if k == nil { - return 0 - } - return int64(qs.valueData(k.(Token)).Size) -} - -func (qs *TripleStore) getInt64ForKey(key string, empty int64) (int64, error) { - var out int64 - b, err := qs.db.Get([]byte(key), qs.readopts) - if err != nil && err != leveldb.ErrNotFound { - glog.Errorln("Couldn't read " + key + ": " + err.Error()) - return 0, err - } - if err == leveldb.ErrNotFound { - // Must be a new database. Cool - return empty, nil - } - buf := bytes.NewBuffer(b) - err = binary.Read(buf, binary.LittleEndian, &out) - if err != nil { - glog.Errorln("Error: couldn't parse", key) - return 0, err - } - return out, nil -} - -func (qs *TripleStore) getMetadata() error { - var err error - qs.size, err = qs.getInt64ForKey("__size", 0) - if err != nil { - return err - } - qs.horizon, err = qs.getInt64ForKey("__horizon", 0) - return err -} - -func (qs *TripleStore) SizeOfPrefix(pre []byte) (int64, error) { - limit := make([]byte, len(pre)) - copy(limit, pre) - end := len(limit) - 1 - limit[end]++ - ranges := make([]util.Range, 1) - ranges[0].Start = pre - ranges[0].Limit = limit - sizes, err := qs.db.SizeOf(ranges) - if err == nil { - return (int64(sizes[0]) >> 6) + 1, nil - } - return 0, nil -} - -func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { - var prefix string - switch d { - case quad.Subject: - prefix = "sp" - case quad.Predicate: - prefix = "po" - case quad.Object: - prefix = "os" - case quad.Label: - prefix = "cp" - default: - panic("unreachable " + d.String()) - } - return NewIterator(prefix, d, val, qs) -} - -func (qs *TripleStore) NodesAllIterator() graph.Iterator { - return NewAllIterator("z", quad.Any, qs) -} - -func (qs *TripleStore) TriplesAllIterator() graph.Iterator { - return NewAllIterator("po", quad.Predicate, qs) -} - -func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { - v := val.(Token) - offset := PositionOf(v[0:2], d, qs) - if offset != -1 { - return Token(append([]byte("z"), v[offset:offset+hashSize]...)) - } else { - return Token(qs.Quad(val).Get(d)) - } -} - -func compareBytes(a, b graph.Value) bool { - return bytes.Equal(a.(Token), b.(Token)) -} - -func (qs *TripleStore) FixedIterator() graph.FixedIterator { - return iterator.NewFixedIteratorWithCompare(compareBytes) -} diff --git a/graph/leveldb/triplestore_iterator_optimize.go b/graph/leveldb/triplestore_iterator_optimize.go deleted file mode 100644 index 31b7f7d..0000000 --- a/graph/leveldb/triplestore_iterator_optimize.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" -) - -func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { - switch it.Type() { - case graph.LinksTo: - return ts.optimizeLinksTo(it.(*iterator.LinksTo)) - - } - return it, false -} - -func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { - subs := it.SubIterators() - if len(subs) != 1 { - return it, false - } - primary := subs[0] - if primary.Type() == graph.Fixed { - size, _ := primary.Size() - if size == 1 { - if !graph.Next(primary) { - panic("unexpected size during optimize") - } - val := primary.Result() - newIt := ts.TripleIterator(it.Direction(), val) - nt := newIt.Tagger() - nt.CopyFrom(it) - for _, tag := range primary.Tagger().Tags() { - nt.AddFixed(tag, val) - } - it.Close() - return newIt, true - } - } - return it, false -} diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go new file mode 100644 index 0000000..da03810 --- /dev/null +++ b/graph/memstore/quadstore.go @@ -0,0 +1,250 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memstore + +import ( + "fmt" + + "github.com/barakmich/glog" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/graph/memstore/b" + "github.com/google/cayley/quad" +) + +func init() { + graph.RegisterTripleStore("memstore", false, func(string, graph.Options) (graph.TripleStore, error) { + return newTripleStore(), nil + }, nil) +} + +type QuadDirectionIndex struct { + index [4]map[int64]*b.Tree +} + +func NewQuadDirectionIndex() QuadDirectionIndex { + return QuadDirectionIndex{[...]map[int64]*b.Tree{ + quad.Subject - 1: make(map[int64]*b.Tree), + quad.Predicate - 1: make(map[int64]*b.Tree), + quad.Object - 1: make(map[int64]*b.Tree), + quad.Label - 1: make(map[int64]*b.Tree), + }} +} + +func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree { + if d < quad.Subject || d > quad.Label { + panic("illegal direction") + } + tree, ok := qdi.index[d-1][id] + if !ok { + tree = b.TreeNew(cmp) + qdi.index[d-1][id] = tree + } + return tree +} + +func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) { + if d < quad.Subject || d > quad.Label { + panic("illegal direction") + } + tree, ok := qdi.index[d-1][id] + return tree, ok +} + +type LogEntry struct { + graph.Delta + DeletedBy int64 +} + +type TripleStore struct { + idCounter int64 + quadIdCounter int64 + idMap map[string]int64 + revIdMap map[int64]string + log []LogEntry + size int64 + index QuadDirectionIndex + // vip_index map[string]map[int64]map[string]map[int64]*b.Tree +} + +func newTripleStore() *TripleStore { + return &TripleStore{ + idMap: make(map[string]int64), + revIdMap: make(map[int64]string), + + // Sentinel null entry so indices start at 1 + log: make([]LogEntry, 1, 200), + + index: NewQuadDirectionIndex(), + idCounter: 1, + quadIdCounter: 1, + } +} + +func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error { + for _, d := range deltas { + var err error + if d.Action == graph.Add { + err = ts.AddDelta(d) + } else { + err = ts.RemoveDelta(d) + } + if err != nil { + return err + } + } + return nil +} + +const maxInt = int(^uint(0) >> 1) + +func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) { + min := maxInt + var tree *b.Tree + for d := quad.Subject; d <= quad.Label; d++ { + sid := t.Get(d) + if d == quad.Label && sid == "" { + continue + } + id, ok := ts.idMap[sid] + // If we've never heard about a node, it must not exist + if !ok { + return 0, false + } + index, ok := ts.index.Get(d, id) + if !ok { + // If it's never been indexed in this direction, it can't exist. + return 0, false + } + if l := index.Len(); l < min { + min, tree = l, index + } + } + it := NewIterator(tree, "", ts) + + for it.Next() { + val := it.Result() + if t == ts.log[val.(int64)].Quad { + return val.(int64), true + } + } + return 0, false +} + +func (ts *TripleStore) AddDelta(d graph.Delta) error { + if _, exists := ts.indexOf(d.Quad); exists { + return graph.ErrQuadExists + } + qid := ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: d}) + ts.size++ + ts.quadIdCounter++ + + for dir := quad.Subject; dir <= quad.Label; dir++ { + sid := d.Quad.Get(dir) + if dir == quad.Label && sid == "" { + continue + } + if _, ok := ts.idMap[sid]; !ok { + ts.idMap[sid] = ts.idCounter + ts.revIdMap[ts.idCounter] = sid + ts.idCounter++ + } + } + + for dir := quad.Subject; dir <= quad.Label; dir++ { + if dir == quad.Label && d.Quad.Get(dir) == "" { + continue + } + id := ts.idMap[d.Quad.Get(dir)] + tree := ts.index.Tree(dir, id) + tree.Set(qid, struct{}{}) + } + + // TODO(barakmich): Add VIP indexing + return nil +} + +func (ts *TripleStore) RemoveDelta(d graph.Delta) error { + prevQuadID, exists := ts.indexOf(d.Quad) + if !exists { + return graph.ErrQuadNotExist + } + + quadID := ts.quadIdCounter + ts.log = append(ts.log, LogEntry{Delta: d}) + ts.log[prevQuadID].DeletedBy = quadID + ts.size-- + ts.quadIdCounter++ + return nil +} + +func (ts *TripleStore) Quad(index graph.Value) quad.Quad { + return ts.log[index.(int64)].Quad +} + +func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator { + index, ok := ts.index.Get(d, value.(int64)) + data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) + if ok { + return NewIterator(index, data, ts) + } + return &iterator.Null{} +} + +func (ts *TripleStore) Horizon() int64 { + return ts.log[len(ts.log)-1].ID +} + +func (ts *TripleStore) Size() int64 { + return ts.size +} + +func (ts *TripleStore) DebugPrint() { + for i, l := range ts.log { + if i == 0 { + continue + } + glog.V(2).Infof("%d: %#v", i, l) + } +} + +func (ts *TripleStore) ValueOf(name string) graph.Value { + return ts.idMap[name] +} + +func (ts *TripleStore) NameOf(id graph.Value) string { + return ts.revIdMap[id.(int64)] +} + +func (ts *TripleStore) TriplesAllIterator() graph.Iterator { + return NewMemstoreQuadsAllIterator(ts) +} + +func (ts *TripleStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixedIteratorWithCompare(iterator.BasicEquality) +} + +func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { + name := ts.Quad(val).Get(d) + return ts.ValueOf(name) +} + +func (ts *TripleStore) NodesAllIterator() graph.Iterator { + return NewMemstoreNodesAllIterator(ts) +} + +func (ts *TripleStore) Close() {} diff --git a/graph/memstore/quadstore_iterator_optimize.go b/graph/memstore/quadstore_iterator_optimize.go new file mode 100644 index 0000000..ae5628f --- /dev/null +++ b/graph/memstore/quadstore_iterator_optimize.go @@ -0,0 +1,55 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memstore + +import ( + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" +) + +func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case graph.LinksTo: + return ts.optimizeLinksTo(it.(*iterator.LinksTo)) + + } + return it, false +} + +func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == graph.Fixed { + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := ts.TripleIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + return newIt, true + } + } + it.Close() + return it, false +} diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go new file mode 100644 index 0000000..6f1959f --- /dev/null +++ b/graph/memstore/quadstore_test.go @@ -0,0 +1,198 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memstore + +import ( + "reflect" + "sort" + "testing" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" + "github.com/google/cayley/writer" +) + +// This is a simple test graph. +// +// +---+ +---+ +// | A |------- ->| F |<-- +// +---+ \------>+---+-/ +---+ \--+---+ +// ------>|#B#| | | E | +// +---+-------/ >+---+ | +---+ +// | C | / v +// +---+ -/ +---+ +// ---- +---+/ |#G#| +// \-->|#D#|------------->+---+ +// +---+ +// +var simpleGraph = []quad.Quad{ + {"A", "follows", "B", ""}, + {"C", "follows", "B", ""}, + {"C", "follows", "D", ""}, + {"D", "follows", "B", ""}, + {"B", "follows", "F", ""}, + {"F", "follows", "G", ""}, + {"D", "follows", "G", ""}, + {"E", "follows", "F", ""}, + {"B", "status", "cool", "status_graph"}, + {"D", "status", "cool", "status_graph"}, + {"G", "status", "cool", "status_graph"}, +} + +func makeTestStore(data []quad.Quad) (*TripleStore, graph.QuadWriter, []pair) { + seen := make(map[string]struct{}) + ts := newTripleStore() + var ( + val int64 + ind []pair + ) + writer, _ := writer.NewSingleReplication(ts, nil) + for _, t := range data { + for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { + if _, ok := seen[qp]; !ok && qp != "" { + val++ + ind = append(ind, pair{qp, val}) + seen[qp] = struct{}{} + } + } + + writer.AddQuad(t) + } + return ts, writer, ind +} + +type pair struct { + query string + value int64 +} + +func TestMemstore(t *testing.T) { + ts, _, index := makeTestStore(simpleGraph) + if size := ts.Size(); size != int64(len(simpleGraph)) { + t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph)) + } + for _, test := range index { + v := ts.ValueOf(test.query) + switch v := v.(type) { + default: + t.Errorf("ValueOf(%q) returned unexpected type, got:%T expected int64", test.query, v) + case int64: + if v != test.value { + t.Errorf("ValueOf(%q) returned unexpected value, got:%d expected:%d", test.query, v, test.value) + } + } + } +} + +func TestIteratorsAndNextResultOrderA(t *testing.T) { + ts, _, _ := makeTestStore(simpleGraph) + + fixed := ts.FixedIterator() + fixed.Add(ts.ValueOf("C")) + + fixed2 := ts.FixedIterator() + fixed2.Add(ts.ValueOf("follows")) + + all := ts.NodesAllIterator() + + innerAnd := iterator.NewAnd() + innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed2, quad.Predicate)) + innerAnd.AddSubIterator(iterator.NewLinksTo(ts, all, quad.Object)) + + hasa := iterator.NewHasA(ts, innerAnd, quad.Subject) + outerAnd := iterator.NewAnd() + outerAnd.AddSubIterator(fixed) + outerAnd.AddSubIterator(hasa) + + if !outerAnd.Next() { + t.Error("Expected one matching subtree") + } + val := outerAnd.Result() + if ts.NameOf(val) != "C" { + t.Errorf("Matching subtree should be %s, got %s", "barak", ts.NameOf(val)) + } + + var ( + got []string + expect = []string{"B", "D"} + ) + for { + got = append(got, ts.NameOf(all.Result())) + if !outerAnd.NextPath() { + break + } + } + sort.Strings(got) + + if !reflect.DeepEqual(got, expect) { + t.Errorf("Unexpected result, got:%q expect:%q", got, expect) + } + + if outerAnd.Next() { + t.Error("More than one possible top level output?") + } +} + +func TestLinksToOptimization(t *testing.T) { + ts, _, _ := makeTestStore(simpleGraph) + + fixed := ts.FixedIterator() + fixed.Add(ts.ValueOf("cool")) + + lto := iterator.NewLinksTo(ts, fixed, quad.Object) + lto.Tagger().Add("foo") + + newIt, changed := lto.Optimize() + if !changed { + t.Error("Iterator didn't change") + } + if newIt.Type() != Type() { + t.Fatal("Didn't swap out to LLRB") + } + + v := newIt.(*Iterator) + v_clone := v.Clone() + if v_clone.DebugString(0) != v.DebugString(0) { + t.Fatal("Wrong iterator. Got ", v_clone.DebugString(0)) + } + vt := v_clone.Tagger() + if len(vt.Tags()) < 1 || vt.Tags()[0] != "foo" { + t.Fatal("Tag on LinksTo did not persist") + } +} + +func TestRemoveTriple(t *testing.T) { + ts, w, _ := makeTestStore(simpleGraph) + + w.RemoveQuad(quad.Quad{"E", "follows", "F", ""}) + + fixed := ts.FixedIterator() + fixed.Add(ts.ValueOf("E")) + + fixed2 := ts.FixedIterator() + fixed2.Add(ts.ValueOf("follows")) + + innerAnd := iterator.NewAnd() + innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed, quad.Subject)) + innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed2, quad.Predicate)) + + hasa := iterator.NewHasA(ts, innerAnd, quad.Object) + + newIt, _ := hasa.Optimize() + if graph.Next(newIt) { + t.Error("E should not have any followers.") + } +} diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go deleted file mode 100644 index da03810..0000000 --- a/graph/memstore/triplestore.go +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package memstore - -import ( - "fmt" - - "github.com/barakmich/glog" - - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" - "github.com/google/cayley/graph/memstore/b" - "github.com/google/cayley/quad" -) - -func init() { - graph.RegisterTripleStore("memstore", false, func(string, graph.Options) (graph.TripleStore, error) { - return newTripleStore(), nil - }, nil) -} - -type QuadDirectionIndex struct { - index [4]map[int64]*b.Tree -} - -func NewQuadDirectionIndex() QuadDirectionIndex { - return QuadDirectionIndex{[...]map[int64]*b.Tree{ - quad.Subject - 1: make(map[int64]*b.Tree), - quad.Predicate - 1: make(map[int64]*b.Tree), - quad.Object - 1: make(map[int64]*b.Tree), - quad.Label - 1: make(map[int64]*b.Tree), - }} -} - -func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *b.Tree { - if d < quad.Subject || d > quad.Label { - panic("illegal direction") - } - tree, ok := qdi.index[d-1][id] - if !ok { - tree = b.TreeNew(cmp) - qdi.index[d-1][id] = tree - } - return tree -} - -func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*b.Tree, bool) { - if d < quad.Subject || d > quad.Label { - panic("illegal direction") - } - tree, ok := qdi.index[d-1][id] - return tree, ok -} - -type LogEntry struct { - graph.Delta - DeletedBy int64 -} - -type TripleStore struct { - idCounter int64 - quadIdCounter int64 - idMap map[string]int64 - revIdMap map[int64]string - log []LogEntry - size int64 - index QuadDirectionIndex - // vip_index map[string]map[int64]map[string]map[int64]*b.Tree -} - -func newTripleStore() *TripleStore { - return &TripleStore{ - idMap: make(map[string]int64), - revIdMap: make(map[int64]string), - - // Sentinel null entry so indices start at 1 - log: make([]LogEntry, 1, 200), - - index: NewQuadDirectionIndex(), - idCounter: 1, - quadIdCounter: 1, - } -} - -func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error { - for _, d := range deltas { - var err error - if d.Action == graph.Add { - err = ts.AddDelta(d) - } else { - err = ts.RemoveDelta(d) - } - if err != nil { - return err - } - } - return nil -} - -const maxInt = int(^uint(0) >> 1) - -func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) { - min := maxInt - var tree *b.Tree - for d := quad.Subject; d <= quad.Label; d++ { - sid := t.Get(d) - if d == quad.Label && sid == "" { - continue - } - id, ok := ts.idMap[sid] - // If we've never heard about a node, it must not exist - if !ok { - return 0, false - } - index, ok := ts.index.Get(d, id) - if !ok { - // If it's never been indexed in this direction, it can't exist. - return 0, false - } - if l := index.Len(); l < min { - min, tree = l, index - } - } - it := NewIterator(tree, "", ts) - - for it.Next() { - val := it.Result() - if t == ts.log[val.(int64)].Quad { - return val.(int64), true - } - } - return 0, false -} - -func (ts *TripleStore) AddDelta(d graph.Delta) error { - if _, exists := ts.indexOf(d.Quad); exists { - return graph.ErrQuadExists - } - qid := ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: d}) - ts.size++ - ts.quadIdCounter++ - - for dir := quad.Subject; dir <= quad.Label; dir++ { - sid := d.Quad.Get(dir) - if dir == quad.Label && sid == "" { - continue - } - if _, ok := ts.idMap[sid]; !ok { - ts.idMap[sid] = ts.idCounter - ts.revIdMap[ts.idCounter] = sid - ts.idCounter++ - } - } - - for dir := quad.Subject; dir <= quad.Label; dir++ { - if dir == quad.Label && d.Quad.Get(dir) == "" { - continue - } - id := ts.idMap[d.Quad.Get(dir)] - tree := ts.index.Tree(dir, id) - tree.Set(qid, struct{}{}) - } - - // TODO(barakmich): Add VIP indexing - return nil -} - -func (ts *TripleStore) RemoveDelta(d graph.Delta) error { - prevQuadID, exists := ts.indexOf(d.Quad) - if !exists { - return graph.ErrQuadNotExist - } - - quadID := ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: d}) - ts.log[prevQuadID].DeletedBy = quadID - ts.size-- - ts.quadIdCounter++ - return nil -} - -func (ts *TripleStore) Quad(index graph.Value) quad.Quad { - return ts.log[index.(int64)].Quad -} - -func (ts *TripleStore) TripleIterator(d quad.Direction, value graph.Value) graph.Iterator { - index, ok := ts.index.Get(d, value.(int64)) - data := fmt.Sprintf("dir:%s val:%d", d, value.(int64)) - if ok { - return NewIterator(index, data, ts) - } - return &iterator.Null{} -} - -func (ts *TripleStore) Horizon() int64 { - return ts.log[len(ts.log)-1].ID -} - -func (ts *TripleStore) Size() int64 { - return ts.size -} - -func (ts *TripleStore) DebugPrint() { - for i, l := range ts.log { - if i == 0 { - continue - } - glog.V(2).Infof("%d: %#v", i, l) - } -} - -func (ts *TripleStore) ValueOf(name string) graph.Value { - return ts.idMap[name] -} - -func (ts *TripleStore) NameOf(id graph.Value) string { - return ts.revIdMap[id.(int64)] -} - -func (ts *TripleStore) TriplesAllIterator() graph.Iterator { - return NewMemstoreQuadsAllIterator(ts) -} - -func (ts *TripleStore) FixedIterator() graph.FixedIterator { - return iterator.NewFixedIteratorWithCompare(iterator.BasicEquality) -} - -func (ts *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value { - name := ts.Quad(val).Get(d) - return ts.ValueOf(name) -} - -func (ts *TripleStore) NodesAllIterator() graph.Iterator { - return NewMemstoreNodesAllIterator(ts) -} - -func (ts *TripleStore) Close() {} diff --git a/graph/memstore/triplestore_iterator_optimize.go b/graph/memstore/triplestore_iterator_optimize.go deleted file mode 100644 index ae5628f..0000000 --- a/graph/memstore/triplestore_iterator_optimize.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package memstore - -import ( - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" -) - -func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { - switch it.Type() { - case graph.LinksTo: - return ts.optimizeLinksTo(it.(*iterator.LinksTo)) - - } - return it, false -} - -func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { - subs := it.SubIterators() - if len(subs) != 1 { - return it, false - } - primary := subs[0] - if primary.Type() == graph.Fixed { - size, _ := primary.Size() - if size == 1 { - if !graph.Next(primary) { - panic("unexpected size during optimize") - } - val := primary.Result() - newIt := ts.TripleIterator(it.Direction(), val) - nt := newIt.Tagger() - nt.CopyFrom(it) - for _, tag := range primary.Tagger().Tags() { - nt.AddFixed(tag, val) - } - return newIt, true - } - } - it.Close() - return it, false -} diff --git a/graph/memstore/triplestore_test.go b/graph/memstore/triplestore_test.go deleted file mode 100644 index 6f1959f..0000000 --- a/graph/memstore/triplestore_test.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package memstore - -import ( - "reflect" - "sort" - "testing" - - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" - "github.com/google/cayley/quad" - "github.com/google/cayley/writer" -) - -// This is a simple test graph. -// -// +---+ +---+ -// | A |------- ->| F |<-- -// +---+ \------>+---+-/ +---+ \--+---+ -// ------>|#B#| | | E | -// +---+-------/ >+---+ | +---+ -// | C | / v -// +---+ -/ +---+ -// ---- +---+/ |#G#| -// \-->|#D#|------------->+---+ -// +---+ -// -var simpleGraph = []quad.Quad{ - {"A", "follows", "B", ""}, - {"C", "follows", "B", ""}, - {"C", "follows", "D", ""}, - {"D", "follows", "B", ""}, - {"B", "follows", "F", ""}, - {"F", "follows", "G", ""}, - {"D", "follows", "G", ""}, - {"E", "follows", "F", ""}, - {"B", "status", "cool", "status_graph"}, - {"D", "status", "cool", "status_graph"}, - {"G", "status", "cool", "status_graph"}, -} - -func makeTestStore(data []quad.Quad) (*TripleStore, graph.QuadWriter, []pair) { - seen := make(map[string]struct{}) - ts := newTripleStore() - var ( - val int64 - ind []pair - ) - writer, _ := writer.NewSingleReplication(ts, nil) - for _, t := range data { - for _, qp := range []string{t.Subject, t.Predicate, t.Object, t.Label} { - if _, ok := seen[qp]; !ok && qp != "" { - val++ - ind = append(ind, pair{qp, val}) - seen[qp] = struct{}{} - } - } - - writer.AddQuad(t) - } - return ts, writer, ind -} - -type pair struct { - query string - value int64 -} - -func TestMemstore(t *testing.T) { - ts, _, index := makeTestStore(simpleGraph) - if size := ts.Size(); size != int64(len(simpleGraph)) { - t.Errorf("Triple store has unexpected size, got:%d expected %d", size, len(simpleGraph)) - } - for _, test := range index { - v := ts.ValueOf(test.query) - switch v := v.(type) { - default: - t.Errorf("ValueOf(%q) returned unexpected type, got:%T expected int64", test.query, v) - case int64: - if v != test.value { - t.Errorf("ValueOf(%q) returned unexpected value, got:%d expected:%d", test.query, v, test.value) - } - } - } -} - -func TestIteratorsAndNextResultOrderA(t *testing.T) { - ts, _, _ := makeTestStore(simpleGraph) - - fixed := ts.FixedIterator() - fixed.Add(ts.ValueOf("C")) - - fixed2 := ts.FixedIterator() - fixed2.Add(ts.ValueOf("follows")) - - all := ts.NodesAllIterator() - - innerAnd := iterator.NewAnd() - innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed2, quad.Predicate)) - innerAnd.AddSubIterator(iterator.NewLinksTo(ts, all, quad.Object)) - - hasa := iterator.NewHasA(ts, innerAnd, quad.Subject) - outerAnd := iterator.NewAnd() - outerAnd.AddSubIterator(fixed) - outerAnd.AddSubIterator(hasa) - - if !outerAnd.Next() { - t.Error("Expected one matching subtree") - } - val := outerAnd.Result() - if ts.NameOf(val) != "C" { - t.Errorf("Matching subtree should be %s, got %s", "barak", ts.NameOf(val)) - } - - var ( - got []string - expect = []string{"B", "D"} - ) - for { - got = append(got, ts.NameOf(all.Result())) - if !outerAnd.NextPath() { - break - } - } - sort.Strings(got) - - if !reflect.DeepEqual(got, expect) { - t.Errorf("Unexpected result, got:%q expect:%q", got, expect) - } - - if outerAnd.Next() { - t.Error("More than one possible top level output?") - } -} - -func TestLinksToOptimization(t *testing.T) { - ts, _, _ := makeTestStore(simpleGraph) - - fixed := ts.FixedIterator() - fixed.Add(ts.ValueOf("cool")) - - lto := iterator.NewLinksTo(ts, fixed, quad.Object) - lto.Tagger().Add("foo") - - newIt, changed := lto.Optimize() - if !changed { - t.Error("Iterator didn't change") - } - if newIt.Type() != Type() { - t.Fatal("Didn't swap out to LLRB") - } - - v := newIt.(*Iterator) - v_clone := v.Clone() - if v_clone.DebugString(0) != v.DebugString(0) { - t.Fatal("Wrong iterator. Got ", v_clone.DebugString(0)) - } - vt := v_clone.Tagger() - if len(vt.Tags()) < 1 || vt.Tags()[0] != "foo" { - t.Fatal("Tag on LinksTo did not persist") - } -} - -func TestRemoveTriple(t *testing.T) { - ts, w, _ := makeTestStore(simpleGraph) - - w.RemoveQuad(quad.Quad{"E", "follows", "F", ""}) - - fixed := ts.FixedIterator() - fixed.Add(ts.ValueOf("E")) - - fixed2 := ts.FixedIterator() - fixed2.Add(ts.ValueOf("follows")) - - innerAnd := iterator.NewAnd() - innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed, quad.Subject)) - innerAnd.AddSubIterator(iterator.NewLinksTo(ts, fixed2, quad.Predicate)) - - hasa := iterator.NewHasA(ts, innerAnd, quad.Object) - - newIt, _ := hasa.Optimize() - if graph.Next(newIt) { - t.Error("E should not have any followers.") - } -} diff --git a/graph/mongo/quadstore.go b/graph/mongo/quadstore.go new file mode 100644 index 0000000..87adf7d --- /dev/null +++ b/graph/mongo/quadstore.go @@ -0,0 +1,359 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongo + +import ( + "crypto/sha1" + "encoding/hex" + "hash" + "sync" + + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +func init() { + graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph) +} + +const DefaultDBName = "cayley" + +var ( + hashPool = sync.Pool{ + New: func() interface{} { return sha1.New() }, + } + hashSize = sha1.Size +) + +type TripleStore struct { + session *mgo.Session + db *mgo.Database + idCache *IDLru +} + +func createNewMongoGraph(addr string, options graph.Options) error { + conn, err := mgo.Dial(addr) + if err != nil { + return err + } + conn.SetSafe(&mgo.Safe{}) + dbName := DefaultDBName + if val, ok := options.StringKey("database_name"); ok { + dbName = val + } + db := conn.DB(dbName) + indexOpts := mgo.Index{ + Key: []string{"subject"}, + Unique: false, + DropDups: false, + Background: true, + Sparse: true, + } + db.C("quads").EnsureIndex(indexOpts) + indexOpts.Key = []string{"predicate"} + db.C("quads").EnsureIndex(indexOpts) + indexOpts.Key = []string{"object"} + db.C("quads").EnsureIndex(indexOpts) + indexOpts.Key = []string{"label"} + db.C("quads").EnsureIndex(indexOpts) + logOpts := mgo.Index{ + Key: []string{"LogID"}, + Unique: true, + DropDups: false, + Background: true, + Sparse: true, + } + db.C("log").EnsureIndex(logOpts) + return nil +} + +func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) { + var qs TripleStore + conn, err := mgo.Dial(addr) + if err != nil { + return nil, err + } + conn.SetSafe(&mgo.Safe{}) + dbName := DefaultDBName + if val, ok := options.StringKey("database_name"); ok { + dbName = val + } + qs.db = conn.DB(dbName) + qs.session = conn + qs.idCache = NewIDLru(1 << 16) + return &qs, nil +} + +func (qs *TripleStore) getIdForQuad(t quad.Quad) string { + id := qs.convertStringToByteHash(t.Subject) + id += qs.convertStringToByteHash(t.Predicate) + id += qs.convertStringToByteHash(t.Object) + id += qs.convertStringToByteHash(t.Label) + return id +} + +func (qs *TripleStore) convertStringToByteHash(s string) string { + h := hashPool.Get().(hash.Hash) + h.Reset() + defer hashPool.Put(h) + + key := make([]byte, 0, hashSize) + h.Write([]byte(s)) + key = h.Sum(key) + return hex.EncodeToString(key) +} + +type MongoNode struct { + Id string `bson:"_id"` + Name string `bson:"Name"` + Size int `bson:"Size"` +} + +type MongoLogEntry struct { + LogID int64 `bson:"LogID"` + Action string `bson:"Action"` + Key string `bson:"Key"` + Timestamp int64 +} + +func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { + node := qs.ValueOf(node_name) + doc := bson.M{ + "_id": node.(string), + "Name": node_name, + } + upsert := bson.M{ + "$setOnInsert": doc, + "$inc": bson.M{ + "Size": inc, + }, + } + + _, err := qs.db.C("nodes").UpsertId(node, upsert) + if err != nil { + glog.Errorf("Error updating node: %v", err) + } + return err +} + +func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error { + var setname string + if proc == graph.Add { + setname = "Added" + } else if proc == graph.Delete { + setname = "Deleted" + } + upsert := bson.M{ + "$setOnInsert": q, + "$push": bson.M{ + setname: id, + }, + } + _, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert) + if err != nil { + glog.Errorf("Error: %v", err) + } + return err +} + +func (qs *TripleStore) checkValid(key string) bool { + var indexEntry struct { + Added []int64 `bson:"Added"` + Deleted []int64 `bson:"Deleted"` + } + err := qs.db.C("quads").FindId(key).One(&indexEntry) + if err == mgo.ErrNotFound { + return false + } + if err != nil { + glog.Errorln("Other error checking valid quad: %s %v.", key, err) + return false + } + if len(indexEntry.Added) <= len(indexEntry.Deleted) { + return false + } + return true +} + +func (qs *TripleStore) updateLog(d graph.Delta) error { + var action string + if d.Action == graph.Add { + action = "Add" + } else { + action = "Delete" + } + entry := MongoLogEntry{ + LogID: d.ID, + Action: action, + Key: qs.getIdForQuad(d.Quad), + Timestamp: d.Timestamp.UnixNano(), + } + err := qs.db.C("log").Insert(entry) + if err != nil { + glog.Errorf("Error updating log: %v", err) + } + return err +} + +func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error { + qs.session.SetSafe(nil) + ids := make(map[string]int) + // Pre-check the existence condition. + for _, d := range in { + key := qs.getIdForQuad(d.Quad) + switch d.Action { + case graph.Add: + if qs.checkValid(key) { + return graph.ErrQuadExists + } + case graph.Delete: + if !qs.checkValid(key) { + return graph.ErrQuadNotExist + } + } + } + if glog.V(2) { + glog.Infoln("Existence verified. Proceeding.") + } + for _, d := range in { + err := qs.updateLog(d) + if err != nil { + return err + } + } + for _, d := range in { + err := qs.updateQuad(d.Quad, d.ID, d.Action) + if err != nil { + return err + } + var countdelta int + if d.Action == graph.Add { + countdelta = 1 + } else { + countdelta = -1 + } + ids[d.Quad.Subject] += countdelta + ids[d.Quad.Object] += countdelta + ids[d.Quad.Predicate] += countdelta + if d.Quad.Label != "" { + ids[d.Quad.Label] += countdelta + } + } + for k, v := range ids { + err := qs.updateNodeBy(k, v) + if err != nil { + return err + } + } + qs.session.SetSafe(&mgo.Safe{}) + return nil +} + +func (qs *TripleStore) Quad(val graph.Value) quad.Quad { + var q quad.Quad + err := qs.db.C("quads").FindId(val.(string)).One(&q) + if err != nil { + glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) + } + return q +} + +func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { + return NewIterator(qs, "quads", d, val) +} + +func (qs *TripleStore) NodesAllIterator() graph.Iterator { + return NewAllIterator(qs, "nodes") +} + +func (qs *TripleStore) TriplesAllIterator() graph.Iterator { + return NewAllIterator(qs, "quads") +} + +func (qs *TripleStore) ValueOf(s string) graph.Value { + return qs.convertStringToByteHash(s) +} + +func (qs *TripleStore) NameOf(v graph.Value) string { + val, ok := qs.idCache.Get(v.(string)) + if ok { + return val + } + var node MongoNode + err := qs.db.C("nodes").FindId(v.(string)).One(&node) + if err != nil { + glog.Errorf("Error: Couldn't retrieve node %s %v", v, err) + } + qs.idCache.Put(v.(string), node.Name) + return node.Name +} + +func (qs *TripleStore) Size() int64 { + // TODO(barakmich): Make size real; store it in the log, and retrieve it. + count, err := qs.db.C("quads").Count() + if err != nil { + glog.Errorf("Error: %v", err) + return 0 + } + return int64(count) +} + +func (qs *TripleStore) Horizon() int64 { + var log MongoLogEntry + err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) + if err != nil { + if err == mgo.ErrNotFound { + return 0 + } + glog.Errorf("Could not get Horizon from Mongo: %v", err) + } + return log.LogID +} + +func compareStrings(a, b graph.Value) bool { + return a.(string) == b.(string) +} + +func (qs *TripleStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixedIteratorWithCompare(compareStrings) +} + +func (qs *TripleStore) Close() { + qs.db.Session.Close() +} + +func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.Value { + // Maybe do the trick here + var offset int + switch d { + case quad.Subject: + offset = 0 + case quad.Predicate: + offset = (hashSize * 2) + case quad.Object: + offset = (hashSize * 2) * 2 + case quad.Label: + offset = (hashSize * 2) * 3 + } + val := in.(string)[offset : hashSize*2+offset] + return val +} + +// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it. diff --git a/graph/mongo/quadstore_iterator_optimize.go b/graph/mongo/quadstore_iterator_optimize.go new file mode 100644 index 0000000..99fb25b --- /dev/null +++ b/graph/mongo/quadstore_iterator_optimize.go @@ -0,0 +1,55 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongo + +import ( + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" +) + +func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case graph.LinksTo: + return ts.optimizeLinksTo(it.(*iterator.LinksTo)) + + } + return it, false +} + +func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == graph.Fixed { + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := ts.TripleIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + it.Close() + return newIt, true + } + } + return it, false +} diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go deleted file mode 100644 index 87adf7d..0000000 --- a/graph/mongo/triplestore.go +++ /dev/null @@ -1,359 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mongo - -import ( - "crypto/sha1" - "encoding/hex" - "hash" - "sync" - - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" - - "github.com/barakmich/glog" - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" - "github.com/google/cayley/quad" -) - -func init() { - graph.RegisterTripleStore("mongo", true, newTripleStore, createNewMongoGraph) -} - -const DefaultDBName = "cayley" - -var ( - hashPool = sync.Pool{ - New: func() interface{} { return sha1.New() }, - } - hashSize = sha1.Size -) - -type TripleStore struct { - session *mgo.Session - db *mgo.Database - idCache *IDLru -} - -func createNewMongoGraph(addr string, options graph.Options) error { - conn, err := mgo.Dial(addr) - if err != nil { - return err - } - conn.SetSafe(&mgo.Safe{}) - dbName := DefaultDBName - if val, ok := options.StringKey("database_name"); ok { - dbName = val - } - db := conn.DB(dbName) - indexOpts := mgo.Index{ - Key: []string{"subject"}, - Unique: false, - DropDups: false, - Background: true, - Sparse: true, - } - db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"predicate"} - db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"object"} - db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"label"} - db.C("quads").EnsureIndex(indexOpts) - logOpts := mgo.Index{ - Key: []string{"LogID"}, - Unique: true, - DropDups: false, - Background: true, - Sparse: true, - } - db.C("log").EnsureIndex(logOpts) - return nil -} - -func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) { - var qs TripleStore - conn, err := mgo.Dial(addr) - if err != nil { - return nil, err - } - conn.SetSafe(&mgo.Safe{}) - dbName := DefaultDBName - if val, ok := options.StringKey("database_name"); ok { - dbName = val - } - qs.db = conn.DB(dbName) - qs.session = conn - qs.idCache = NewIDLru(1 << 16) - return &qs, nil -} - -func (qs *TripleStore) getIdForQuad(t quad.Quad) string { - id := qs.convertStringToByteHash(t.Subject) - id += qs.convertStringToByteHash(t.Predicate) - id += qs.convertStringToByteHash(t.Object) - id += qs.convertStringToByteHash(t.Label) - return id -} - -func (qs *TripleStore) convertStringToByteHash(s string) string { - h := hashPool.Get().(hash.Hash) - h.Reset() - defer hashPool.Put(h) - - key := make([]byte, 0, hashSize) - h.Write([]byte(s)) - key = h.Sum(key) - return hex.EncodeToString(key) -} - -type MongoNode struct { - Id string `bson:"_id"` - Name string `bson:"Name"` - Size int `bson:"Size"` -} - -type MongoLogEntry struct { - LogID int64 `bson:"LogID"` - Action string `bson:"Action"` - Key string `bson:"Key"` - Timestamp int64 -} - -func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { - node := qs.ValueOf(node_name) - doc := bson.M{ - "_id": node.(string), - "Name": node_name, - } - upsert := bson.M{ - "$setOnInsert": doc, - "$inc": bson.M{ - "Size": inc, - }, - } - - _, err := qs.db.C("nodes").UpsertId(node, upsert) - if err != nil { - glog.Errorf("Error updating node: %v", err) - } - return err -} - -func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error { - var setname string - if proc == graph.Add { - setname = "Added" - } else if proc == graph.Delete { - setname = "Deleted" - } - upsert := bson.M{ - "$setOnInsert": q, - "$push": bson.M{ - setname: id, - }, - } - _, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert) - if err != nil { - glog.Errorf("Error: %v", err) - } - return err -} - -func (qs *TripleStore) checkValid(key string) bool { - var indexEntry struct { - Added []int64 `bson:"Added"` - Deleted []int64 `bson:"Deleted"` - } - err := qs.db.C("quads").FindId(key).One(&indexEntry) - if err == mgo.ErrNotFound { - return false - } - if err != nil { - glog.Errorln("Other error checking valid quad: %s %v.", key, err) - return false - } - if len(indexEntry.Added) <= len(indexEntry.Deleted) { - return false - } - return true -} - -func (qs *TripleStore) updateLog(d graph.Delta) error { - var action string - if d.Action == graph.Add { - action = "Add" - } else { - action = "Delete" - } - entry := MongoLogEntry{ - LogID: d.ID, - Action: action, - Key: qs.getIdForQuad(d.Quad), - Timestamp: d.Timestamp.UnixNano(), - } - err := qs.db.C("log").Insert(entry) - if err != nil { - glog.Errorf("Error updating log: %v", err) - } - return err -} - -func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error { - qs.session.SetSafe(nil) - ids := make(map[string]int) - // Pre-check the existence condition. - for _, d := range in { - key := qs.getIdForQuad(d.Quad) - switch d.Action { - case graph.Add: - if qs.checkValid(key) { - return graph.ErrQuadExists - } - case graph.Delete: - if !qs.checkValid(key) { - return graph.ErrQuadNotExist - } - } - } - if glog.V(2) { - glog.Infoln("Existence verified. Proceeding.") - } - for _, d := range in { - err := qs.updateLog(d) - if err != nil { - return err - } - } - for _, d := range in { - err := qs.updateQuad(d.Quad, d.ID, d.Action) - if err != nil { - return err - } - var countdelta int - if d.Action == graph.Add { - countdelta = 1 - } else { - countdelta = -1 - } - ids[d.Quad.Subject] += countdelta - ids[d.Quad.Object] += countdelta - ids[d.Quad.Predicate] += countdelta - if d.Quad.Label != "" { - ids[d.Quad.Label] += countdelta - } - } - for k, v := range ids { - err := qs.updateNodeBy(k, v) - if err != nil { - return err - } - } - qs.session.SetSafe(&mgo.Safe{}) - return nil -} - -func (qs *TripleStore) Quad(val graph.Value) quad.Quad { - var q quad.Quad - err := qs.db.C("quads").FindId(val.(string)).One(&q) - if err != nil { - glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) - } - return q -} - -func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { - return NewIterator(qs, "quads", d, val) -} - -func (qs *TripleStore) NodesAllIterator() graph.Iterator { - return NewAllIterator(qs, "nodes") -} - -func (qs *TripleStore) TriplesAllIterator() graph.Iterator { - return NewAllIterator(qs, "quads") -} - -func (qs *TripleStore) ValueOf(s string) graph.Value { - return qs.convertStringToByteHash(s) -} - -func (qs *TripleStore) NameOf(v graph.Value) string { - val, ok := qs.idCache.Get(v.(string)) - if ok { - return val - } - var node MongoNode - err := qs.db.C("nodes").FindId(v.(string)).One(&node) - if err != nil { - glog.Errorf("Error: Couldn't retrieve node %s %v", v, err) - } - qs.idCache.Put(v.(string), node.Name) - return node.Name -} - -func (qs *TripleStore) Size() int64 { - // TODO(barakmich): Make size real; store it in the log, and retrieve it. - count, err := qs.db.C("quads").Count() - if err != nil { - glog.Errorf("Error: %v", err) - return 0 - } - return int64(count) -} - -func (qs *TripleStore) Horizon() int64 { - var log MongoLogEntry - err := qs.db.C("log").Find(nil).Sort("-LogID").One(&log) - if err != nil { - if err == mgo.ErrNotFound { - return 0 - } - glog.Errorf("Could not get Horizon from Mongo: %v", err) - } - return log.LogID -} - -func compareStrings(a, b graph.Value) bool { - return a.(string) == b.(string) -} - -func (qs *TripleStore) FixedIterator() graph.FixedIterator { - return iterator.NewFixedIteratorWithCompare(compareStrings) -} - -func (qs *TripleStore) Close() { - qs.db.Session.Close() -} - -func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.Value { - // Maybe do the trick here - var offset int - switch d { - case quad.Subject: - offset = 0 - case quad.Predicate: - offset = (hashSize * 2) - case quad.Object: - offset = (hashSize * 2) * 2 - case quad.Label: - offset = (hashSize * 2) * 3 - } - val := in.(string)[offset : hashSize*2+offset] - return val -} - -// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it. diff --git a/graph/mongo/triplestore_iterator_optimize.go b/graph/mongo/triplestore_iterator_optimize.go deleted file mode 100644 index 99fb25b..0000000 --- a/graph/mongo/triplestore_iterator_optimize.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mongo - -import ( - "github.com/google/cayley/graph" - "github.com/google/cayley/graph/iterator" -) - -func (ts *TripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { - switch it.Type() { - case graph.LinksTo: - return ts.optimizeLinksTo(it.(*iterator.LinksTo)) - - } - return it, false -} - -func (ts *TripleStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { - subs := it.SubIterators() - if len(subs) != 1 { - return it, false - } - primary := subs[0] - if primary.Type() == graph.Fixed { - size, _ := primary.Size() - if size == 1 { - if !graph.Next(primary) { - panic("unexpected size during optimize") - } - val := primary.Result() - newIt := ts.TripleIterator(it.Direction(), val) - nt := newIt.Tagger() - nt.CopyFrom(it) - for _, tag := range primary.Tagger().Tags() { - nt.AddFixed(tag, val) - } - it.Close() - return newIt, true - } - } - return it, false -} diff --git a/graph/quadstore.go b/graph/quadstore.go new file mode 100644 index 0000000..741297c --- /dev/null +++ b/graph/quadstore.go @@ -0,0 +1,193 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +// Defines the TripleStore interface. Every backing store must implement at +// least this interface. +// +// Most of these are pretty straightforward. As long as we can surface this +// interface, the rest of the stack will "just work" and we can connect to any +// triple backing store we prefer. + +import ( + "errors" + + "github.com/barakmich/glog" + "github.com/google/cayley/quad" +) + +// Value defines an opaque "triple store value" type. However the backend wishes +// to implement it, a Value is merely a token to a triple or a node that the +// backing store itself understands, and the base iterators pass around. +// +// For example, in a very traditional, graphd-style graph, these are int64s +// (guids of the primitives). In a very direct sort of graph, these could be +// pointers to structs, or merely triples, or whatever works best for the +// backing store. +// +// These must be comparable, or implement a `Key() interface{}` function +// so that they may be stored in maps. +type Value interface{} + +type TripleStore interface { + // The only way in is through building a transaction, which + // is done by a replication strategy. + ApplyDeltas([]Delta) error + + // Given an opaque token, returns the triple for that token from the store. + Quad(Value) quad.Quad + + // Given a direction and a token, creates an iterator of links which have + // that node token in that directional field. + TripleIterator(quad.Direction, Value) Iterator + + // Returns an iterator enumerating all nodes in the graph. + NodesAllIterator() Iterator + + // Returns an iterator enumerating all links in the graph. + TriplesAllIterator() Iterator + + // Given a node ID, return the opaque token used by the TripleStore + // to represent that id. + ValueOf(string) Value + + // Given an opaque token, return the node that it represents. + NameOf(Value) string + + // Returns the number of triples currently stored. + Size() int64 + + // The last replicated transaction ID that this triplestore has verified. + Horizon() int64 + + // Creates a fixed iterator which can compare Values + FixedIterator() FixedIterator + + // Optimize an iterator in the context of the triple store. + // Suppose we have a better index for the passed tree; this + // gives the TripleStore the opportunity to replace it + // with a more efficient iterator. + OptimizeIterator(it Iterator) (Iterator, bool) + + // Close the triple store and clean up. (Flush to disk, cleanly + // sever connections, etc) + Close() + + // Convenience function for speed. Given a triple token and a direction + // return the node token for that direction. Sometimes, a TripleStore + // can do this without going all the way to the backing store, and + // gives the TripleStore the opportunity to make this optimization. + // + // Iterators will call this. At worst, a valid implementation is + // ts.IdFor(ts.quad.Quad(id).Get(dir)) + TripleDirection(id Value, d quad.Direction) Value +} + +type Options map[string]interface{} + +func (d Options) IntKey(key string) (int, bool) { + if val, ok := d[key]; ok { + switch vv := val.(type) { + case float64: + return int(vv), true + default: + glog.Fatalln("Invalid", key, "parameter type from config.") + } + } + return 0, false +} + +func (d Options) StringKey(key string) (string, bool) { + if val, ok := d[key]; ok { + switch vv := val.(type) { + case string: + return vv, true + default: + glog.Fatalln("Invalid", key, "parameter type from config.") + } + } + return "", false +} + +func (d Options) BoolKey(key string) (bool, bool) { + if val, ok := d[key]; ok { + switch vv := val.(type) { + case bool: + return vv, true + default: + glog.Fatalln("Invalid", key, "parameter type from config.") + } + } + return false, false +} + +var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load") + +type BulkLoader interface { + // BulkLoad loads Quads from a quad.Unmarshaler in bulk to the TripleStore. + // It returns ErrCannotBulkLoad if bulk loading is not possible. For example if + // you cannot load in bulk to a non-empty database, and the db is non-empty. + BulkLoad(quad.Unmarshaler) error +} + +type NewStoreFunc func(string, Options) (TripleStore, error) +type InitStoreFunc func(string, Options) error + +type register struct { + newFunc NewStoreFunc + initFunc InitStoreFunc + isPersistent bool +} + +var storeRegistry = make(map[string]register) + +func RegisterTripleStore(name string, persists bool, newFunc NewStoreFunc, initFunc InitStoreFunc) { + if _, found := storeRegistry[name]; found { + panic("already registered TripleStore " + name) + } + storeRegistry[name] = register{ + newFunc: newFunc, + initFunc: initFunc, + isPersistent: persists, + } +} + +func NewTripleStore(name, dbpath string, opts Options) (TripleStore, error) { + r, registered := storeRegistry[name] + if !registered { + return nil, errors.New("triplestore: name '" + name + "' is not registered") + } + return r.newFunc(dbpath, opts) +} + +func InitTripleStore(name, dbpath string, opts Options) error { + r, registered := storeRegistry[name] + if registered { + return r.initFunc(dbpath, opts) + } + return errors.New("triplestore: name '" + name + "' is not registered") +} + +func IsPersistent(name string) bool { + return storeRegistry[name].isPersistent +} + +func TripleStores() []string { + t := make([]string, 0, len(storeRegistry)) + for n := range storeRegistry { + t = append(t, n) + } + return t +} diff --git a/graph/triplestore.go b/graph/triplestore.go deleted file mode 100644 index 741297c..0000000 --- a/graph/triplestore.go +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2014 The Cayley Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package graph - -// Defines the TripleStore interface. Every backing store must implement at -// least this interface. -// -// Most of these are pretty straightforward. As long as we can surface this -// interface, the rest of the stack will "just work" and we can connect to any -// triple backing store we prefer. - -import ( - "errors" - - "github.com/barakmich/glog" - "github.com/google/cayley/quad" -) - -// Value defines an opaque "triple store value" type. However the backend wishes -// to implement it, a Value is merely a token to a triple or a node that the -// backing store itself understands, and the base iterators pass around. -// -// For example, in a very traditional, graphd-style graph, these are int64s -// (guids of the primitives). In a very direct sort of graph, these could be -// pointers to structs, or merely triples, or whatever works best for the -// backing store. -// -// These must be comparable, or implement a `Key() interface{}` function -// so that they may be stored in maps. -type Value interface{} - -type TripleStore interface { - // The only way in is through building a transaction, which - // is done by a replication strategy. - ApplyDeltas([]Delta) error - - // Given an opaque token, returns the triple for that token from the store. - Quad(Value) quad.Quad - - // Given a direction and a token, creates an iterator of links which have - // that node token in that directional field. - TripleIterator(quad.Direction, Value) Iterator - - // Returns an iterator enumerating all nodes in the graph. - NodesAllIterator() Iterator - - // Returns an iterator enumerating all links in the graph. - TriplesAllIterator() Iterator - - // Given a node ID, return the opaque token used by the TripleStore - // to represent that id. - ValueOf(string) Value - - // Given an opaque token, return the node that it represents. - NameOf(Value) string - - // Returns the number of triples currently stored. - Size() int64 - - // The last replicated transaction ID that this triplestore has verified. - Horizon() int64 - - // Creates a fixed iterator which can compare Values - FixedIterator() FixedIterator - - // Optimize an iterator in the context of the triple store. - // Suppose we have a better index for the passed tree; this - // gives the TripleStore the opportunity to replace it - // with a more efficient iterator. - OptimizeIterator(it Iterator) (Iterator, bool) - - // Close the triple store and clean up. (Flush to disk, cleanly - // sever connections, etc) - Close() - - // Convenience function for speed. Given a triple token and a direction - // return the node token for that direction. Sometimes, a TripleStore - // can do this without going all the way to the backing store, and - // gives the TripleStore the opportunity to make this optimization. - // - // Iterators will call this. At worst, a valid implementation is - // ts.IdFor(ts.quad.Quad(id).Get(dir)) - TripleDirection(id Value, d quad.Direction) Value -} - -type Options map[string]interface{} - -func (d Options) IntKey(key string) (int, bool) { - if val, ok := d[key]; ok { - switch vv := val.(type) { - case float64: - return int(vv), true - default: - glog.Fatalln("Invalid", key, "parameter type from config.") - } - } - return 0, false -} - -func (d Options) StringKey(key string) (string, bool) { - if val, ok := d[key]; ok { - switch vv := val.(type) { - case string: - return vv, true - default: - glog.Fatalln("Invalid", key, "parameter type from config.") - } - } - return "", false -} - -func (d Options) BoolKey(key string) (bool, bool) { - if val, ok := d[key]; ok { - switch vv := val.(type) { - case bool: - return vv, true - default: - glog.Fatalln("Invalid", key, "parameter type from config.") - } - } - return false, false -} - -var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load") - -type BulkLoader interface { - // BulkLoad loads Quads from a quad.Unmarshaler in bulk to the TripleStore. - // It returns ErrCannotBulkLoad if bulk loading is not possible. For example if - // you cannot load in bulk to a non-empty database, and the db is non-empty. - BulkLoad(quad.Unmarshaler) error -} - -type NewStoreFunc func(string, Options) (TripleStore, error) -type InitStoreFunc func(string, Options) error - -type register struct { - newFunc NewStoreFunc - initFunc InitStoreFunc - isPersistent bool -} - -var storeRegistry = make(map[string]register) - -func RegisterTripleStore(name string, persists bool, newFunc NewStoreFunc, initFunc InitStoreFunc) { - if _, found := storeRegistry[name]; found { - panic("already registered TripleStore " + name) - } - storeRegistry[name] = register{ - newFunc: newFunc, - initFunc: initFunc, - isPersistent: persists, - } -} - -func NewTripleStore(name, dbpath string, opts Options) (TripleStore, error) { - r, registered := storeRegistry[name] - if !registered { - return nil, errors.New("triplestore: name '" + name + "' is not registered") - } - return r.newFunc(dbpath, opts) -} - -func InitTripleStore(name, dbpath string, opts Options) error { - r, registered := storeRegistry[name] - if registered { - return r.initFunc(dbpath, opts) - } - return errors.New("triplestore: name '" + name + "' is not registered") -} - -func IsPersistent(name string) bool { - return storeRegistry[name].isPersistent -} - -func TripleStores() []string { - t := make([]string, 0, len(storeRegistry)) - for n := range storeRegistry { - t = append(t, n) - } - return t -}