graph: make quadstore init functions idempotent
This commit is contained in:
parent
9658689995
commit
e849da9402
7 changed files with 63 additions and 24 deletions
|
|
@ -45,6 +45,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
quadFile = flag.String("quads", "", "Quad file to load before going to REPL.")
|
quadFile = flag.String("quads", "", "Quad file to load before going to REPL.")
|
||||||
|
initOpt = flag.Bool("init", false, "Initialize the database before using it. Equivalent to running `cayley init` followed by the given command.")
|
||||||
quadType = flag.String("format", "cquad", `Quad format to use for loading ("cquad" or "nquad").`)
|
quadType = flag.String("format", "cquad", `Quad format to use for loading ("cquad" or "nquad").`)
|
||||||
cpuprofile = flag.String("prof", "", "Output profiling file.")
|
cpuprofile = flag.String("prof", "", "Output profiling file.")
|
||||||
queryLanguage = flag.String("query_lang", "gremlin", "Use this parser as the query language.")
|
queryLanguage = flag.String("query_lang", "gremlin", "Use this parser as the query language.")
|
||||||
|
|
@ -238,6 +239,12 @@ func main() {
|
||||||
handle.Close()
|
handle.Close()
|
||||||
|
|
||||||
case "repl":
|
case "repl":
|
||||||
|
if *initOpt {
|
||||||
|
err = db.Init(cfg)
|
||||||
|
if err != nil && err != graph.ErrDatabaseExists {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
handle, err = db.Open(cfg)
|
handle, err = db.Open(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
|
|
@ -254,6 +261,12 @@ func main() {
|
||||||
handle.Close()
|
handle.Close()
|
||||||
|
|
||||||
case "http":
|
case "http":
|
||||||
|
if *initOpt {
|
||||||
|
err = db.Init(cfg)
|
||||||
|
if err != nil && err != graph.ErrDatabaseExists {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
handle, err = db.Open(cfg)
|
handle, err = db.Open(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
|
|
|
||||||
|
|
@ -153,10 +153,9 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:1", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:1", s)
|
||||||
}
|
}
|
||||||
qs.Close()
|
qs.Close()
|
||||||
os.RemoveAll(tmpFile.Name())
|
|
||||||
|
|
||||||
err = createNewBolt(tmpFile.Name(), nil)
|
err = createNewBolt(tmpFile.Name(), nil)
|
||||||
if err != nil {
|
if err != graph.ErrDatabaseExists {
|
||||||
t.Fatal("Failed to create Bolt database.", err)
|
t.Fatal("Failed to create Bolt database.", err)
|
||||||
}
|
}
|
||||||
qs, err = newQuadStore(tmpFile.Name(), nil)
|
qs, err = newQuadStore(tmpFile.Name(), nil)
|
||||||
|
|
@ -172,20 +171,20 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
|
|
||||||
//Test horizon
|
//Test horizon
|
||||||
horizon := qs.Horizon()
|
horizon := qs.Horizon()
|
||||||
if horizon.Int() != 0 {
|
if horizon.Int() != 1 {
|
||||||
t.Errorf("Unexpected horizon value, got:%d expect:0", horizon.Int())
|
t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int())
|
||||||
}
|
}
|
||||||
|
|
||||||
w.AddQuadSet(makeQuadSet())
|
w.AddQuadSet(makeQuadSet())
|
||||||
if s := qs.Size(); s != 11 {
|
if s := qs.Size(); s != 12 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:11", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:12", s)
|
||||||
}
|
}
|
||||||
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
|
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
|
||||||
}
|
}
|
||||||
horizon = qs.Horizon()
|
horizon = qs.Horizon()
|
||||||
if horizon.Int() != 11 {
|
if horizon.Int() != 12 {
|
||||||
t.Errorf("Unexpected horizon value, got:%d expect:11", horizon.Int())
|
t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int())
|
||||||
}
|
}
|
||||||
|
|
||||||
w.RemoveQuad(quad.Quad{
|
w.RemoveQuad(quad.Quad{
|
||||||
|
|
@ -194,8 +193,8 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
Object: "B",
|
Object: "B",
|
||||||
Label: "",
|
Label: "",
|
||||||
})
|
})
|
||||||
if s := qs.Size(); s != 10 {
|
if s := qs.Size(); s != 11 {
|
||||||
t.Errorf("Unexpected quadstore size after RemoveQuad, got:%d expect:10", s)
|
t.Errorf("Unexpected quadstore size after RemoveQuad, got:%d expect:11", s)
|
||||||
}
|
}
|
||||||
if s := ts2.SizeOf(qs.ValueOf("B")); s != 4 {
|
if s := ts2.SizeOf(qs.ValueOf("B")); s != 4 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:4", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:4", s)
|
||||||
|
|
|
||||||
|
|
@ -85,6 +85,11 @@ func createNewBolt(path string, _ graph.Options) error {
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
qs := &QuadStore{}
|
qs := &QuadStore{}
|
||||||
qs.db = db
|
qs.db = db
|
||||||
|
defer qs.Close()
|
||||||
|
err = qs.getMetadata()
|
||||||
|
if err != errNoBucket {
|
||||||
|
return graph.ErrDatabaseExists
|
||||||
|
}
|
||||||
err = qs.createBuckets()
|
err = qs.createBuckets()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -154,7 +154,7 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
qs.Close()
|
qs.Close()
|
||||||
|
|
||||||
err = createNewLevelDB(tmpDir, nil)
|
err = createNewLevelDB(tmpDir, nil)
|
||||||
if err != nil {
|
if err != graph.ErrDatabaseExists {
|
||||||
t.Fatal("Failed to create LevelDB database.")
|
t.Fatal("Failed to create LevelDB database.")
|
||||||
}
|
}
|
||||||
qs, err = newQuadStore(tmpDir, nil)
|
qs, err = newQuadStore(tmpDir, nil)
|
||||||
|
|
@ -170,20 +170,20 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
|
|
||||||
//Test horizon
|
//Test horizon
|
||||||
horizon := qs.Horizon()
|
horizon := qs.Horizon()
|
||||||
if horizon.Int() != 0 {
|
if horizon.Int() != 1 {
|
||||||
t.Errorf("Unexpected horizon value, got:%d expect:0", horizon.Int())
|
t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int())
|
||||||
}
|
}
|
||||||
|
|
||||||
w.AddQuadSet(makeQuadSet())
|
w.AddQuadSet(makeQuadSet())
|
||||||
if s := qs.Size(); s != 11 {
|
if s := qs.Size(); s != 12 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:11", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:12", s)
|
||||||
}
|
}
|
||||||
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
|
if s := ts2.SizeOf(qs.ValueOf("B")); s != 5 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
|
||||||
}
|
}
|
||||||
horizon = qs.Horizon()
|
horizon = qs.Horizon()
|
||||||
if horizon.Int() != 11 {
|
if horizon.Int() != 12 {
|
||||||
t.Errorf("Unexpected horizon value, got:%d expect:11", horizon.Int())
|
t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int())
|
||||||
}
|
}
|
||||||
|
|
||||||
w.RemoveQuad(quad.Quad{
|
w.RemoveQuad(quad.Quad{
|
||||||
|
|
@ -192,8 +192,8 @@ func TestLoadDatabase(t *testing.T) {
|
||||||
Object: "B",
|
Object: "B",
|
||||||
Label: "",
|
Label: "",
|
||||||
})
|
})
|
||||||
if s := qs.Size(); s != 10 {
|
if s := qs.Size(); s != 11 {
|
||||||
t.Errorf("Unexpected quadstore size after RemoveQuad, got:%d expect:10", s)
|
t.Errorf("Unexpected quadstore size after RemoveQuad, got:%d expect:11", s)
|
||||||
}
|
}
|
||||||
if s := ts2.SizeOf(qs.ValueOf("B")); s != 4 {
|
if s := ts2.SizeOf(qs.ValueOf("B")); s != 4 {
|
||||||
t.Errorf("Unexpected quadstore size, got:%d expect:4", s)
|
t.Errorf("Unexpected quadstore size, got:%d expect:4", s)
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,8 @@ const (
|
||||||
DefaultCacheSize = 2
|
DefaultCacheSize = 2
|
||||||
DefaultWriteBufferSize = 20
|
DefaultWriteBufferSize = 20
|
||||||
QuadStoreType = "leveldb"
|
QuadStoreType = "leveldb"
|
||||||
|
horizonKey = "__horizon"
|
||||||
|
sizeKey = "__size"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -87,6 +89,16 @@ func createNewLevelDB(path string, _ graph.Options) error {
|
||||||
qs.writeopts = &opt.WriteOptions{
|
qs.writeopts = &opt.WriteOptions{
|
||||||
Sync: true,
|
Sync: true,
|
||||||
}
|
}
|
||||||
|
qs.readopts = &opt.ReadOptions{}
|
||||||
|
_, err = qs.db.Get([]byte(horizonKey), qs.readopts)
|
||||||
|
if err != nil && err != leveldb.ErrNotFound {
|
||||||
|
glog.Errorln("couldn't read from leveldb during init")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err != leveldb.ErrNotFound {
|
||||||
|
return graph.ErrDatabaseExists
|
||||||
|
}
|
||||||
|
// Write some metadata
|
||||||
qs.Close()
|
qs.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -343,7 +355,7 @@ func (qs *QuadStore) Close() {
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
err := binary.Write(buf, binary.LittleEndian, qs.size)
|
err := binary.Write(buf, binary.LittleEndian, qs.size)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
werr := qs.db.Put([]byte("__size"), buf.Bytes(), qs.writeopts)
|
werr := qs.db.Put([]byte(sizeKey), buf.Bytes(), qs.writeopts)
|
||||||
if werr != nil {
|
if werr != nil {
|
||||||
glog.Error("could not write size before closing!")
|
glog.Error("could not write size before closing!")
|
||||||
}
|
}
|
||||||
|
|
@ -353,7 +365,7 @@ func (qs *QuadStore) Close() {
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
err = binary.Write(buf, binary.LittleEndian, qs.horizon)
|
err = binary.Write(buf, binary.LittleEndian, qs.horizon)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts)
|
werr := qs.db.Put([]byte(horizonKey), buf.Bytes(), qs.writeopts)
|
||||||
if werr != nil {
|
if werr != nil {
|
||||||
glog.Error("could not write horizon before closing!")
|
glog.Error("could not write horizon before closing!")
|
||||||
}
|
}
|
||||||
|
|
@ -444,11 +456,11 @@ func (qs *QuadStore) getInt64ForKey(key string, empty int64) (int64, error) {
|
||||||
|
|
||||||
func (qs *QuadStore) getMetadata() error {
|
func (qs *QuadStore) getMetadata() error {
|
||||||
var err error
|
var err error
|
||||||
qs.size, err = qs.getInt64ForKey("__size", 0)
|
qs.size, err = qs.getInt64ForKey(sizeKey, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
qs.horizon, err = qs.getInt64ForKey("__horizon", 0)
|
qs.horizon, err = qs.getInt64ForKey(horizonKey, 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -140,6 +140,7 @@ func (d Options) BoolKey(key string) (bool, bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrCannotBulkLoad = errors.New("quadstore: cannot bulk load")
|
var ErrCannotBulkLoad = errors.New("quadstore: cannot bulk load")
|
||||||
|
var ErrDatabaseExists = errors.New("quadstore: cannot init; database already exists")
|
||||||
|
|
||||||
type BulkLoader interface {
|
type BulkLoader interface {
|
||||||
// BulkLoad loads Quads from a quad.Unmarshaler in bulk to the QuadStore.
|
// BulkLoad loads Quads from a quad.Unmarshaler in bulk to the QuadStore.
|
||||||
|
|
|
||||||
|
|
@ -64,6 +64,7 @@ func connectSQLTables(addr string, _ graph.Options) (*sql.DB, error) {
|
||||||
|
|
||||||
func createSQLTables(addr string, options graph.Options) error {
|
func createSQLTables(addr string, options graph.Options) error {
|
||||||
conn, err := connectSQLTables(addr, options)
|
conn, err := connectSQLTables(addr, options)
|
||||||
|
defer conn.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -89,6 +90,11 @@ func createSQLTables(addr string, options graph.Options) error {
|
||||||
UNIQUE(subject_hash, predicate_hash, object_hash, label_hash)
|
UNIQUE(subject_hash, predicate_hash, object_hash, label_hash)
|
||||||
);`)
|
);`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
errd := err.(*pq.Error)
|
||||||
|
if errd.Code == "42P07" {
|
||||||
|
return graph.ErrDatabaseExists
|
||||||
|
}
|
||||||
glog.Errorf("Cannot create quad table: %v", quadTable)
|
glog.Errorf("Cannot create quad table: %v", quadTable)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -105,6 +111,7 @@ func createSQLTables(addr string, options graph.Options) error {
|
||||||
`, factor, factor, factor))
|
`, factor, factor, factor))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Cannot create indices: %v", index)
|
glog.Errorf("Cannot create indices: %v", index)
|
||||||
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tx.Commit()
|
tx.Commit()
|
||||||
|
|
@ -310,7 +317,9 @@ func (qs *QuadStore) Horizon() graph.PrimaryKey {
|
||||||
var horizon int64
|
var horizon int64
|
||||||
err := qs.db.QueryRow("SELECT horizon FROM quads ORDER BY horizon DESC LIMIT 1;").Scan(&horizon)
|
err := qs.db.QueryRow("SELECT horizon FROM quads ORDER BY horizon DESC LIMIT 1;").Scan(&horizon)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't execute horizon: %v", err)
|
if err != sql.ErrNoRows {
|
||||||
|
glog.Errorf("Couldn't execute horizon: %v", err)
|
||||||
|
}
|
||||||
return graph.NewSequentialKey(0)
|
return graph.NewSequentialKey(0)
|
||||||
}
|
}
|
||||||
return graph.NewSequentialKey(horizon)
|
return graph.NewSequentialKey(horizon)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue