Merge pull request #128 from barakmich/boltdb

Add BoltDB backend
This commit is contained in:
Barak Michener 2014-08-23 18:11:28 -04:00
commit 89a03bfe8f
12 changed files with 1159 additions and 10 deletions

View file

@ -19,6 +19,7 @@ install:
- go get github.com/syndtr/goleveldb/leveldb/iterator
- go get github.com/syndtr/goleveldb/leveldb/opt
- go get github.com/syndtr/goleveldb/leveldb/util
- go get github.com/boltdb/bolt
- go get gopkg.in/mgo.v2
- go get gopkg.in/mgo.v2/bson

View file

@ -27,8 +27,9 @@ Its goal is to be a part of the developer's toolbox where [Linked Data](http://l
* JavaScript, with a [Gremlin](http://gremlindocs.com/)-inspired\* graph object.
* (simplified) [MQL](https://developers.google.com/freebase/v1/mql-overview), for Freebase fans
* Plays well with multiple backend stores:
* [LevelDB](http://code.google.com/p/leveldb/) for single-machine storage
* [MongoDB](http://mongodb.org)
* [LevelDB](http://code.google.com/p/leveldb/)
* [Bolt](http://github.com/boltdb/bolt)
* [MongoDB](http://mongodb.org) for distributed stores
* In-memory, ephemeral
* Modular design; easy to extend with new languages and backends
* Good test coverage

View file

@ -42,6 +42,7 @@ import (
"github.com/google/cayley/quad/nquads"
// Load all supported backends.
_ "github.com/google/cayley/graph/bolt"
_ "github.com/google/cayley/graph/leveldb"
_ "github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/mongo"

View file

@ -18,9 +18,11 @@ import (
"bytes"
"compress/bzip2"
"compress/gzip"
"flag"
"fmt"
"github.com/google/cayley/quad"
"io"
"os"
"reflect"
"sort"
"strings"
@ -34,6 +36,8 @@ import (
"github.com/google/cayley/query/gremlin"
)
var backend = flag.String("backend", "memstore", "Which backend to test. Loads test data to /tmp if not present.")
var benchmarkQueries = []struct {
message string
long bool
@ -379,15 +383,42 @@ var (
)
func prepare(t testing.TB) {
switch *backend {
case "memstore":
break
case "leveldb":
fallthrough
case "bolt":
cfg.DatabaseType = *backend
cfg.DatabasePath = fmt.Sprint("/tmp/cayley_test_", *backend)
cfg.DatabaseOptions = map[string]interface{}{
"nosync": true, // It's a test. If we need to load, do it fast.
}
default:
t.Fatalf("Untestable backend store %s", *backend)
}
var err error
create.Do(func() {
needsLoad := true
if graph.IsPersistent(cfg.DatabaseType) {
if _, err := os.Stat(cfg.DatabasePath); os.IsNotExist(err) {
err = db.Init(cfg)
if err != nil {
t.Fatalf("Could not initialize database: %v", err)
}
} else {
needsLoad = false
}
}
handle, err = db.Open(cfg)
if err != nil {
t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err)
}
if !graph.IsPersistent(cfg.DatabaseType) {
err = load(handle.QuadWriter, cfg, "", "cquad")
if needsLoad {
err = load(handle.QuadWriter, cfg, "30kmoviedata.nq.gz", "cquad")
if err != nil {
t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err)
}

View file

@ -70,6 +70,7 @@ func OpenQuadWriter(qs graph.TripleStore, cfg *config.Config) (graph.QuadWriter,
func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error {
block := make([]quad.Quad, 0, cfg.LoadSize)
count := 0
for {
t, err := dec.Unmarshal()
if err != nil {
@ -80,11 +81,19 @@ func Load(qw graph.QuadWriter, cfg *config.Config, dec quad.Unmarshaler) error {
}
block = append(block, t)
if len(block) == cap(block) {
count += len(block)
qw.AddQuadSet(block)
if glog.V(2) {
glog.V(2).Infof("Wrote %d quads.", count)
}
block = block[:0]
}
}
count += len(block)
qw.AddQuadSet(block)
if glog.V(2) {
glog.V(2).Infof("Wrote %d quads.", count)
}
return nil
}

View file

@ -23,7 +23,8 @@ All command line flags take precedence over the configuration file.
* `mem`: An in-memory store, based on an initial N-Quads file. Loses all changes when the process exits.
* `leveldb`: A persistent on-disk store backed by [LevelDB](http://code.google.com/p/leveldb/).
* `mongodb`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store.
* `bolt`: Stores the graph data on-disk in a [Bolt](http://github.com/boltdb/bolt) file. Uses more disk space and memory than LevelDB for smaller stores, but is often faster to write to and comparable for large ones, with faster average query times.
* `mongo`: Stores the graph data and indices in a [MongoDB](http://mongodb.org) instance. Slower, as it incurs network traffic, but multiple Cayley instances can disappear and reconnect at will, across a potentially horizontally-scaled store.
#### **`db_path`**
@ -32,9 +33,10 @@ All command line flags take precedence over the configuration file.
Where does the database actually live? Dependent on the type of database. For each datastore:
* `mem`: Path to a triple file to automatically load
* `leveldb`: Directory to hold the LevelDB database files
* `mongodb`: "hostname:port" of the desired MongoDB server.
* `mem`: Path to a triple file to automatically load.
* `leveldb`: Directory to hold the LevelDB database files.
* `bolt`: Path to the persistent single Bolt database file.
* `mongo`: "hostname:port" of the desired MongoDB server.
#### **`listen_host`**
@ -103,8 +105,16 @@ The size in MiB of the LevelDB write cache. Increasing this number allows for mo
The size in MiB of the LevelDB block cache. Increasing this number uses more memory to maintain a bigger cache of triple blocks for better performance.
### Bolt
### MongoDB
#### **`nosync`**
* Type: Boolean
* Default: false
Optionally disable syncing to disk per transaction. Nosync being true means much faster load times, but without consistency guarantees.
### Mongo
#### **`database_name`**

View file

@ -17,7 +17,8 @@ You can set up a full [configuration file](/docs/Configuration) if you'd prefer,
Examples for each backend:
* `leveldb`: `./cayley init --db=leveldb --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the path you'd like to store your data.
* `mongodb`: `./cayley init --db=mongodb --dbpath="<HOSTNAME>:<PORT>"` -- where HOSTNAME and PORT point to your Mongo instance.
* `bolt`: `./cayley init --db=bolt --dbpath=/tmp/moviedb` -- where /tmp/moviedb is the filename where you'd like to store your data.
* `mongo`: `./cayley init --db=mongo --dbpath="<HOSTNAME>:<PORT>"` -- where HOSTNAME and PORT point to your Mongo instance.
Those two options (db and dbpath) are always going to be present. If you feel like not repeating yourself, setting up a configuration file for your backend might be something to do now. There's an example file, `cayley.cfg.example` in the root directory.

201
graph/bolt/all_iterator.go Normal file
View file

@ -0,0 +1,201 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"fmt"
"strings"
"github.com/barakmich/glog"
"github.com/boltdb/bolt"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
type AllIterator struct {
uid uint64
tags graph.Tagger
bucket []byte
dir quad.Direction
qs *QuadStore
result *Token
buffer [][]byte
offset int
done bool
}
func NewAllIterator(bucket []byte, d quad.Direction, qs *QuadStore) *AllIterator {
return &AllIterator{
uid: iterator.NextUID(),
bucket: bucket,
dir: d,
qs: qs,
}
}
func (it *AllIterator) UID() uint64 {
return it.uid
}
func (it *AllIterator) Reset() {
it.buffer = nil
it.offset = 0
it.done = false
}
func (it *AllIterator) Tagger() *graph.Tagger {
return &it.tags
}
func (it *AllIterator) TagResults(dst map[string]graph.Value) {
for _, tag := range it.tags.Tags() {
dst[tag] = it.Result()
}
for tag, value := range it.tags.Fixed() {
dst[tag] = value
}
}
func (it *AllIterator) Clone() graph.Iterator {
out := NewAllIterator(it.bucket, it.dir, it.qs)
out.tags.CopyFrom(it)
return out
}
func (it *AllIterator) Next() bool {
if it.done {
return false
}
if len(it.buffer) <= it.offset+1 {
it.offset = 0
var last []byte
if it.buffer != nil {
last = it.buffer[len(it.buffer)-1]
}
it.buffer = make([][]byte, 0, bufferSize)
err := it.qs.db.View(func(tx *bolt.Tx) error {
i := 0
b := tx.Bucket(it.bucket)
cur := b.Cursor()
if last == nil {
k, _ := cur.First()
var out []byte
out = make([]byte, len(k))
copy(out, k)
it.buffer = append(it.buffer, out)
i++
} else {
k, _ := cur.Seek(last)
if !bytes.Equal(k, last) {
return fmt.Errorf("Couldn't pick up after", k)
}
}
for i < bufferSize {
k, _ := cur.Next()
if k == nil {
it.buffer = append(it.buffer, k)
break
}
var out []byte
out = make([]byte, len(k))
copy(out, k)
it.buffer = append(it.buffer, out)
i++
}
return nil
})
if err != nil {
glog.Error("Error nexting in database: ", err)
it.done = true
return false
}
} else {
it.offset++
}
if it.Result() == nil {
it.done = true
return false
}
return true
}
func (it *AllIterator) ResultTree() *graph.ResultTree {
return graph.NewResultTree(it.Result())
}
func (it *AllIterator) Result() graph.Value {
if it.done {
return nil
}
if it.result != nil {
return it.result
}
if it.offset >= len(it.buffer) {
return nil
}
if it.buffer[it.offset] == nil {
return nil
}
return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
}
func (it *AllIterator) NextPath() bool {
return false
}
// No subiterators.
func (it *AllIterator) SubIterators() []graph.Iterator {
return nil
}
func (it *AllIterator) Contains(v graph.Value) bool {
it.result = v.(*Token)
return true
}
func (it *AllIterator) Close() {
it.result = nil
it.buffer = nil
it.done = true
}
func (it *AllIterator) Size() (int64, bool) {
return it.qs.size, true
}
func (it *AllIterator) DebugString(indent int) string {
size, _ := it.Size()
return fmt.Sprintf("%s(%s tags: %v bolt size:%d %s %p)", strings.Repeat(" ", indent), it.Type(), it.tags.Tags(), size, it.dir, it)
}
func (it *AllIterator) Type() graph.Type { return graph.All }
func (it *AllIterator) Sorted() bool { return false }
func (it *AllIterator) Optimize() (graph.Iterator, bool) {
return it, false
}
func (it *AllIterator) Stats() graph.IteratorStats {
s, _ := it.Size()
return graph.IteratorStats{
ContainsCost: 1,
NextCost: 2,
Size: s,
}
}

320
graph/bolt/iterator.go Normal file
View file

@ -0,0 +1,320 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/barakmich/glog"
"github.com/boltdb/bolt"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
var (
boltType graph.Type
bufferSize = 50
errNotExist = errors.New("Quad does not exist")
)
func init() {
boltType = graph.RegisterIterator("bolt")
}
type Iterator struct {
uid uint64
tags graph.Tagger
bucket []byte
checkId []byte
dir quad.Direction
qs *QuadStore
result *Token
buffer [][]byte
offset int
done bool
size int64
}
func NewIterator(bucket []byte, d quad.Direction, value graph.Value, qs *QuadStore) *Iterator {
tok := value.(*Token)
if !bytes.Equal(tok.bucket, nodeBucket) {
glog.Error("Creating an iterator from a non-node value.")
return &Iterator{done: true}
}
it := Iterator{
uid: iterator.NextUID(),
bucket: bucket,
dir: d,
qs: qs,
size: qs.SizeOf(value),
}
it.checkId = make([]byte, len(tok.key))
copy(it.checkId, tok.key)
return &it
}
func Type() graph.Type { return boltType }
func (it *Iterator) UID() uint64 {
return it.uid
}
func (it *Iterator) Reset() {
it.buffer = nil
it.offset = 0
it.done = false
}
func (it *Iterator) Tagger() *graph.Tagger {
return &it.tags
}
func (it *Iterator) TagResults(dst map[string]graph.Value) {
for _, tag := range it.tags.Tags() {
dst[tag] = it.Result()
}
for tag, value := range it.tags.Fixed() {
dst[tag] = value
}
}
func (it *Iterator) Clone() graph.Iterator {
out := NewIterator(it.bucket, it.dir, &Token{nodeBucket, it.checkId}, it.qs)
out.Tagger().CopyFrom(it)
return out
}
func (it *Iterator) Close() {
it.result = nil
it.buffer = nil
it.done = true
}
func (it *Iterator) isLiveValue(val []byte) bool {
var entry IndexEntry
json.Unmarshal(val, &entry)
return len(entry.History)%2 != 0
}
func (it *Iterator) Next() bool {
if it.done {
return false
}
if len(it.buffer) <= it.offset+1 {
it.offset = 0
var last []byte
if it.buffer != nil {
last = it.buffer[len(it.buffer)-1]
}
it.buffer = make([][]byte, 0, bufferSize)
err := it.qs.db.View(func(tx *bolt.Tx) error {
i := 0
b := tx.Bucket(it.bucket)
cur := b.Cursor()
if last == nil {
k, _ := cur.Seek(it.checkId)
if bytes.HasPrefix(k, it.checkId) {
var out []byte
out = make([]byte, len(k))
copy(out, k)
it.buffer = append(it.buffer, out)
i++
} else {
it.buffer = append(it.buffer, nil)
return errNotExist
}
} else {
k, _ := cur.Seek(last)
if !bytes.Equal(k, last) {
return fmt.Errorf("Couldn't pick up after", k)
}
}
for i < bufferSize {
k, v := cur.Next()
if k == nil || !bytes.HasPrefix(k, it.checkId) {
it.buffer = append(it.buffer, nil)
break
}
if !it.isLiveValue(v) {
continue
}
var out []byte
out = make([]byte, len(k))
copy(out, k)
it.buffer = append(it.buffer, out)
i++
}
return nil
})
if err != nil {
if err != errNotExist {
glog.Error("Error nexting in database: ", err)
}
it.done = true
return false
}
} else {
it.offset++
}
if it.Result() == nil {
it.done = true
return false
}
return true
}
func (it *Iterator) ResultTree() *graph.ResultTree {
return graph.NewResultTree(it.Result())
}
func (it *Iterator) Result() graph.Value {
if it.done {
return nil
}
if it.result != nil {
return it.result
}
if it.offset >= len(it.buffer) {
return nil
}
if it.buffer[it.offset] == nil {
return nil
}
return &Token{bucket: it.bucket, key: it.buffer[it.offset]}
}
func (it *Iterator) NextPath() bool {
return false
}
// No subiterators.
func (it *Iterator) SubIterators() []graph.Iterator {
return nil
}
func PositionOf(tok *Token, d quad.Direction, qs *QuadStore) int {
if bytes.Equal(tok.bucket, spoBucket) {
switch d {
case quad.Subject:
return 0
case quad.Predicate:
return hashSize
case quad.Object:
return 2 * hashSize
case quad.Label:
return 3 * hashSize
}
}
if bytes.Equal(tok.bucket, posBucket) {
switch d {
case quad.Subject:
return 2 * hashSize
case quad.Predicate:
return 0
case quad.Object:
return hashSize
case quad.Label:
return 3 * hashSize
}
}
if bytes.Equal(tok.bucket, ospBucket) {
switch d {
case quad.Subject:
return hashSize
case quad.Predicate:
return 2 * hashSize
case quad.Object:
return 0
case quad.Label:
return 3 * hashSize
}
}
if bytes.Equal(tok.bucket, cpsBucket) {
switch d {
case quad.Subject:
return 2 * hashSize
case quad.Predicate:
return hashSize
case quad.Object:
return 3 * hashSize
case quad.Label:
return 0
}
}
panic("unreachable")
}
func (it *Iterator) Contains(v graph.Value) bool {
val := v.(*Token)
if bytes.Equal(val.bucket, nodeBucket) {
return false
}
offset := PositionOf(val, it.dir, it.qs)
if bytes.HasPrefix(val.key[offset:], it.checkId) {
// You may ask, why don't we check to see if it's a valid (not deleted) triple
// again?
//
// We've already done that -- in order to get the graph.Value token in the
// first place, we had to have done the check already; it came from a Next().
//
// However, if it ever starts coming from somewhere else, it'll be more
// efficient to change the interface of the graph.Value for LevelDB to a
// struct with a flag for isValid, to save another random read.
return true
}
return false
}
func (it *Iterator) Size() (int64, bool) {
return it.size, true
}
func (it *Iterator) DebugString(indent int) string {
return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)",
strings.Repeat(" ", indent),
it.Type(),
it.UID(),
it.tags.Tags(),
it.dir,
it.size,
it.qs.NameOf(&Token{it.bucket, it.checkId}),
)
}
func (it *Iterator) Type() graph.Type { return boltType }
func (it *Iterator) Sorted() bool { return false }
func (it *Iterator) Optimize() (graph.Iterator, bool) {
return it, false
}
func (it *Iterator) Stats() graph.IteratorStats {
s, _ := it.Size()
return graph.IteratorStats{
ContainsCost: 1,
NextCost: 4,
Size: s,
}
}

507
graph/bolt/quadstore.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,55 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
)
func (ts *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
switch it.Type() {
case graph.LinksTo:
return ts.optimizeLinksTo(it.(*iterator.LinksTo))
}
return it, false
}
func (ts *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) {
subs := it.SubIterators()
if len(subs) != 1 {
return it, false
}
primary := subs[0]
if primary.Type() == graph.Fixed {
size, _ := primary.Size()
if size == 1 {
if !graph.Next(primary) {
panic("unexpected size during optimize")
}
val := primary.Result()
newIt := ts.TripleIterator(it.Direction(), val)
nt := newIt.Tagger()
nt.CopyFrom(it)
for _, tag := range primary.Tagger().Tags() {
nt.AddFixed(tag, val)
}
it.Close()
return newIt, true
}
}
return it, false
}

View file

@ -121,6 +121,18 @@ func (d Options) StringKey(key string) (string, bool) {
return "", false
}
func (d Options) BoolKey(key string) (bool, bool) {
if val, ok := d[key]; ok {
switch vv := val.(type) {
case bool:
return vv, true
default:
glog.Fatalln("Invalid", key, "parameter type from config.")
}
}
return false, false
}
var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
type BulkLoader interface {