move to registry interface for backends
This commit is contained in:
parent
e780c1ceb9
commit
d808d9347c
9 changed files with 117 additions and 64 deletions
|
|
@ -26,6 +26,11 @@ import (
|
||||||
"github.com/google/cayley/db"
|
"github.com/google/cayley/db"
|
||||||
"github.com/google/cayley/graph"
|
"github.com/google/cayley/graph"
|
||||||
"github.com/google/cayley/http"
|
"github.com/google/cayley/http"
|
||||||
|
|
||||||
|
// load all supported backends
|
||||||
|
_ "github.com/google/cayley/graph/leveldb"
|
||||||
|
_ "github.com/google/cayley/graph/memstore"
|
||||||
|
_ "github.com/google/cayley/graph/mongo"
|
||||||
)
|
)
|
||||||
|
|
||||||
var tripleFile = flag.String("triples", "", "Triple File to load before going to REPL.")
|
var tripleFile = flag.String("triples", "", "Triple File to load before going to REPL.")
|
||||||
|
|
@ -72,7 +77,7 @@ func main() {
|
||||||
db.Init(cfg, *tripleFile)
|
db.Init(cfg, *tripleFile)
|
||||||
case "load":
|
case "load":
|
||||||
ts = db.Open(cfg)
|
ts = db.Open(cfg)
|
||||||
db.Load(ts, cfg, *tripleFile, false)
|
db.Load(ts, cfg, *tripleFile)
|
||||||
ts.Close()
|
ts.Close()
|
||||||
case "repl":
|
case "repl":
|
||||||
ts = db.Open(cfg)
|
ts = db.Open(cfg)
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ type Config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.")
|
var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.")
|
||||||
var databaseBackend = flag.String("db", "mem", "Database Backend.")
|
var databaseBackend = flag.String("db", "memstore", "Database Backend.")
|
||||||
var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).")
|
var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).")
|
||||||
var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load")
|
var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load")
|
||||||
var port = flag.String("port", "64210", "Port to listen on.")
|
var port = flag.String("port", "64210", "Port to listen on.")
|
||||||
|
|
|
||||||
22
db/init.go
22
db/init.go
|
|
@ -16,25 +16,19 @@ package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/google/cayley/config"
|
"github.com/google/cayley/config"
|
||||||
"github.com/google/cayley/graph/leveldb"
|
"github.com/google/cayley/graph"
|
||||||
"github.com/google/cayley/graph/mongo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func Init(cfg *config.Config, triplePath string) bool {
|
func Init(cfg *config.Config, triplePath string) bool {
|
||||||
created := false
|
err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
|
||||||
dbpath := cfg.DatabasePath
|
if err != nil {
|
||||||
switch cfg.DatabaseType {
|
return false
|
||||||
case "mongo", "mongodb":
|
|
||||||
created = mongo.CreateNewMongoGraph(dbpath, cfg.DatabaseOptions)
|
|
||||||
case "leveldb":
|
|
||||||
created = leveldb.CreateNewLevelDB(dbpath)
|
|
||||||
case "mem":
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
if created && triplePath != "" {
|
|
||||||
|
if triplePath != "" {
|
||||||
ts := Open(cfg)
|
ts := Open(cfg)
|
||||||
Load(ts, cfg, triplePath, true)
|
Load(ts, cfg, triplePath)
|
||||||
ts.Close()
|
ts.Close()
|
||||||
}
|
}
|
||||||
return created
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
36
db/load.go
36
db/load.go
|
|
@ -21,30 +21,26 @@ import (
|
||||||
|
|
||||||
"github.com/google/cayley/config"
|
"github.com/google/cayley/config"
|
||||||
"github.com/google/cayley/graph"
|
"github.com/google/cayley/graph"
|
||||||
"github.com/google/cayley/graph/mongo"
|
|
||||||
"github.com/google/cayley/nquads"
|
"github.com/google/cayley/nquads"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string, firstTime bool) {
|
type bulkLoadable interface {
|
||||||
switch cfg.DatabaseType {
|
// BulkLoad loads Triples from a channel in bulk to the TripleStore. It
|
||||||
case "mongo", "mongodb":
|
// returns false if bulk loading is not possible (i.e. if you cannot load
|
||||||
if firstTime {
|
// in bulk to a non-empty database, and the current database is non-empty)
|
||||||
loadMongo(ts.(*mongo.TripleStore), triplePath)
|
BulkLoad(chan *graph.Triple) bool
|
||||||
} else {
|
|
||||||
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
|
|
||||||
}
|
|
||||||
case "leveldb":
|
|
||||||
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
|
|
||||||
case "mem":
|
|
||||||
LoadTriplesFromFileInto(ts, triplePath, cfg.LoadSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadMongo(ts *mongo.TripleStore, path string) {
|
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) {
|
||||||
tChan := make(chan *graph.Triple)
|
tChan := make(chan *graph.Triple)
|
||||||
go ReadTriplesFromFile(tChan, path)
|
go ReadTriplesFromFile(tChan, triplePath)
|
||||||
ts.BulkLoad(tChan)
|
|
||||||
|
bulker, canBulk := ts.(bulkLoadable)
|
||||||
|
if canBulk && bulker.BulkLoad(tChan) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
LoadTriplesInto(tChan, ts, cfg.LoadSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
|
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
|
||||||
|
|
@ -62,9 +58,7 @@ func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
|
||||||
nquads.ReadNQuadsFromReader(c, f)
|
nquads.ReadNQuadsFromReader(c, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) {
|
func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) {
|
||||||
tChan := make(chan *graph.Triple)
|
|
||||||
go ReadTriplesFromFile(tChan, filename)
|
|
||||||
tripleblock := make([]*graph.Triple, loadSize)
|
tripleblock := make([]*graph.Triple, loadSize)
|
||||||
i := 0
|
i := 0
|
||||||
for t := range tChan {
|
for t := range tChan {
|
||||||
|
|
|
||||||
24
db/open.go
24
db/open.go
|
|
@ -19,22 +19,20 @@ import (
|
||||||
|
|
||||||
"github.com/google/cayley/config"
|
"github.com/google/cayley/config"
|
||||||
"github.com/google/cayley/graph"
|
"github.com/google/cayley/graph"
|
||||||
"github.com/google/cayley/graph/leveldb"
|
|
||||||
"github.com/google/cayley/graph/memstore"
|
|
||||||
"github.com/google/cayley/graph/mongo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func Open(cfg *config.Config) graph.TripleStore {
|
func Open(cfg *config.Config) graph.TripleStore {
|
||||||
glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath)
|
glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath)
|
||||||
switch cfg.DatabaseType {
|
ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
|
||||||
case "mongo", "mongodb":
|
if err != nil {
|
||||||
return mongo.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions)
|
glog.Fatalln(err.Error())
|
||||||
case "leveldb":
|
return nil
|
||||||
return leveldb.NewTripleStore(cfg.DatabasePath, cfg.DatabaseOptions)
|
|
||||||
case "mem":
|
|
||||||
ts := memstore.NewTripleStore()
|
|
||||||
Load(ts, cfg, cfg.DatabasePath, true)
|
|
||||||
return ts
|
|
||||||
}
|
}
|
||||||
panic("Unsupported database backend " + cfg.DatabaseType)
|
|
||||||
|
// memstore is not persistent, so MUST be loaded
|
||||||
|
if cfg.DatabaseType == "memstore" {
|
||||||
|
Load(ts, cfg, cfg.DatabasePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ts
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -48,12 +48,12 @@ type TripleStore struct {
|
||||||
readopts *opt.ReadOptions
|
readopts *opt.ReadOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateNewLevelDB(path string) bool {
|
func createNewLevelDB(path string, _ graph.Options) error {
|
||||||
opts := &opt.Options{}
|
opts := &opt.Options{}
|
||||||
db, err := leveldb.OpenFile(path, opts)
|
db, err := leveldb.OpenFile(path, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorln("Error: couldn't create database: ", err)
|
glog.Errorln("Error: couldn't create database: ", err)
|
||||||
return false
|
return err
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
ts := &TripleStore{}
|
ts := &TripleStore{}
|
||||||
|
|
@ -62,10 +62,10 @@ func CreateNewLevelDB(path string) bool {
|
||||||
Sync: true,
|
Sync: true,
|
||||||
}
|
}
|
||||||
ts.Close()
|
ts.Close()
|
||||||
return true
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTripleStore(path string, options graph.Options) *TripleStore {
|
func newTripleStore(path string, options graph.Options) (graph.TripleStore, error) {
|
||||||
var ts TripleStore
|
var ts TripleStore
|
||||||
ts.path = path
|
ts.path = path
|
||||||
cache_size := DefaultCacheSize
|
cache_size := DefaultCacheSize
|
||||||
|
|
@ -94,7 +94,7 @@ func NewTripleStore(path string, options graph.Options) *TripleStore {
|
||||||
ts.db = db
|
ts.db = db
|
||||||
glog.Infoln(ts.GetStats())
|
glog.Infoln(ts.GetStats())
|
||||||
ts.getSize()
|
ts.getSize()
|
||||||
return &ts
|
return &ts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) GetStats() string {
|
func (ts *TripleStore) GetStats() string {
|
||||||
|
|
@ -443,3 +443,9 @@ func compareBytes(a, b graph.Value) bool {
|
||||||
func (ts *TripleStore) FixedIterator() graph.FixedIterator {
|
func (ts *TripleStore) FixedIterator() graph.FixedIterator {
|
||||||
return iterator.NewFixedIteratorWithCompare(compareBytes)
|
return iterator.NewFixedIteratorWithCompare(compareBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
graph.RegisterTripleStore("leveldb",
|
||||||
|
graph.TripleStoreGetter(newTripleStore),
|
||||||
|
graph.TripleStoreInit(createNewLevelDB))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ type TripleStore struct {
|
||||||
// vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree
|
// vip_index map[string]map[int64]map[string]map[int64]*llrb.Tree
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTripleStore() *TripleStore {
|
func newTripleStore() *TripleStore {
|
||||||
var ts TripleStore
|
var ts TripleStore
|
||||||
ts.idMap = make(map[string]int64)
|
ts.idMap = make(map[string]int64)
|
||||||
ts.revIdMap = make(map[int64]string)
|
ts.revIdMap = make(map[int64]string)
|
||||||
|
|
@ -268,3 +268,9 @@ func (ts *TripleStore) NodesAllIterator() graph.Iterator {
|
||||||
return NewMemstoreAllIterator(ts)
|
return NewMemstoreAllIterator(ts)
|
||||||
}
|
}
|
||||||
func (ts *TripleStore) Close() {}
|
func (ts *TripleStore) Close() {}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
graph.RegisterTripleStore("memstore", func(string, graph.Options) (graph.TripleStore, error) {
|
||||||
|
return newTripleStore(), nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,11 +37,10 @@ type TripleStore struct {
|
||||||
idCache *IDLru
|
idCache *IDLru
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateNewMongoGraph(addr string, options graph.Options) bool {
|
func createNewMongoGraph(addr string, options graph.Options) error {
|
||||||
conn, err := mgo.Dial(addr)
|
conn, err := mgo.Dial(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatal("Error connecting: ", err)
|
return err
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
conn.SetSafe(&mgo.Safe{})
|
conn.SetSafe(&mgo.Safe{})
|
||||||
dbName := DefaultDBName
|
dbName := DefaultDBName
|
||||||
|
|
@ -63,14 +62,14 @@ func CreateNewMongoGraph(addr string, options graph.Options) bool {
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("triples").EnsureIndex(indexOpts)
|
||||||
indexOpts.Key = []string{"Provenance"}
|
indexOpts.Key = []string{"Provenance"}
|
||||||
db.C("triples").EnsureIndex(indexOpts)
|
db.C("triples").EnsureIndex(indexOpts)
|
||||||
return true
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTripleStore(addr string, options graph.Options) *TripleStore {
|
func newTripleStore(addr string, options graph.Options) (graph.TripleStore, error) {
|
||||||
var ts TripleStore
|
var ts TripleStore
|
||||||
conn, err := mgo.Dial(addr)
|
conn, err := mgo.Dial(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatal("Error connecting: ", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
conn.SetSafe(&mgo.Safe{})
|
conn.SetSafe(&mgo.Safe{})
|
||||||
dbName := DefaultDBName
|
dbName := DefaultDBName
|
||||||
|
|
@ -81,7 +80,7 @@ func NewTripleStore(addr string, options graph.Options) *TripleStore {
|
||||||
ts.session = conn
|
ts.session = conn
|
||||||
ts.hasher = sha1.New()
|
ts.hasher = sha1.New()
|
||||||
ts.idCache = NewIDLru(1 << 16)
|
ts.idCache = NewIDLru(1 << 16)
|
||||||
return &ts
|
return &ts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) getIdForTriple(t *graph.Triple) string {
|
func (ts *TripleStore) getIdForTriple(t *graph.Triple) string {
|
||||||
|
|
@ -291,7 +290,11 @@ func (ts *TripleStore) TripleDirection(in graph.Value, d graph.Direction) graph.
|
||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) {
|
func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) bool {
|
||||||
|
if ts.Size() != 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
ts.session.SetSafe(nil)
|
ts.session.SetSafe(nil)
|
||||||
for triple := range t_chan {
|
for triple := range t_chan {
|
||||||
ts.writeTriple(triple)
|
ts.writeTriple(triple)
|
||||||
|
|
@ -334,4 +337,11 @@ func (ts *TripleStore) BulkLoad(t_chan chan *graph.Triple) {
|
||||||
}) }`}, {"args", bson.D{}}}, nil)
|
}) }`}, {"args", bson.D{}}}, nil)
|
||||||
|
|
||||||
ts.session.SetSafe(&mgo.Safe{})
|
ts.session.SetSafe(&mgo.Safe{})
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
graph.RegisterTripleStore("mongo",
|
||||||
|
graph.TripleStoreGetter(newTripleStore),
|
||||||
|
graph.TripleStoreInit(createNewMongoGraph))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ package graph
|
||||||
// triple backing store we prefer.
|
// triple backing store we prefer.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/barakmich/glog"
|
"github.com/barakmich/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -117,3 +118,42 @@ func (d Options) StringKey(key string) (string, bool) {
|
||||||
}
|
}
|
||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TripleStoreGetter func(string, Options) (TripleStore, error)
|
||||||
|
type TripleStoreInit func(string, Options) error
|
||||||
|
|
||||||
|
var storeRegistry = make(map[string]TripleStoreGetter)
|
||||||
|
var storeInitRegistry = make(map[string]TripleStoreInit)
|
||||||
|
|
||||||
|
func RegisterTripleStore(name string, getter TripleStoreGetter, initer ...TripleStoreInit) {
|
||||||
|
if _, found := storeRegistry[name]; found {
|
||||||
|
panic("already registered TripleStore " + name)
|
||||||
|
}
|
||||||
|
storeRegistry[name] = getter
|
||||||
|
if len(initer) > 0 {
|
||||||
|
storeInitRegistry[name] = initer[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTripleStore(name, dbpath string, opts Options) (TripleStore, error) {
|
||||||
|
getter, hasGetter := storeRegistry[name]
|
||||||
|
if !hasGetter {
|
||||||
|
return nil, fmt.Errorf("unknown triplestore '%s'", name)
|
||||||
|
}
|
||||||
|
return getter(dbpath, opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitTripleStore(name, dbpath string, opts Options) error {
|
||||||
|
initer, hasInit := storeInitRegistry[name]
|
||||||
|
if hasInit {
|
||||||
|
return initer(dbpath, opts)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unknown triplestore '%s'", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TripleStores() (t []string) {
|
||||||
|
for n := range storeRegistry {
|
||||||
|
t = append(t, n)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue