Merge pull request #69 from pbnjay/triplestore_registry

Triplestore registry
This commit is contained in:
Barak Michener 2014-07-18 15:01:04 -04:00
commit abdd649c82
21 changed files with 183 additions and 116 deletions

View file

@ -11,6 +11,7 @@
Alexander Peters <info@alexanderpeters.de>
Google Inc.
Jay Graves <jaywgraves@gmail.com>
Jeremy Jay <jeremy@pbnjay.com>
Pius Uzamere <pius+github@alum.mit.edu>
Robert Daniel Kortschak <dan.kortschak@adelaide.edu.au>
Timothy Armstrong <armstrong.timothy@gmail.com>

View file

@ -14,6 +14,7 @@
Alexander Peters <info@alexanderpeters.de>
Barak Michener <barakmich@google.com> <barak@cayley.io> <me@barakmich.com>
Jay Graves <jaywgraves@gmail.com>
Jeremy Jay <jeremy@pbnjay.com>
Pius Uzamere <pius+github@alum.mit.edu>
Robert Daniel Kortschak <dan.kortschak@adelaide.edu.au>
Timothy Armstrong <armstrong.timothy@gmail.com>

View file

@ -21,15 +21,16 @@ import (
"github.com/google/cayley/config"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/memstore"
"github.com/google/cayley/http"
"github.com/google/cayley/nquads"
_ "github.com/google/cayley/graph/memstore"
)
func init() {
glog.SetToStderr(true)
cfg := config.ParseConfigFromFile("cayley_appengine.cfg")
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
glog.Errorln(cfg)
LoadTriplesFromFileInto(ts, cfg.DatabasePath, cfg.LoadSize)
http.SetupRoutes(ts, cfg)

View file

@ -26,6 +26,11 @@ import (
"github.com/google/cayley/db"
"github.com/google/cayley/graph"
"github.com/google/cayley/http"
// Load all supported backends.
_ "github.com/google/cayley/graph/leveldb"
_ "github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/mongo"
)
var tripleFile = flag.String("triples", "", "Triple File to load before going to REPL.")
@ -72,7 +77,7 @@ func main() {
db.Init(cfg, *tripleFile)
case "load":
ts = db.Open(cfg)
db.Load(ts, cfg, *tripleFile, false)
db.Load(ts, cfg, *tripleFile)
ts.Close()
case "repl":
ts = db.Open(cfg)

View file

@ -34,7 +34,7 @@ type Config struct {
}
var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.")
var databaseBackend = flag.String("db", "mem", "Database Backend.")
var databaseBackend = flag.String("db", "memstore", "Database Backend.")
var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).")
var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load")
var port = flag.String("port", "64210", "Port to listen on.")

View file

@ -16,25 +16,19 @@ package db
import (
"github.com/google/cayley/config"
"github.com/google/cayley/graph/leveldb"
"github.com/google/cayley/graph/mongo"
"github.com/google/cayley/graph"
)
func Init(cfg *config.Config, triplePath string) bool {
created := false
dbpath := cfg.DatabasePath
switch cfg.DatabaseType {
case "mongo", "mongodb":
created = mongo.CreateNewMongoGraph(dbpath, cfg.DatabaseOptions)
case "leveldb":
created = leveldb.CreateNewLevelDB(dbpath)
case "mem":
return true
err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
return false
}
if created && triplePath != "" {
if triplePath != "" {
ts := Open(cfg)
Load(ts, cfg, triplePath, true)
Load(ts, cfg, triplePath)
ts.Close()
}
return created
return true
}

View file

@ -21,30 +21,25 @@ import (
"github.com/google/cayley/config"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/mongo"
"github.com/google/cayley/nquads"
)
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string, firstTime bool) {
switch cfg.DatabaseType {
case "mongo", "mongodb":
if firstTime {
loadMongo(ts.(*mongo.TripleStore), triplePath)
} else {
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, triplePath)
bulker, canBulk := ts.(graph.BulkLoader)
if canBulk {
err := bulker.BulkLoad(tChan)
if err == nil {
return
}
if err != graph.ErrCannotBulkLoad {
glog.Errorln("Error attempting to bulk load: ", err)
}
case "leveldb":
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
case "mem":
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
}
}
func loadMongo(ts *mongo.TripleStore, path string) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, path)
ts.BulkLoad(tChan)
LoadTriplesInto(tChan, ts, cfg.LoadSize)
}
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
@ -62,9 +57,7 @@ func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
nquads.ReadNQuadsFromReader(c, f)
}
func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, filename)
func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) {
tripleblock := make([]*graph.Triple, loadSize)
i := 0
for t := range tChan {

View file

@ -19,22 +19,19 @@ import (
"github.com/google/cayley/config"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/leveldb"
"github.com/google/cayley/graph/memstore"
"github.com/google/cayley/graph/mongo"
)
func Open(cfg *config.Config) graph.TripleStore {
glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath)
switch cfg.DatabaseType {
case "mongo", "mongodb":
return mongo.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions)
case "leveldb":
return leveldb.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions)
case "mem":
ts := memstore.NewTripleStore()
Load(ts, cfg, cfg.DatabasePath, true)
return ts
ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
glog.Fatalln(err.Error())
}
panic("Unsupported database backend " + cfg.DatabaseType)
// Memstore is not persistent, so it MUST be loaded.
if cfg.DatabaseType == "memstore" {
Load(ts, cfg, cfg.DatabasePath)
}
return ts
}

View file

@ -167,12 +167,12 @@ var (
}
)
// Register adds a new iterator type to the set of acceptable types, returning
// RegisterIterator adds a new iterator type to the set of acceptable types, returning
// the registered Type.
// Calls to Register are idempotent and must be made prior to use of the iterator.
// The conventional approach for use is to include a call to Register in a package
// init() function, saving the Type to a private package var.
func Register(name string) Type {
func RegisterIterator(name string) Type {
lock.Lock()
defer lock.Unlock()
for i, t := range types {

View file

@ -198,7 +198,7 @@ func (it *Iterator) DebugString(indent int) string {
var levelDBType graph.Type
func init() {
levelDBType = graph.Register("leveldb")
levelDBType = graph.RegisterIterator("leveldb")
}
func Type() graph.Type { return levelDBType }

View file

@ -102,12 +102,13 @@ func TestCreateDatabase(t *testing.T) {
}
t.Log(tmpDir)
if created := CreateNewLevelDB(tmpDir); !created {
err = createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatal("Failed to create LevelDB database.")
}
ts := NewTripleStore(tmpDir, nil)
if ts == nil {
ts, err := newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
if s := ts.Size(); s != 0 {
@ -115,21 +116,10 @@ func TestCreateDatabase(t *testing.T) {
}
ts.Close()
if created := CreateNewLevelDB("/dev/null/some terrible path"); created {
err = createNewLevelDB("/dev/null/some terrible path", nil)
if err == nil {
t.Errorf("Created LevelDB database for bad path.")
}
// TODO(kortschak) Invalidate this test by using error returns rather than panics.
var panicked bool
func() {
defer func() {
r := recover()
panicked = r != nil
}()
NewTripleStore("/dev/null/some terrible path", nil)
}()
if !panicked {
t.Error("NewTripleStore failed to panic with bad path.")
}
os.RemoveAll(tmpDir)
}
@ -142,14 +132,13 @@ func TestLoadDatabase(t *testing.T) {
defer os.RemoveAll(tmpDir)
t.Log(tmpDir)
if created := CreateNewLevelDB(tmpDir); !created {
err = createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatal("Failed to create LevelDB database.")
}
var ts *TripleStore
ts = NewTripleStore(tmpDir, nil)
if ts == nil {
ts, err := newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
@ -164,19 +153,25 @@ func TestLoadDatabase(t *testing.T) {
}
ts.Close()
if created := CreateNewLevelDB(tmpDir); !created {
err = createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatal("Failed to create LevelDB database.")
}
ts = NewTripleStore(tmpDir, nil)
if ts == nil {
ts, err = newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
ts2, didConvert := ts.(*TripleStore)
if !didConvert {
t.Errorf("Could not convert from generic to LevelDB TripleStore")
}
ts.AddTripleSet(makeTripleSet())
if s := ts.Size(); s != 11 {
t.Errorf("Unexpected triplestore size, got:%d expect:11", s)
}
if s := ts.SizeOf(ts.ValueOf("B")); s != 5 {
if s := ts2.SizeOf(ts.ValueOf("B")); s != 5 {
t.Errorf("Unexpected triplestore size, got:%d expect:5", s)
}
@ -184,7 +179,7 @@ func TestLoadDatabase(t *testing.T) {
if s := ts.Size(); s != 10 {
t.Errorf("Unexpected triplestore size after RemoveTriple, got:%d expect:10", s)
}
if s := ts.SizeOf(ts.ValueOf("B")); s != 4 {
if s := ts2.SizeOf(ts.ValueOf("B")); s != 4 {
t.Errorf("Unexpected triplestore size, got:%d expect:4", s)
}
@ -199,12 +194,15 @@ func TestIterator(t *testing.T) {
defer os.RemoveAll(tmpDir)
t.Log(tmpDir)
if created := CreateNewLevelDB(tmpDir); !created {
err = createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatal("Failed to create LevelDB database.")
}
var ts *TripleStore
ts = NewTripleStore(tmpDir, nil)
ts, err := newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
ts.AddTripleSet(makeTripleSet())
var it graph.Iterator
@ -289,12 +287,15 @@ func TestSetIterator(t *testing.T) {
tmpDir, _ := ioutil.TempDir(os.TempDir(), "cayley_test")
t.Log(tmpDir)
defer os.RemoveAll(tmpDir)
ok := CreateNewLevelDB(tmpDir)
if !ok {
err := createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatalf("Failed to create working directory")
}
ts := NewTripleStore(tmpDir, nil)
ts, err := newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
defer ts.Close()
ts.AddTripleSet(makeTripleSet())
@ -401,11 +402,14 @@ func TestOptimize(t *testing.T) {
tmpDir, _ := ioutil.TempDir(os.TempDir(), "cayley_test")
t.Log(tmpDir)
defer os.RemoveAll(tmpDir)
ok := CreateNewLevelDB(tmpDir)
if !ok {
err := createNewLevelDB(tmpDir, nil)
if err != nil {
t.Fatalf("Failed to create working directory")
}
ts := NewTripleStore(tmpDir, nil)
ts, err := newTripleStore(tmpDir, nil)
if ts == nil || err != nil {
t.Error("Failed to create leveldb TripleStore.")
}
ts.AddTripleSet(makeTripleSet())
// With an linksto-fixed pair

View file

@ -48,12 +48,12 @@ type TripleStore struct {
readopts *opt.ReadOptions
}
func CreateNewLevelDB(path string) bool {
func createNewLevelDB(path string, _ graph.Options) error {
opts := &opt.Options{}
db, err := leveldb.OpenFile(path, opts)
if err != nil {
glog.Errorln("Error: couldn't create database: ", err)
return false
return err
}
defer db.Close()
ts := &TripleStore{}
@ -62,10 +62,10 @@ func CreateNewLevelDB(path string) bool {
Sync: true,
}
ts.Close()
return true
return nil
}
func NewTripleStore(path string, options graph.Options) *TripleStore {
func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) {
var ts TripleStore
ts.path = path
cache_size := DefaultCacheSize
@ -94,7 +94,7 @@ func NewTripleStore(path string, options graph.Options) *TripleStore {
ts.db = db
glog.Infoln(ts.GetStats())
ts.getSize()
return &ts
return &ts, nil
}
func (ts *TripleStore) GetStats() string {
@ -443,3 +443,7 @@ func compareBytes(a, b graph.Value) bool {
func (ts *TripleStore) FixedIterator() graph.FixedIterator {
return iterator.NewFixedIteratorWithCompare(compareBytes)
}
func init() {
graph.RegisterTripleStore("leveldb", newTripleStore, createNewLevelDB)
}

View file

@ -104,7 +104,7 @@ func (it *Iterator) DebugString(indent int) string {
var memType graph.Type
func init() {
memType = graph.Register("llrb")
memType = graph.RegisterIterator("llrb")
}
func Type() graph.Type { return memType }

View file

@ -79,7 +79,7 @@ type TripleStore struct {
// vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree
}
func NewTripleStore() *TripleStore {
func newTripleStore() *TripleStore {
var ts TripleStore
ts.idMap = make(map[string]int64)
ts.revIdMap = make(map[int64]string)
@ -268,3 +268,9 @@ func (ts *TripleStore) NodesAllIterator() graph.Iterator {
return NewMemstoreAllIterator(ts)
}
func (ts *TripleStore) Close() {}
func init() {
graph.RegisterTripleStore("memstore", func(string, graph.Options) (graph.TripleStore, error) {
return newTripleStore(), nil
}, nil)
}

View file

@ -52,7 +52,7 @@ var simpleGraph = []*graph.Triple{
func makeTestStore(data []*graph.Triple) (*TripleStore, []pair) {
seen := make(map[string]struct{})
ts := NewTripleStore()
ts := newTripleStore()
var (
val int64
ind []pair

View file

@ -160,7 +160,7 @@ func (it *Iterator) Size() (int64, bool) {
var mongoType graph.Type
func init() {
mongoType = graph.Register("mongo")
mongoType = graph.RegisterIterator("mongo")
}
func Type() graph.Type { return mongoType }

View file

@ -37,11 +37,10 @@ type TripleStore struct {
idCache *IDLru
}
func CreateNewMongoGraph(addr string, options graph.Options) bool {
func createNewMongoGraph(addr string, options graph.Options) error {
conn, err := mgo.Dial(addr)
if err != nil {
glog.Fatal("Error connecting: ", err)
return false
return err
}
conn.SetSafe(&mgo.Safe{})
dbName := DefaultDBName
@ -63,14 +62,14 @@ func CreateNewMongoGraph(addr string, options graph.Options) bool {
db.C("triples").EnsureIndex(indexOpts)
indexOpts.Key = []string{"Provenance"}
db.C("triples").EnsureIndex(indexOpts)
return true
return nil
}
func NewTripleStore(addr string, options graph.Options) *TripleStore {
func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) {
var ts TripleStore
conn, err := mgo.Dial(addr)
if err != nil {
glog.Fatal("Error connecting: ", err)
return nil, err
}
conn.SetSafe(&mgo.Safe{})
dbName := DefaultDBName
@ -81,7 +80,7 @@ func NewTripleStore(addr string, options graph.Options) *TripleStore {
ts.session = conn
ts.hasher = sha1.New()
ts.idCache = NewIDLru(1 << 16)
return &ts
return &ts, nil
}
func (ts *TripleStore) getIdForTriple(t *graph.Triple) string {
@ -291,7 +290,11 @@ func (ts *TripleStore) TripleDirection(in graph.Value, d graph.Direction) graph.
return val
}
func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) {
func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) bool {
if ts.Size() != 0 {
return false
}
ts.session.SetSafe(nil)
for triple := range t_chan {
ts.writeTriple(triple)
@ -334,4 +337,9 @@ func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) {
}) }`}, {"args", bson.D{}}}, nil)
ts.session.SetSafe(&mgo.Safe{})
return true
}
func init() {
graph.RegisterTripleStore("mongo", newTripleStore, createNewMongoGraph)
}

View file

@ -18,7 +18,7 @@ import (
"testing"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/memstore"
)
func TestBadParse(t *testing.T) {
@ -52,7 +52,7 @@ var testQueries = []struct {
}
func TestMemstoreBackedSexp(t *testing.T) {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
it := BuildIteratorTreeForQuery(ts, "()")
if it.Type() != graph.Null {
t.Errorf(`Incorrect type for empty query, got:%q expect: "null"`, it.Type())
@ -76,7 +76,7 @@ func TestMemstoreBackedSexp(t *testing.T) {
}
func TestTreeConstraintParse(t *testing.T) {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
ts.AddTriple(&graph.Triple{"i", "like", "food", ""})
ts.AddTriple(&graph.Triple{"food", "is", "good", ""})
query := "(\"i\"\n" +
@ -96,7 +96,7 @@ func TestTreeConstraintParse(t *testing.T) {
}
func TestTreeConstraintTagParse(t *testing.T) {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
ts.AddTriple(&graph.Triple{"i", "like", "food", ""})
ts.AddTriple(&graph.Triple{"food", "is", "good", ""})
query := "(\"i\"\n" +
@ -116,7 +116,7 @@ func TestTreeConstraintTagParse(t *testing.T) {
}
func TestMultipleConstraintParse(t *testing.T) {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
for _, tv := range []*graph.Triple{
{"i", "like", "food", ""},
{"i", "like", "beer", ""},

View file

@ -22,6 +22,7 @@ package graph
// triple backing store we prefer.
import (
"errors"
"github.com/barakmich/glog"
)
@ -117,3 +118,55 @@ func (d Options) StringKey(key string) (string, bool) {
}
return "", false
}
var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
type BulkLoader interface {
// BulkLoad loads Triples from a channel in bulk to the TripleStore. It
// returns ErrCannotBulkLoad if bulk loading is not possible (i.e. if you
// cannot load in bulk to a non-empty database, and the db is non-empty)
BulkLoad(chan *Triple) error
}
type NewStoreFunc func(string, Options) (TripleStore, error)
type InitStoreFunc func(string, Options) error
var storeRegistry = make(map[string]NewStoreFunc)
var storeInitRegistry = make(map[string]InitStoreFunc)
func RegisterTripleStore(name string, newFunc NewStoreFunc, initFunc InitStoreFunc) {
if _, found := storeRegistry[name]; found {
panic("already registered TripleStore " + name)
}
storeRegistry[name] = newFunc
if initFunc != nil {
storeInitRegistry[name] = initFunc
}
}
func NewTripleStore(name, dbpath string, opts Options) (TripleStore, error) {
newFunc, hasNew := storeRegistry[name]
if !hasNew {
return nil, errors.New("triplestore: name '" + name + "' is not registered")
}
return newFunc(dbpath, opts)
}
func InitTripleStore(name, dbpath string, opts Options) error {
initFunc, hasInit := storeInitRegistry[name]
if hasInit {
return initFunc(dbpath, opts)
}
if _, isRegistered := storeRegistry[name]; isRegistered {
return nil
}
return errors.New("triplestore: name '" + name + "' is not registered")
}
func TripleStores() []string {
t := make([]string, 0, len(storeRegistry))
for n := range storeRegistry {
t = append(t, n)
}
return t
}

View file

@ -20,7 +20,7 @@ import (
"testing"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/memstore"
)
// This is a simple test graph.
@ -51,7 +51,7 @@ var simpleGraph = []*graph.Triple{
}
func makeTestSession(data []*graph.Triple) *Session {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
for _, t := range data {
ts.AddTriple(t)
}

View file

@ -20,7 +20,7 @@ import (
"testing"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/memstore"
)
// This is a simple test graph.
@ -51,7 +51,7 @@ var simpleGraph = []*graph.Triple{
}
func makeTestSession(data []*graph.Triple) *Session {
ts := memstore.NewTripleStore()
ts, _ := graph.NewTripleStore("memstore", "", nil)
for _, t := range data {
ts.AddTriple(t)
}