diff --git a/.travis.yml b/.travis.yml index 08d2ea7..f4a9eb3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,7 @@ install: - go get github.com/syndtr/goleveldb/leveldb/iterator - go get github.com/syndtr/goleveldb/leveldb/opt - go get github.com/syndtr/goleveldb/leveldb/util + - go get github.com/boltdb/bolt - go get gopkg.in/mgo.v2 - go get gopkg.in/mgo.v2/bson diff --git a/README.md b/README.md index 65c0faa..8d7752c 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,9 @@ Its goal is to be a part of the developer's toolbox where [Linked Data](http://l * JavaScript, with a [Gremlin](http://gremlindocs.com/)-inspired\* graph object. * (simplified) [MQL](https://developers.google.com/freebase/v1/mql-overview), for Freebase fans * Plays well with multiple backend stores: - * [LevelDB](http://code.google.com/p/leveldb/) for single-machine storage - * [MongoDB](http://mongodb.org) + * [LevelDB](http://code.google.com/p/leveldb/) + * [Bolt](http://github.com/boltdb/bolt) + * [MongoDB](http://mongodb.org) for distributed stores * In-memory, ephemeral * Modular design; easy to extend with new languages and backends * Good test coverage diff --git a/cayley.go b/cayley.go index 4d97387..74fe033 100644 --- a/cayley.go +++ b/cayley.go @@ -42,6 +42,7 @@ import ( "github.com/google/cayley/quad/nquads" // Load all supported backends. + _ "github.com/google/cayley/graph/bolt" _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" diff --git a/cayley_test.go b/cayley_test.go index a5c312f..68b303c 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -18,9 +18,11 @@ import ( "bytes" "compress/bzip2" "compress/gzip" + "flag" "fmt" "github.com/google/cayley/quad" "io" + "os" "reflect" "sort" "strings" @@ -34,6 +36,8 @@ import ( "github.com/google/cayley/query/gremlin" ) +var backend = flag.String("backend", "memstore", "Which backend to test. 
Loads test data to /tmp if not present.") + var benchmarkQueries = []struct { message string long bool @@ -379,15 +383,42 @@ var ( ) func prepare(t testing.TB) { + switch *backend { + case "memstore": + break + case "leveldb": + fallthrough + case "bolt": + cfg.DatabaseType = *backend + cfg.DatabasePath = fmt.Sprint("/tmp/cayley_test_", *backend) + cfg.DatabaseOptions = map[string]interface{}{ + "nosync": true, // It's a test. If we need to load, do it fast. + } + default: + t.Fatalf("Untestable backend store %s", *backend) + } + var err error create.Do(func() { + needsLoad := true + if graph.IsPersistent(cfg.DatabaseType) { + if _, err := os.Stat(cfg.DatabasePath); os.IsNotExist(err) { + err = db.Init(cfg) + if err != nil { + t.Fatalf("Could not initialize database: %v", err) + } + } else { + needsLoad = false + } + } + handle, err = db.Open(cfg) if err != nil { t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err) } - if !graph.IsPersistent(cfg.DatabaseType) { - err = load(handle.QuadWriter, cfg, "", "cquad") + if needsLoad { + err = load(handle.QuadWriter, cfg, "30kmoviedata.nq.gz", "cquad") if err != nil { t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err) } diff --git a/db/db.go b/db/db.go index e5370ee..641e1c7 100644 --- a/db/db.go +++ b/db/db.go @@ -70,6 +70,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter, func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { block := make([]quad.Quad, 0, cfg.LoadSize) + count := 0 for { t, err := dec.Unmarshal() if err != nil { @@ -80,11 +81,19 @@ func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error { } block = append(block, t) if len(block) == cap(block) { + count += len(block) qw.AddQuadSet(block) + if glog.V(2) { + glog.V(2).Infof("Wrote %d quads.", count) + } block = block[:0] } } + count += len(block) qw.AddQuadSet(block) + if glog.V(2) { + glog.V(2).Infof("Wrote %d quads.", count) + } return nil } diff --git 
a/docs/Configuration.md b/docs/Configuration.md index 4b6b898..bd3bf11 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -23,7 +23,8 @@ All command line flags take precedence over the configuration file. * `mem`: An in-memory store, based on an initial N-Quads file. Loses all changes when the process exits. * `leveldb`: A persistent on-disk store backed by [LevelDB](http://code.google.com/p/leveldb/). - * `mongodb`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store. + * `bolt`: Stores the graph data on-disk in a [Bolt](http://github.com/boltdb/bolt) file. Uses more disk space and memory than LevelDB for smaller stores, but is often faster to write to and comparable for large ones, with faster average query times. + * `mongo`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store. #### **`db_path`** @@ -32,9 +33,10 @@ All command line flags take precedence over the configuration file. Where does the database actually live? Dependent on the type of database. For each datastore: - * `mem`: Path to a triple file to automatically load - * `leveldb`: Directory to hold the LevelDB database files - * `mongodb`: "hostname:port" of the desired MongoDB server. + * `mem`: Path to a triple file to automatically load. + * `leveldb`: Directory to hold the LevelDB database files. + * `bolt`: Path to the persistent single Bolt database file. + * `mongo`: "hostname:port" of the desired MongoDB server. #### **`listen_host`** @@ -103,8 +105,16 @@ The size in MiB of the LevelDB write cache. Increasing this number allows for mo The size in MiB of the LevelDB block cache. 
Increasing this number uses more memory to maintain a bigger cache of triple blocks for better performance. +### Bolt -### MongoDB +#### **`nosync`** + + * Type: Boolean + * Default: false + +Optionally disable syncing to disk per transaction. Nosync being true means much faster load times, but without consistency guarantees. + +### Mongo #### **`database_name`** diff --git a/docs/Overview.md b/docs/Overview.md index 300d005..39ded35 100644 --- a/docs/Overview.md +++ b/docs/Overview.md @@ -17,7 +17,8 @@ You can set up a full [configuration file](/docs/Configuration) if you'd prefer, Examples for each backend: * `leveldb`: `./cayley init --db=leveldb --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the path you'd like to store your data. - * `mongodb`: `./cayley init --db=mongodb --dbpath="<HOSTNAME>:<PORT>"` -- where HOSTNAME and PORT point to your Mongo instance. + * `bolt`: `./cayley init --db=bolt --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the filename where you'd like to store your data. + * `mongo`: `./cayley init --db=mongo --dbpath="<HOSTNAME>:<PORT>"` -- where HOSTNAME and PORT point to your Mongo instance. Those two options (db and dbpath) are always going to be present. If you feel like not repeating yourself, setting up a configuration file for your backend might be something to do now. There's an example file, `cayley.cfg.example` in the root directory. diff --git a/graph/bolt/all_iterator.go b/graph/bolt/all_iterator.go new file mode 100644 index 0000000..1214da5 --- /dev/null +++ b/graph/bolt/all_iterator.go @@ -0,0 +1,201 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+
+	"github.com/barakmich/glog"
+	"github.com/boltdb/bolt"
+
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+	"github.com/google/cayley/quad"
+)
+
+// AllIterator walks every key of a single Bolt bucket, reading keys in
+// batches of bufferSize so that no read transaction outlives a Next call.
+type AllIterator struct {
+	uid    uint64
+	tags   graph.Tagger
+	bucket []byte
+	dir    quad.Direction
+	qs     *QuadStore
+	result *Token   // value passed to the most recent Contains, if any
+	buffer [][]byte // current batch of keys; a nil entry marks the end
+	offset int      // index into buffer of the current result
+	done   bool
+}
+
+// NewAllIterator returns an iterator over all keys stored in bucket.
+func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator {
+	return &AllIterator{
+		uid:    iterator.NextUID(),
+		bucket: bucket,
+		dir:    d,
+		qs:     qs,
+	}
+}
+
+func (it *AllIterator) UID() uint64 { return it.uid }
+
+// Reset rewinds the iterator so that the next call to Next starts over.
+func (it *AllIterator) Reset() {
+	it.result = nil
+	it.buffer = nil
+	it.offset = 0
+	it.done = false
+}
+
+func (it *AllIterator) Tagger() *graph.Tagger { return &it.tags }
+
+// TagResults stores the current result under each of the iterator's tags.
+func (it *AllIterator) TagResults(dst map[string]graph.Value) {
+	for _, tag := range it.tags.Tags() {
+		dst[tag] = it.Result()
+	}
+
+	for tag, value := range it.tags.Fixed() {
+		dst[tag] = value
+	}
+}
+
+// Clone returns a new, rewound iterator over the same bucket and tags.
+func (it *AllIterator) Clone() graph.Iterator {
+	out := NewAllIterator(it.bucket, it.dir, it.qs)
+	out.tags.CopyFrom(it)
+	return out
+}
+
+// Next advances to the following key of the bucket, refilling the buffer
+// from the database whenever the current batch is exhausted.
+func (it *AllIterator) Next() bool {
+	if it.done {
+		return false
+	}
+	// A value left behind by Contains must not mask the results of Next.
+	it.result = nil
+	if len(it.buffer) <= it.offset+1 {
+		it.offset = 0
+		var last []byte
+		if it.buffer != nil {
+			last = it.buffer[len(it.buffer)-1]
+		}
+		it.buffer = make([][]byte, 0, bufferSize)
+		err := it.qs.db.View(func(tx *bolt.Tx) error {
+			i := 0
+			b := tx.Bucket(it.bucket)
+			cur := b.Cursor()
+			if last == nil {
+				k, _ := cur.First()
+				if k == nil {
+					// Empty bucket: terminate instead of emitting an empty key.
+					it.buffer = append(it.buffer, nil)
+					return nil
+				}
+				out := make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			} else {
+				k, _ := cur.Seek(last)
+				if !bytes.Equal(k, last) {
+					return fmt.Errorf("couldn't pick up after %x", last)
+				}
+			}
+			for i < bufferSize {
+				k, _ := cur.Next()
+				if k == nil {
+					it.buffer = append(it.buffer, nil)
+					break
+				}
+				// Keys are only valid inside the transaction, so copy them.
+				out := make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			}
+			return nil
+		})
+		if err != nil {
+			glog.Error("Error nexting in database: ", err)
+			it.done = true
+			return false
+		}
+	} else {
+		it.offset++
+	}
+	if it.Result() == nil {
+		it.done = true
+		return false
+	}
+	return true
+}
+
+func (it *AllIterator) ResultTree() *graph.ResultTree { return graph.NewResultTree(it.Result()) }
+
+// Result returns the value of the most recent Contains or Next call.
+func (it *AllIterator) Result() graph.Value {
+	if it.done {
+		return nil
+	}
+	if it.result != nil {
+		return it.result
+	}
+	if it.offset >= len(it.buffer) || it.buffer[it.offset] == nil {
+		return nil
+	}
+	return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
+}
+
+func (it *AllIterator) NextPath() bool { return false }
+
+// No subiterators.
+func (it *AllIterator) SubIterators() []graph.Iterator { return nil }
+
+// Contains accepts every value and records it as the current result.
+func (it *AllIterator) Contains(v graph.Value) bool {
+	it.result = v.(*Token)
+	return true
+}
+
+func (it *AllIterator) Close() {
+	it.result = nil
+	it.buffer = nil
+	it.done = true
+}
+
+func (it *AllIterator) Size() (int64, bool) { return it.qs.size, true }
+
+func (it *AllIterator) DebugString(indent int) string {
+	size, _ := it.Size()
+	return fmt.Sprintf("%s(%s tags: %v bolt size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it)
+}
+
+func (it *AllIterator) Type() graph.Type { return graph.All }
+func (it *AllIterator) Sorted() bool     { return false }
+
+func (it *AllIterator) Optimize() (graph.Iterator, bool) { return it, false }
+
+func (it *AllIterator) Stats() graph.IteratorStats {
+	s, _ := it.Size()
+	return graph.IteratorStats{
+		ContainsCost: 1,
+		NextCost:     2,
+		Size:         s,
+	}
+}
diff --git a/graph/bolt/iterator.go b/graph/bolt/iterator.go
new file mode 100644
index 0000000..a53f6ed
--- /dev/null
+++ b/graph/bolt/iterator.go
@@ -0,0 +1,320 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/barakmich/glog"
+	"github.com/boltdb/bolt"
+
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+	"github.com/google/cayley/quad"
+)
+
+var (
+	boltType    graph.Type
+	bufferSize  = 50 // number of keys fetched per read transaction
+	errNotExist = errors.New("Quad does not exist")
+)
+
+func init() {
+	boltType = graph.RegisterIterator("bolt")
+}
+
+// Iterator yields the live quads of one index bucket whose key starts with
+// the hash of a given node, i.e. the quads having that node at direction dir.
+type Iterator struct {
+	uid     uint64
+	tags    graph.Tagger
+	bucket  []byte
+	checkId []byte // hash of the node every result key must start with
+	dir     quad.Direction
+	qs      *QuadStore
+	result  *Token
+	buffer  [][]byte // current batch of keys; a nil entry marks the end
+	offset  int      // index into buffer of the current result
+	done    bool
+	size    int64
+}
+
+// NewIterator returns an iterator over the quads in bucket that have the node
+// value at direction d. For a non-node value it returns a finished iterator.
+func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) *Iterator {
+	tok := value.(*Token)
+	if !bytes.Equal(tok.bucket, nodeBucket) {
+		glog.Error("Creating an iterator from a non-node value.")
+		return &Iterator{done: true}
+	}
+
+	it := Iterator{
+		uid:    iterator.NextUID(),
+		bucket: bucket,
+		dir:    d,
+		qs:     qs,
+		size:   qs.SizeOf(value),
+	}
+
+	it.checkId = make([]byte, len(tok.key))
+	copy(it.checkId, tok.key)
+
+	return &it
+}
+
+// Type returns the iterator type registered for the Bolt backend.
+func Type() graph.Type { return boltType }
+
+func (it *Iterator) UID() uint64 { return it.uid }
+
+func (it *Iterator) Reset() {
+	it.buffer = nil
+	it.offset = 0
+	it.done = false
+}
+
+func (it *Iterator) Tagger() *graph.Tagger { return &it.tags }
+
+func (it *Iterator) TagResults(dst map[string]graph.Value) {
+	for _, tag := range it.tags.Tags() {
+		dst[tag] = it.Result()
+	}
+
+	for tag, value := range it.tags.Fixed() {
+		dst[tag] = value
+	}
+}
+
+// Clone returns a new, rewound iterator for the same node, bucket and tags.
+func (it *Iterator) Clone() graph.Iterator {
+	out := NewIterator(it.bucket, it.dir, &Token{nodeBucket, it.checkId}, it.qs)
+	out.Tagger().CopyFrom(it)
+	return out
+}
+
+func (it *Iterator) Close() {
+	it.result = nil
+	it.buffer = nil
+	it.done = true
+}
+
+// isLiveValue reports whether an index entry describes a quad that currently
+// exists: every add and delete appends to History, so an odd length is live.
+func (it *Iterator) isLiveValue(val []byte) bool {
+	var entry IndexEntry
+	if err := json.Unmarshal(val, &entry); err != nil {
+		glog.Errorf("Couldn't decode index entry: %v", err)
+		return false
+	}
+	return len(entry.History)%2 != 0
+}
+
+// Next advances to the next live quad whose key begins with checkId,
+// refilling the buffer from the database when the batch is exhausted.
+func (it *Iterator) Next() bool {
+	if it.done {
+		return false
+	}
+	if len(it.buffer) <= it.offset+1 {
+		it.offset = 0
+		var last []byte
+		if it.buffer != nil {
+			last = it.buffer[len(it.buffer)-1]
+		}
+		it.buffer = make([][]byte, 0, bufferSize)
+		err := it.qs.db.View(func(tx *bolt.Tx) error {
+			i := 0
+			b := tx.Bucket(it.bucket)
+			cur := b.Cursor()
+			if last == nil {
+				k, v := cur.Seek(it.checkId)
+				if !bytes.HasPrefix(k, it.checkId) {
+					it.buffer = append(it.buffer, nil)
+					return errNotExist
+				}
+				// The first match may be a deleted quad, exactly like later ones.
+				if it.isLiveValue(v) {
+					out := make([]byte, len(k))
+					copy(out, k)
+					it.buffer = append(it.buffer, out)
+					i++
+				}
+			} else {
+				k, _ := cur.Seek(last)
+				if !bytes.Equal(k, last) {
+					return fmt.Errorf("couldn't pick up after %x", last)
+				}
+			}
+			for i < bufferSize {
+				k, v := cur.Next()
+				if k == nil || !bytes.HasPrefix(k, it.checkId) {
+					it.buffer = append(it.buffer, nil)
+					break
+				}
+				if !it.isLiveValue(v) {
+					continue
+				}
+				out := make([]byte, len(k))
+				copy(out, k)
+				it.buffer = append(it.buffer, out)
+				i++
+			}
+			return nil
+		})
+		if err != nil {
+			if err != errNotExist {
+				glog.Error("Error nexting in database: ", err)
+			}
+			it.done = true
+			return false
+		}
+	} else {
+		it.offset++
+	}
+	if it.Result() == nil {
+		it.done = true
+		return false
+	}
+	return true
+}
+
+func (it *Iterator) ResultTree() *graph.ResultTree { return graph.NewResultTree(it.Result()) }
+
+// Result returns the quad token the iterator is positioned on, or nil.
+func (it *Iterator) Result() graph.Value {
+	if it.done {
+		return nil
+	}
+	if it.result != nil {
+		return it.result
+	}
+	if it.offset >= len(it.buffer) || it.buffer[it.offset] == nil {
+		return nil
+	}
+	return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
+}
+
+func (it *Iterator) NextPath() bool { return false }
+
+// No subiterators.
+func (it *Iterator) SubIterators() []graph.Iterator { return nil }
+
+// PositionOf returns the byte offset within an index key of tok's bucket at
+// which the hash for direction d is stored. It panics on any other input.
+func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int {
+	if bytes.Equal(tok.bucket, spoBucket) {
+		switch d {
+		case quad.Subject:
+			return 0
+		case quad.Predicate:
+			return hashSize
+		case quad.Object:
+			return 2 * hashSize
+		case quad.Label:
+			return 3 * hashSize
+		}
+	}
+	if bytes.Equal(tok.bucket, posBucket) {
+		switch d {
+		case quad.Subject:
+			return 2 * hashSize
+		case quad.Predicate:
+			return 0
+		case quad.Object:
+			return hashSize
+		case quad.Label:
+			return 3 * hashSize
+		}
+	}
+	if bytes.Equal(tok.bucket, ospBucket) {
+		switch d {
+		case quad.Subject:
+			return hashSize
+		case quad.Predicate:
+			return 2 * hashSize
+		case quad.Object:
+			return 0
+		case quad.Label:
+			return 3 * hashSize
+		}
+	}
+	if bytes.Equal(tok.bucket, cpsBucket) {
+		switch d {
+		case quad.Subject:
+			return 2 * hashSize
+		case quad.Predicate:
+			return hashSize
+		case quad.Object:
+			return 3 * hashSize
+		case quad.Label:
+			return 0
+		}
+	}
+	panic("unreachable")
+}
+
+// Contains reports whether the quad token v has this iterator's node at dir.
+func (it *Iterator) Contains(v graph.Value) bool {
+	val := v.(*Token)
+	if bytes.Equal(val.bucket, nodeBucket) {
+		return false
+	}
+	offset := PositionOf(val, it.dir, it.qs)
+	if bytes.HasPrefix(val.key[offset:], it.checkId) {
+		// You may ask, why don't we check to see if it's a valid (not deleted) triple
+		// again?
+		//
+		// We've already done that -- in order to get the graph.Value token in the
+		// first place, we had to have done the check already; it came from a Next().
+		//
+		// However, if it ever starts coming from somewhere else, it'll be more
+		// efficient to change the interface of the graph.Value for LevelDB to a
+		// struct with a flag for isValid, to save another random read.
+		return true
+	}
+	return false
+}
+
+func (it *Iterator) Size() (int64, bool) { return it.size, true }
+
+func (it *Iterator) DebugString(indent int) string {
+	return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)",
+		strings.Repeat(" ", indent),
+		it.Type(),
+		it.UID(),
+		it.tags.Tags(),
+		it.dir,
+		it.size,
+		it.qs.NameOf(&Token{nodeBucket, it.checkId}),
+	)
+}
+
+func (it *Iterator) Type() graph.Type { return boltType }
+func (it *Iterator) Sorted() bool     { return false }
+
+func (it *Iterator) Optimize() (graph.Iterator, bool) { return it, false }
+
+func (it *Iterator) Stats() graph.IteratorStats {
+	s, _ := it.Size()
+	return graph.IteratorStats{
+		ContainsCost: 1,
+		NextCost:     4,
+		Size:         s,
+	}
+}
diff --git a/graph/bolt/quadstore.go b/graph/bolt/quadstore.go
new file mode 100644
index 0000000..7c3cf60
--- /dev/null
+++ b/graph/bolt/quadstore.go
@@ -0,0 +1,507 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"hash"
+	"sync"
+
+	"github.com/barakmich/glog"
+	"github.com/boltdb/bolt"
+
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+	"github.com/google/cayley/quad"
+)
+
+func init() {
+	graph.RegisterTripleStore("bolt", true, newQuadStore, createNewBolt)
+}
+
+var (
+	hashPool = sync.Pool{
+		New: func() interface{} { return sha1.New() },
+	}
+	hashSize         = sha1.Size
+	localFillPercent = 0.7
+)
+
+// Token is the graph.Value used by the Bolt backend: a key together with
+// the bucket it lives in (the node bucket or one of the quad indexes).
+type Token struct {
+	bucket []byte
+	key    []byte
+}
+
+func (t *Token) Key() interface{} { return fmt.Sprint(t.bucket, t.key) }
+
+// QuadStore is a graph.TripleStore persisted in a Bolt database.
+type QuadStore struct {
+	db      *bolt.DB
+	path    string
+	open    bool
+	size    int64
+	horizon int64
+}
+
+// createNewBolt initializes a Bolt database at path with all buckets created.
+func createNewBolt(path string, _ graph.Options) error {
+	db, err := bolt.Open(path, 0600, nil)
+	if err != nil {
+		glog.Errorf("Error: couldn't create Bolt database: %v", err)
+		return err
+	}
+	defer db.Close()
+	qs := &QuadStore{db: db}
+	err = qs.createBuckets()
+	if err != nil {
+		return err
+	}
+	qs.Close()
+	return nil
+}
+
+// newQuadStore opens the Bolt database at path and loads its metadata. The
+// "nosync" option disables syncing to disk on every transaction.
+func newQuadStore(path string, options graph.Options) (graph.TripleStore, error) {
+	var qs QuadStore
+	db, err := bolt.Open(path, 0600, nil)
+	if err != nil {
+		glog.Errorln("Error, couldn't open! ", err)
+		return nil, err
+	}
+	qs.db = db
+	// BoolKey returns false on non-existence. IE, Sync by default.
+	qs.db.NoSync, _ = options.BoolKey("nosync")
+	err = qs.getMetadata()
+	if err != nil {
+		db.Close() // don't leak the handle of a store we cannot use
+		return nil, err
+	}
+	return &qs, nil
+}
+
+// createBuckets creates the four index buckets and the log, node and meta ones.
+func (qs *QuadStore) createBuckets() error {
+	return qs.db.Update(func(tx *bolt.Tx) error {
+		names := [][]byte{spoBucket, ospBucket, posBucket, cpsBucket, logBucket, nodeBucket, metaBucket}
+		for _, name := range names {
+			if _, err := tx.CreateBucket(name); err != nil {
+				return fmt.Errorf("Couldn't create bucket: %s", err)
+			}
+		}
+		return nil
+	})
+}
+
+func (qs *QuadStore) Size() int64 { return qs.size }
+
+func (qs *QuadStore) Horizon() int64 { return qs.horizon }
+
+// createDeltaKeyFor returns the log bucket key for the delta with this ID.
+func (qs *QuadStore) createDeltaKeyFor(id int64) []byte {
+	return []byte(fmt.Sprintf("%018x", id))
+}
+
+// bucketFor names an index bucket after the prefixes of its four directions.
+func bucketFor(d [4]quad.Direction) []byte {
+	return []byte{d[0].Prefix(), d[1].Prefix(), d[2].Prefix(), d[3].Prefix()}
+}
+
+// createKeyFor concatenates the hashes of triple's directions in the order d.
+func (qs *QuadStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte {
+	key := make([]byte, 0, (hashSize * 4))
+	key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...)
+	key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...)
+	key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...)
+	key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...)
+	return key
+}
+
+// createValueKeyFor returns the node bucket key for s: its SHA-1 hash.
+func (qs *QuadStore) createValueKeyFor(s string) []byte {
+	return qs.convertStringToByteHash(s)
+}
+
+// IndexEntry is the value stored under every index key. History holds the
+// IDs of the deltas that added or deleted the quad, in the order applied.
+type IndexEntry struct {
+	History []int64
+}
+
+var (
+	// Short hand for direction permutations.
+	spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
+	osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}
+	pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}
+	cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
+
+	// Byte arrays for each bucket name.
+	spoBucket  = bucketFor(spo)
+	ospBucket  = bucketFor(osp)
+	posBucket  = bucketFor(pos)
+	cpsBucket  = bucketFor(cps)
+	logBucket  = []byte("log")
+	nodeBucket = []byte("node")
+	metaBucket = []byte("meta")
+)
+
+// ApplyDeltas logs and applies deltas in one transaction, updating node sizes,
+// size and horizon. On failure the in-memory size and horizon are restored.
+func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta) error {
+	oldSize := qs.size
+	oldHorizon := qs.horizon
+	err := qs.db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket(logBucket)
+		b.FillPercent = localFillPercent
+		resizeMap := make(map[string]int64)
+		sizeChange := int64(0)
+		for _, d := range deltas {
+			data, err := json.Marshal(d)
+			if err != nil {
+				return err
+			}
+			err = b.Put(qs.createDeltaKeyFor(d.ID), data)
+			if err != nil {
+				return err
+			}
+		}
+		for _, d := range deltas {
+			err := qs.buildQuadWrite(tx, d.Quad, d.ID, d.Action == graph.Add)
+			if err != nil {
+				return err
+			}
+			delta := int64(1)
+			if d.Action == graph.Delete {
+				delta = int64(-1)
+			}
+			resizeMap[d.Quad.Subject] += delta
+			resizeMap[d.Quad.Predicate] += delta
+			resizeMap[d.Quad.Object] += delta
+			if d.Quad.Label != "" {
+				resizeMap[d.Quad.Label] += delta
+			}
+			sizeChange += delta
+			qs.horizon = d.ID
+		}
+		for k, v := range resizeMap {
+			if v != 0 {
+				err := qs.UpdateValueKeyBy(k, v, tx)
+				if err != nil {
+					return err
+				}
+			}
+		}
+		qs.size += sizeChange
+		return qs.WriteHorizonAndSize(tx)
+	})
+
+	if err != nil {
+		glog.Error("Couldn't write to DB for Delta set. Error: ", err)
+		qs.horizon = oldHorizon
+		qs.size = oldSize
+		return err
+	}
+	return nil
+}
+
+// buildQuadWrite appends delta id to q's history and writes it to each index.
+func (qs *QuadStore) buildQuadWrite(tx *bolt.Tx, q quad.Quad, id int64, isAdd bool) error {
+	var entry IndexEntry
+	b := tx.Bucket(spoBucket)
+	b.FillPercent = localFillPercent
+	data := b.Get(qs.createKeyFor(spo, q))
+	if data != nil {
+		// We got something.
+		err := json.Unmarshal(data, &entry)
+		if err != nil {
+			return err
+		}
+	}
+
+	if isAdd && len(entry.History)%2 == 1 {
+		glog.Error("Adding a valid triple ", entry)
+		return graph.ErrQuadExists
+	}
+	if !isAdd && len(entry.History)%2 == 0 {
+		glog.Error("Deleting an invalid triple ", entry)
+		return graph.ErrQuadNotExist
+	}
+
+	entry.History = append(entry.History, id)
+
+	jsonbytes, err := json.Marshal(entry)
+	if err != nil {
+		glog.Errorf("Couldn't write to buffer for entry %#v: %s", entry, err)
+		return err
+	}
+	for _, index := range [][4]quad.Direction{spo, osp, pos, cps} {
+		if index == cps && q.Get(quad.Label) == "" {
+			continue
+		}
+		b := tx.Bucket(bucketFor(index))
+		b.FillPercent = localFillPercent
+		err = b.Put(qs.createKeyFor(index, q), jsonbytes)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ValueData is the node bucket record: a node's name and its quad count.
+type ValueData struct {
+	Name string
+	Size int64
+}
+
+// UpdateValueKeyBy adds amount to the stored size of the node called name,
+// creating the node if needed and never letting the size drop below zero.
+func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, tx *bolt.Tx) error {
+	value := ValueData{name, amount}
+	b := tx.Bucket(nodeBucket)
+	b.FillPercent = localFillPercent
+	key := qs.createValueKeyFor(name)
+	data := b.Get(key)
+	if data != nil {
+		// Node exists in the database -- unmarshal and update.
+		err := json.Unmarshal(data, &value)
+		if err != nil {
+			glog.Errorf("Error: couldn't reconstruct value: %v", err)
+			return err
+		}
+		value.Size += amount
+	}
+
+	// Are we deleting something?
+	if value.Size <= 0 {
+		value.Size = 0
+	}
+
+	// Repackage and rewrite.
+	encoded, err := json.Marshal(&value)
+	if err != nil {
+		glog.Errorf("Couldn't write to buffer for value %s: %s", name, err)
+		return err
+	}
+	return b.Put(key, encoded)
+}
+
+// WriteHorizonAndSize stores the in-memory size and horizon in the meta
+// bucket as little-endian int64 values, as part of the transaction tx.
+func (qs *QuadStore) WriteHorizonAndSize(tx *bolt.Tx) error {
+	sizeBuf := new(bytes.Buffer)
+	err := binary.Write(sizeBuf, binary.LittleEndian, qs.size)
+	if err != nil {
+		glog.Errorf("Couldn't convert size!")
+		return err
+	}
+	b := tx.Bucket(metaBucket)
+	b.FillPercent = localFillPercent
+	err = b.Put([]byte("size"), sizeBuf.Bytes())
+	if err != nil {
+		glog.Error("Couldn't write size!")
+		return err
+	}
+	// Bolt keeps a reference to the value until the transaction commits, so
+	// the horizon needs a buffer of its own rather than a reset sizeBuf.
+	horizonBuf := new(bytes.Buffer)
+	err = binary.Write(horizonBuf, binary.LittleEndian, qs.horizon)
+	if err != nil {
+		glog.Errorf("Couldn't convert horizon!")
+		return err
+	}
+	err = b.Put([]byte("horizon"), horizonBuf.Bytes())
+	if err != nil {
+		glog.Error("Couldn't write horizon!")
+		return err
+	}
+	return nil
+}
+
+// Close persists the size and horizon and then closes the database.
+func (qs *QuadStore) Close() {
+	if err := qs.db.Update(qs.WriteHorizonAndSize); err != nil {
+		glog.Error("Couldn't write horizon and size on close: ", err)
+	}
+	qs.db.Close()
+	qs.open = false
+}
+
+// Quad returns the quad behind the index token k, read from the most recent
+// delta in its history. It returns an empty quad if nothing is found.
+func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
+	var d graph.Delta
+	tok := k.(*Token)
+	err := qs.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(tok.bucket)
+		data := b.Get(tok.key)
+		if data == nil {
+			return nil
+		}
+		var in IndexEntry
+		err := json.Unmarshal(data, &in)
+		if err != nil {
+			return err
+		}
+		if len(in.History) == 0 {
+			return nil
+		}
+		b = tx.Bucket(logBucket)
+		data = b.Get(qs.createDeltaKeyFor(in.History[len(in.History)-1]))
+		if data == nil {
+			// No harm, no foul.
+			return nil
+		}
+		return json.Unmarshal(data, &d)
+	})
+	if err != nil {
+		glog.Error("Error getting triple: ", err)
+		return quad.Quad{}
+	}
+	return d.Quad
+}
+
+// convertStringToByteHash returns the SHA-1 hash of s.
+func (qs *QuadStore) convertStringToByteHash(s string) []byte {
+	h := hashPool.Get().(hash.Hash)
+	h.Reset()
+	defer hashPool.Put(h)
+	h.Write([]byte(s))
+	return h.Sum(make([]byte, 0, hashSize))
+}
+
+// ValueOf returns the node token for s, whether or not the node is stored.
+func (qs *QuadStore) ValueOf(s string) graph.Value {
+	return &Token{bucket: nodeBucket, key: qs.createValueKeyFor(s)}
+}
+
+// valueData reads the record stored for t; it is empty if absent or unreadable.
+func (qs *QuadStore) valueData(t *Token) ValueData {
+	var out ValueData
+	if glog.V(3) {
+		glog.V(3).Infof("%s %v", string(t.bucket), t.key)
+	}
+	err := qs.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(t.bucket)
+		data := b.Get(t.key)
+		if data != nil {
+			return json.Unmarshal(data, &out)
+		}
+		return nil
+	})
+	if err != nil {
+		glog.Errorln("Error: couldn't get value", err)
+		return ValueData{}
+	}
+	return out
+}
+
+// NameOf returns the name stored for the node token k, or "" if k is nil.
+func (qs *QuadStore) NameOf(k graph.Value) string {
+	if k == nil {
+		glog.V(2).Info("k was nil")
+		return ""
+	}
+	return qs.valueData(k.(*Token)).Name
+}
+
+// SizeOf returns the size stored for the node token k, or 0 if k is nil.
+func (qs *QuadStore) SizeOf(k graph.Value) int64 {
+	if k == nil {
+		return 0
+	}
+	return int64(qs.valueData(k.(*Token)).Size)
+}
+
+// getInt64ForKey reads a little-endian int64 from the meta bucket, returning
+// empty when the key is unset and an error when the bucket does not exist.
+func (qs *QuadStore) getInt64ForKey(tx *bolt.Tx, key string, empty int64) (int64, error) {
+	var out int64
+	b := tx.Bucket(metaBucket)
+	if b == nil {
+		return 0, fmt.Errorf("Bolt database has no %q bucket; was it initialized?", metaBucket)
+	}
+	data := b.Get([]byte(key))
+	if data == nil {
+		return empty, nil
+	}
+	buf := bytes.NewBuffer(data)
+	err := binary.Read(buf, binary.LittleEndian, &out)
+	if err != nil {
+		return 0, err
+	}
+	return out, nil
+}
+
+// getMetadata loads the persisted size and horizon into qs.
+func (qs *QuadStore) getMetadata() error {
+	return qs.db.View(func(tx *bolt.Tx) error {
+		var err error
+		qs.size, err = qs.getInt64ForKey(tx, "size", 0)
+		if err != nil {
+			return err
+		}
+		qs.horizon, err = qs.getInt64ForKey(tx, "horizon", 0)
+		return err
+	})
+}
+
+// TripleIterator iterates the quads having node val at direction d.
+func (qs *QuadStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
+	var bucket []byte
+	switch d {
+	case quad.Subject:
+		bucket = spoBucket
+	case quad.Predicate:
+		bucket = posBucket
+	case quad.Object:
+		bucket = ospBucket
+	case quad.Label:
+		bucket = cpsBucket
+	default:
+		panic("unreachable " + d.String())
+	}
+	return NewIterator(bucket, d, val, qs)
+}
+
+// NodesAllIterator iterates over every key of the node bucket.
+func (qs *QuadStore) NodesAllIterator() graph.Iterator { return NewAllIterator(nodeBucket, quad.Any, qs) }
+
+// TriplesAllIterator iterates over every key of the pos index bucket.
+func (qs *QuadStore) TriplesAllIterator() graph.Iterator { return NewAllIterator(posBucket, quad.Predicate, qs) }
+
+// TripleDirection returns the node token at direction d of the quad token val.
+// NOTE(review): PositionOf panics instead of returning -1 -- confirm fallback.
+func (qs *QuadStore) TripleDirection(val graph.Value, d quad.Direction) graph.Value {
+	v := val.(*Token)
+	offset := PositionOf(v, d, qs)
+	if offset != -1 {
+		return &Token{bucket: nodeBucket, key: v.key[offset : offset+hashSize]}
+	}
+	return qs.ValueOf(qs.Quad(v).Get(d))
+}
+
+func compareTokens(a, b graph.Value) bool {
+	atok := a.(*Token)
+	btok := b.(*Token)
+	return bytes.Equal(atok.key, btok.key) && bytes.Equal(atok.bucket, btok.bucket)
+}
+
+func (qs *QuadStore) FixedIterator() graph.FixedIterator { return iterator.NewFixedIteratorWithCompare(compareTokens) }
diff --git a/graph/bolt/quadstore_iterator_optimize.go b/graph/bolt/quadstore_iterator_optimize.go
new file mode 100644
index 0000000..e893020
--- /dev/null
+++ b/graph/bolt/quadstore_iterator_optimize.go
@@ -0,0 +1,55 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"github.com/google/cayley/graph"
+	"github.com/google/cayley/graph/iterator"
+)
+
+// OptimizeIterator returns a cheaper equivalent of it when the Bolt store
+// has one, and reports whether a replacement was made.
+func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
+	if it.Type() == graph.LinksTo {
+		return qs.optimizeLinksTo(it.(*iterator.LinksTo))
+	}
+	return it, false
+}
+
+// optimizeLinksTo replaces a LinksTo over a single-value Fixed iterator with
+// a direct index iterator for that value, carrying all tags over.
+func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) {
+	subs := it.SubIterators()
+	if len(subs) != 1 || subs[0].Type() != graph.Fixed {
+		return it, false
+	}
+	primary := subs[0]
+	if size, _ := primary.Size(); size != 1 {
+		return it, false
+	}
+	// Advance the Fixed iterator to read its only value.
+	if !graph.Next(primary) {
+		panic("unexpected size during optimize")
+	}
+	val := primary.Result()
+	newIt := qs.TripleIterator(it.Direction(), val)
+	nt := newIt.Tagger()
+	nt.CopyFrom(it)
+	for _, tag := range primary.Tagger().Tags() {
+		nt.AddFixed(tag, val)
+	}
+	it.Close()
+	return newIt, true
+}
diff --git a/graph/triplestore.go b/graph/triplestore.go
index df85124..741297c 100644
--- a/graph/triplestore.go
+++ b/graph/triplestore.go
@@ -121,6 +121,20 @@ func (d Options) StringKey(key string) (string, bool) {
 	return "", false
 }
 
+// BoolKey looks up key and returns its boolean value; the second result
+// reports whether the key was present. A non-bool value is fatal.
+func (d Options) BoolKey(key string) (bool, bool) {
+	val, ok := d[key]
+	if !ok {
+		return false, false
+	}
+	b, isBool := val.(bool)
+	if !isBool {
+		glog.Fatalln("Invalid", key, "parameter type from config.")
+	}
+	return b, isBool
+}
+
 var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
 
 type BulkLoader interface {