diff --git a/cayley.go b/cayley.go index a848931..7ff1fd8 100644 --- a/cayley.go +++ b/cayley.go @@ -26,6 +26,11 @@ import ( "github.com/google/cayley/db" "github.com/google/cayley/graph" "github.com/google/cayley/http" + + // load all supported backends + _ "github.com/google/cayley/graph/leveldb" + _ "github.com/google/cayley/graph/memstore" + _ "github.com/google/cayley/graph/mongo" ) var tripleFile = flag.String("triples", "", "Triple File to load before going to REPL.") @@ -72,7 +77,7 @@ func main() { db.Init(cfg, *tripleFile) case "load": ts = db.Open(cfg) - db.Load(ts, cfg, *tripleFile, false) + db.Load(ts, cfg, *tripleFile) ts.Close() case "repl": ts = db.Open(cfg) diff --git a/config/config.go b/config/config.go index 1c577fa..d521043 100644 --- a/config/config.go +++ b/config/config.go @@ -34,7 +34,7 @@ type Config struct { } var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") -var databaseBackend = flag.String("db", "mem", "Database Backend.") +var databaseBackend = flag.String("db", "memstore", "Database Backend.") var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") var port = flag.String("port", "64210", "Port to listen on.") diff --git a/db/init.go b/db/init.go index 8efb863..a8b3897 100644 --- a/db/init.go +++ b/db/init.go @@ -16,25 +16,19 @@ package db import ( "github.com/google/cayley/config" - "github.com/google/cayley/graph/leveldb" - "github.com/google/cayley/graph/mongo" + "github.com/google/cayley/graph" ) func Init(cfg *config.Config, triplePath string) bool { - created := false - dbpath := cfg.DatabasePath - switch cfg.DatabaseType { - case "mongo", "mongodb": - created = mongo.CreateNewMongoGraph(dbpath, cfg.DatabaseOptions) - case "leveldb": - created = leveldb.CreateNewLevelDB(dbpath) - case "mem": - return true + err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) + if err != nil { + return false } - if created && triplePath != "" { + + if triplePath != "" { ts := Open(cfg) - Load(ts, cfg, triplePath, true) + Load(ts, cfg, triplePath) ts.Close() } - return created + return true } diff --git a/db/load.go b/db/load.go index f09962e..d4d2bd4 100644 --- a/db/load.go +++ b/db/load.go @@ -21,30 +21,26 @@ import ( "github.com/google/cayley/config" "github.com/google/cayley/graph" - "github.com/google/cayley/graph/mongo" "github.com/google/cayley/nquads" ) -func Load(ts graph.TripleStore, cfg *config.Config, triplePath string, firstTime bool) { - switch cfg.DatabaseType { - case "mongo", "mongodb": - if firstTime { - loadMongo(ts.(*mongo.TripleStore), triplePath) - } else { - LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize) - } - case "leveldb": - LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize) - case "mem": - LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize) - } - +type bulkLoadable interface { + // BulkLoad loads Triples from a channel in bulk to the TripleStore. It + // returns false if bulk loading is not possible (i.e. if you cannot load + // in bulk to a non-empty database, and the current database is non-empty) + BulkLoad(chan *graph.Triple) bool } -func loadMongo(ts *mongo.TripleStore, path string) { +func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) { tChan := make(chan *graph.Triple) - go ReadTriplesFromFile(tChan, path) - ts.BulkLoad(tChan) + go ReadTriplesFromFile(tChan, triplePath) + + bulker, canBulk := ts.(bulkLoadable) + if canBulk && bulker.BulkLoad(tChan) { + return + } + + LoadTriplesInto(tChan, ts, cfg.LoadSize) } func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) { @@ -62,9 +58,7 @@ func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) { nquads.ReadNQuadsFromReader(c, f) } -func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) { - tChan := make(chan *graph.Triple) - go ReadTriplesFromFile(tChan, filename) +func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) { tripleblock := make([]*graph.Triple, loadSize) i := 0 for t := range tChan { diff --git a/db/open.go b/db/open.go index 6997ba9..b0973d7 100644 --- a/db/open.go +++ b/db/open.go @@ -19,22 +19,20 @@ import ( "github.com/google/cayley/config" "github.com/google/cayley/graph" - "github.com/google/cayley/graph/leveldb" - "github.com/google/cayley/graph/memstore" - "github.com/google/cayley/graph/mongo" ) func Open(cfg *config.Config) graph.TripleStore { glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath) - switch cfg.DatabaseType { - case "mongo", "mongodb": - return mongo.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions) - case "leveldb": - return leveldb.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions) - case "mem": - ts := memstore.NewTripleStore() - Load(ts, cfg, cfg.DatabasePath, true) - return ts + ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions) + if err != nil { + glog.Fatalln(err.Error()) + return nil } - panic("Unsupported database backend " + cfg.DatabaseType) + + // memstore is not persistent, so MUST be loaded + if cfg.DatabaseType == "memstore" { + Load(ts, cfg, cfg.DatabasePath) + } + + return ts } diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go index 7880165..798895a 100644 --- a/graph/leveldb/triplestore.go +++ b/graph/leveldb/triplestore.go @@ -48,12 +48,12 @@ type TripleStore struct { readopts *opt.ReadOptions } -func CreateNewLevelDB(path string) bool { +func createNewLevelDB(path string, _ graph.Options) error { opts := &opt.Options{} db, err := leveldb.OpenFile(path, opts) if err != nil { glog.Errorln("Error: couldn't create database: ", err) - return false + return err } defer db.Close() ts := &TripleStore{} @@ -62,10 +62,10 @@ func CreateNewLevelDB(path string) bool { Sync: true, } ts.Close() - return true + return nil } -func NewTripleStore(path string, options graph.Options) *TripleStore { +func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) { var ts TripleStore ts.path = path cache_size := DefaultCacheSize @@ -94,7 +94,7 @@ func NewTripleStore(path string, options graph.Options) *TripleStore { ts.db = db glog.Infoln(ts.GetStats()) ts.getSize() - return &ts + return &ts, nil } func (ts *TripleStore) GetStats() string { @@ -443,3 +443,9 @@ func compareBytes(a, b graph.Value) bool { func (ts *TripleStore) FixedIterator() graph.FixedIterator { return iterator.NewFixedIteratorWithCompare(compareBytes) } + +func init() { + graph.RegisterTripleStore("leveldb", + graph.TripleStoreGetter(newTripleStore), + graph.TripleStoreInit(createNewLevelDB)) +} diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go index a53422c..29fe94c 100644 --- a/graph/memstore/triplestore.go +++ b/graph/memstore/triplestore.go @@ -79,7 +79,7 @@ type TripleStore struct { // vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree } -func NewTripleStore() *TripleStore { +func newTripleStore() *TripleStore { var ts TripleStore ts.idMap = make(map[string]int64) ts.revIdMap = make(map[int64]string) @@ -268,3 +268,9 @@ func (ts *TripleStore) NodesAllIterator() graph.Iterator { return NewMemstoreAllIterator(ts) } func (ts *TripleStore) Close() {} + +func init() { + graph.RegisterTripleStore("memstore", func(string, graph.Options) (graph.TripleStore, error) { + return newTripleStore(), nil + }) +} diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 001c539..7ff220f 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -37,11 +37,10 @@ type TripleStore struct { idCache *IDLru } -func CreateNewMongoGraph(addr string, options graph.Options) bool { +func createNewMongoGraph(addr string, options graph.Options) error { conn, err := mgo.Dial(addr) if err != nil { - glog.Fatal("Error connecting: ", err) - return false + return err } conn.SetSafe(&mgo.Safe{}) dbName := DefaultDBName @@ -63,14 +62,14 @@ func CreateNewMongoGraph(addr string, options graph.Options) bool { db.C("triples").EnsureIndex(indexOpts) indexOpts.Key = []string{"Provenance"} db.C("triples").EnsureIndex(indexOpts) - return true + return nil } -func NewTripleStore(addr string, options graph.Options) *TripleStore { +func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) { var ts TripleStore conn, err := mgo.Dial(addr) if err != nil { - glog.Fatal("Error connecting: ", err) + return nil, err } conn.SetSafe(&mgo.Safe{}) dbName := DefaultDBName @@ -81,7 +80,7 @@ func NewTripleStore(addr string, options graph.Options) *TripleStore { ts.session = conn ts.hasher = sha1.New() ts.idCache = NewIDLru(1 << 16) - return &ts + return &ts, nil } func (ts *TripleStore) getIdForTriple(t *graph.Triple) string { @@ -291,7 +290,11 @@ func (ts *TripleStore) TripleDirection(in graph.Value, d graph.Direction) graph. return val } -func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) { +func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) bool { + if ts.Size() != 0 { + return false + } + ts.session.SetSafe(nil) for triple := range t_chan { ts.writeTriple(triple) @@ -334,4 +337,11 @@ func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) { }) }`}, {"args", bson.D{}}}, nil) ts.session.SetSafe(&mgo.Safe{}) + return true +} + +func init() { + graph.RegisterTripleStore("mongo", + graph.TripleStoreGetter(newTripleStore), + graph.TripleStoreInit(createNewMongoGraph)) } diff --git a/graph/triplestore.go b/graph/triplestore.go index 92ed9e9..f8ddef9 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -22,6 +22,7 @@ package graph // triple backing store we prefer. import ( + "fmt" "github.com/barakmich/glog" ) @@ -117,3 +118,42 @@ func (d Options) StringKey(key string) (string, bool) { } return "", false } + +type TripleStoreGetter func(string, Options) (TripleStore, error) +type TripleStoreInit func(string, Options) error + +var storeRegistry = make(map[string]TripleStoreGetter) +var storeInitRegistry = make(map[string]TripleStoreInit) + +func RegisterTripleStore(name string, getter TripleStoreGetter, initer ...TripleStoreInit) { + if _, found := storeRegistry[name]; found { + panic("already registered TripleStore " + name) + } + storeRegistry[name] = getter + if len(initer) > 0 { + storeInitRegistry[name] = initer[0] + } +} + +func NewTripleStore(name, dbpath string, opts Options) (TripleStore, error) { + getter, hasGetter := storeRegistry[name] + if !hasGetter { + return nil, fmt.Errorf("unknown triplestore '%s'", name) + } + return getter(dbpath, opts) +} + +func InitTripleStore(name, dbpath string, opts Options) error { + initer, hasInit := storeInitRegistry[name] + if hasInit { + return initer(dbpath, opts) + } + return fmt.Errorf("unknown triplestore '%s'", name) +} + +func TripleStores() (t []string) { + for n := range storeRegistry { + t = append(t, n) + } + return +}