diff --git a/.gitignore b/.gitignore index 8c613ec..0ed1b73 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,3 @@ main *.peg.go cayley cayley.cfg -leveldb/ -snappy/ -pkg/ diff --git a/graph/leveldb/leveldb_all_iterator.go b/graph/leveldb/leveldb_all_iterator.go new file mode 100644 index 0000000..4fad9d8 --- /dev/null +++ b/graph/leveldb/leveldb_all_iterator.go @@ -0,0 +1,134 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package leveldb
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+
+	leveldb_it "github.com/syndtr/goleveldb/leveldb/iterator"
+	leveldb_opt "github.com/syndtr/goleveldb/leveldb/opt"
+
+	"github.com/google/cayley/graph"
+)
+
+// LevelDBAllIterator yields every key in the store that starts with a fixed
+// prefix. The triple store creates it with prefix "z" to enumerate nodes and
+// with prefix "po" to enumerate triples.
+type LevelDBAllIterator struct {
+	graph.BaseIterator
+	prefix []byte                   // key prefix that bounds the scan
+	dir    string                   // direction label, only used in DebugString
+	open   bool                     // true while it holds an unreleased iterator
+	it     leveldb_it.Iterator      // underlying goleveldb iterator
+	ts     *LevelDBTripleStore      // owning store, used to reopen and to size
+	ro     *leveldb_opt.ReadOptions // read options shared by every reopen
+}
+
+// NewLevelDBAllIterator returns an iterator positioned at the first key that
+// is greater than or equal to prefix. If no such key exists the underlying
+// iterator is released immediately and Next reports exhaustion.
+func NewLevelDBAllIterator(prefix, dir string, ts *LevelDBTripleStore) *LevelDBAllIterator {
+	var it LevelDBAllIterator
+	graph.BaseIteratorInit(&it.BaseIterator)
+	// A full scan should not evict useful blocks from the block cache.
+	it.ro = &leveldb_opt.ReadOptions{}
+	it.ro.DontFillCache = true
+	it.it = ts.db.NewIterator(nil, it.ro)
+	it.prefix = []byte(prefix)
+	it.dir = dir
+	it.open = true
+	it.ts = ts
+	it.it.Seek(it.prefix)
+	if !it.it.Valid() {
+		it.open = false
+		it.it.Release()
+	}
+	return &it
+}
+
+// Reset rewinds the iterator to the first key at or after the prefix,
+// reopening the underlying iterator if it had already been released.
+func (a *LevelDBAllIterator) Reset() {
+	if !a.open {
+		a.it = a.ts.db.NewIterator(nil, a.ro)
+		a.open = true
+	}
+	a.it.Seek(a.prefix)
+	if !a.it.Valid() {
+		a.open = false
+		a.it.Release()
+	}
+}
+
+// Clone returns a fresh iterator over the same prefix carrying the same tags.
+func (a *LevelDBAllIterator) Clone() graph.Iterator {
+	out := NewLevelDBAllIterator(string(a.prefix), a.dir, a.ts)
+	out.CopyTagsFrom(a)
+	return out
+}
+
+// Next returns the current key and advances. It returns (nil, false), with
+// Last cleared, once the iterator is closed or the scan has left the prefix.
+func (a *LevelDBAllIterator) Next() (graph.TSVal, bool) {
+	if !a.open {
+		a.Last = nil
+		return nil, false
+	}
+	// Copy the key before advancing: the slice returned by Key is owned by
+	// the underlying iterator and is not stable across Next.
+	out := make([]byte, len(a.it.Key()))
+	copy(out, a.it.Key())
+	a.it.Next()
+	if !a.it.Valid() {
+		a.Close()
+	}
+	if !bytes.HasPrefix(out, a.prefix) {
+		// Walked past the last key with our prefix; Close is a no-op if the
+		// iterator was already released above.
+		a.Close()
+		a.Last = nil
+		return nil, false
+	}
+	a.Last = out
+	return out, true
+}
+
+// Check records v as the last result and always reports true; no membership
+// test is performed.
+func (a *LevelDBAllIterator) Check(v graph.TSVal) bool {
+	a.Last = v
+	return true
+}
+
+// Close releases the underlying iterator. Calling it again is a no-op.
+func (a *LevelDBAllIterator) Close() {
+	if a.open {
+		a.it.Release()
+		a.open = false
+	}
+}
+
+// Size returns the store's approximate size for the prefix. The second result
+// is always false because the figure is an estimate. If the estimate cannot
+// be obtained, the largest int64 is returned.
+func (a *LevelDBAllIterator) Size() (int64, bool) {
+	size, err := a.ts.GetApproximateSizeForPrefix(a.prefix)
+	if err == nil {
+		return size, false
+	}
+	// INT64_MAX
+	return int64(^uint64(0) >> 1), false
+}
+
+// DebugString describes the iterator, indented by indent spaces.
+func (a *LevelDBAllIterator) DebugString(indent int) string {
+	size, _ := a.Size()
+	return fmt.Sprintf("%s(%s tags: %v leveldb size:%d %s %p)", strings.Repeat(" ", indent), a.Type(), a.Tags(), size, a.dir, a)
+}
+
+// Type identifies this iterator kind as "all".
+func (a *LevelDBAllIterator) Type() string { return "all" }
+
+// Sorted reports false.
+func (a *LevelDBAllIterator) Sorted() bool { return false }
+
+// Optimize returns the iterator unchanged.
+func (a *LevelDBAllIterator) Optimize() (graph.Iterator, bool) {
+	return a, false
+}
+
+// GetStats reports fixed Check/Next costs together with the estimated size.
+func (a *LevelDBAllIterator) GetStats() *graph.IteratorStats {
+	s, _ := a.Size()
+	return &graph.IteratorStats{
+		CheckCost: 1,
+		NextCost:  2,
+		Size:      s,
+	}
+}
diff --git a/graph/leveldb/leveldb_iterator.go b/graph/leveldb/leveldb_iterator.go
new file mode 100644
index 0000000..b657073
--- /dev/null
+++ b/graph/leveldb/leveldb_iterator.go
@@ -0,0 +1,212 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leveldb
+
+import (
+	"bytes"
+	_ "encoding/binary"
+	"fmt"
+	"strings"
+
+	leveldb_it "github.com/syndtr/goleveldb/leveldb/iterator"
+	leveldb_opt "github.com/syndtr/goleveldb/leveldb/opt"
+
+	"github.com/google/cayley/graph"
+)
+
+// LevelDBIterator yields the triple keys of one index whose leading hash
+// matches a given node value.
+type LevelDBIterator struct {
+	graph.BaseIterator
+	nextPrefix     []byte                   // index prefix followed by the node hash
+	checkId        []byte                   // value key ("z" + hash) being matched
+	dir            string                   // triple direction this iterator covers
+	open           bool                     // true while it holds an unreleased iterator
+	it             leveldb_it.Iterator      // underlying goleveldb iterator
+	ts             *LevelDBTripleStore      // owning store
+	ro             *leveldb_opt.ReadOptions // read options shared by every reopen
+	originalPrefix string                   // index prefix as passed in, kept for Clone
+}
+
+// keyLayouts maps each two-byte index prefix to the order in which the
+// direction hashes follow it inside a key, as laid out by createKeyFor and
+// createProvKeyFor.
+var keyLayouts = map[string]string{
+	"sp": "spo",
+	"po": "pos",
+	"os": "osp",
+	"cp": "cpso",
+}
+
+// NewLevelDBIterator returns an iterator over the index named by prefix,
+// restricted to keys whose first hash equals that of value. value must be a
+// []byte value key; its first byte is skipped when building the seek prefix.
+func NewLevelDBIterator(prefix, dir string, value graph.TSVal, ts *LevelDBTripleStore) *LevelDBIterator {
+	var it LevelDBIterator
+	graph.BaseIteratorInit(&it.BaseIterator)
+	it.checkId = value.([]byte)
+	it.dir = dir
+	it.originalPrefix = prefix
+	it.nextPrefix = make([]byte, 0, 2+ts.hasher.Size())
+	it.nextPrefix = append(it.nextPrefix, []byte(prefix)...)
+	it.nextPrefix = append(it.nextPrefix, []byte(it.checkId[1:])...)
+	// Scans should not evict useful blocks from the block cache.
+	it.ro = &leveldb_opt.ReadOptions{}
+	it.ro.DontFillCache = true
+	it.it = ts.db.NewIterator(nil, it.ro)
+	it.open = true
+	it.ts = ts
+	it.seekToStart()
+	return &it
+}
+
+// seekToStart positions the underlying iterator at the first key at or after
+// nextPrefix, releasing it and marking the iterator closed if there is none.
+func (lit *LevelDBIterator) seekToStart() {
+	if !lit.it.Seek(lit.nextPrefix) {
+		lit.open = false
+		lit.it.Release()
+	}
+}
+
+// Reset rewinds the iterator, reopening the underlying iterator if it had
+// already been released.
+func (lit *LevelDBIterator) Reset() {
+	if !lit.open {
+		lit.it = lit.ts.db.NewIterator(nil, lit.ro)
+		lit.open = true
+	}
+	lit.seekToStart()
+}
+
+// Clone returns a fresh iterator over the same index and value carrying the
+// same tags.
+func (lit *LevelDBIterator) Clone() graph.Iterator {
+	out := NewLevelDBIterator(lit.originalPrefix, lit.dir, lit.checkId, lit.ts)
+	out.CopyTagsFrom(lit)
+	return out
+}
+
+// Close releases the underlying iterator. Calling it again is a no-op.
+func (lit *LevelDBIterator) Close() {
+	if lit.open {
+		lit.it.Release()
+		lit.open = false
+	}
+}
+
+// Next returns the current key and advances. It returns (nil, false), with
+// Last cleared, once the iterator is closed, invalid, or past the prefix.
+func (lit *LevelDBIterator) Next() (graph.TSVal, bool) {
+	if lit.it == nil || !lit.open {
+		lit.Last = nil
+		return nil, false
+	}
+	if !lit.it.Valid() || !bytes.HasPrefix(lit.it.Key(), lit.nextPrefix) {
+		lit.Last = nil
+		lit.Close()
+		return nil, false
+	}
+	// Copy the key before advancing: the slice returned by Key is owned by
+	// the underlying iterator and is not stable across Next.
+	out := make([]byte, len(lit.it.Key()))
+	copy(out, lit.it.Key())
+	lit.Last = out
+	if !lit.it.Next() {
+		lit.Close()
+	}
+	return out, true
+}
+
+// GetPositionFromPrefix returns the byte offset, within a key that starts
+// with the two-byte index prefix, at which the hash for direction dir begins.
+// It returns -1 when that index does not store dir (the "c" direction in the
+// "sp", "po" and "os" indexes). It panics on an unknown prefix or direction.
+func GetPositionFromPrefix(prefix []byte, dir string, ts *LevelDBTripleStore) int {
+	layout, known := keyLayouts[string(prefix)]
+	if !known {
+		panic("Notreached")
+	}
+	switch dir {
+	case "s", "p", "o", "c":
+	default:
+		panic("Notreached")
+	}
+	slot := strings.Index(layout, dir)
+	if slot < 0 {
+		return -1
+	}
+	// Hashes are packed back to back after the two prefix bytes.
+	return slot*ts.hasher.Size() + 2
+}
+
+// Check reports whether the triple key v has this iterator's value in this
+// iterator's direction. Value keys (first byte 'z') and keys too short to
+// carry an index prefix never match.
+func (lit *LevelDBIterator) Check(v graph.TSVal) bool {
+	val := v.([]byte)
+	if len(val) < 2 || val[0] == 'z' {
+		return false
+	}
+	offset := GetPositionFromPrefix(val[0:2], lit.dir, lit.ts)
+	if offset != -1 {
+		return bytes.HasPrefix(val[offset:], lit.checkId[1:])
+	}
+	// The key does not embed this direction; load the triple and compare the
+	// id of the stored name instead.
+	nameForDir := lit.ts.GetTriple(v).Get(lit.dir)
+	hashForDir := lit.ts.GetIdFor(nameForDir).([]byte)
+	return bytes.Equal(hashForDir, lit.checkId)
+}
+
+// Size returns the size the store records for the matched value and reports
+// it as exact.
+func (lit *LevelDBIterator) Size() (int64, bool) {
+	return lit.ts.GetSizeFor(lit.checkId), true
+}
+
+// DebugString describes the iterator, indented by indent spaces.
+func (lit *LevelDBIterator) DebugString(indent int) string {
+	size, _ := lit.Size()
+	return fmt.Sprintf("%s(%s %d tags: %v dir: %s size:%d %s)", strings.Repeat(" ", indent), lit.Type(), lit.GetUid(), lit.Tags(), lit.dir, size, lit.ts.GetNameFor(lit.checkId))
+}
+
+// Type identifies this iterator kind as "leveldb".
+func (lit *LevelDBIterator) Type() string { return "leveldb" }
+
+// Sorted reports false.
+func (lit *LevelDBIterator) Sorted() bool { return false }
+
+// Optimize returns the iterator unchanged.
+func (lit *LevelDBIterator) Optimize() (graph.Iterator, bool) {
+	return lit, false
+}
+
+// GetStats reports fixed Check/Next costs together with the recorded size.
+func (lit *LevelDBIterator) GetStats() *graph.IteratorStats {
+	s, _ := lit.Size()
+	return &graph.IteratorStats{
+		CheckCost: 1,
+		NextCost:  2,
+		Size:      s,
+	}
+}
diff --git a/graph/leveldb/leveldb_test.go b/graph/leveldb/leveldb_test.go
new file mode 100644
index 0000000..0412c94
--- /dev/null
+++ b/graph/leveldb/leveldb_test.go
@@ -0,0 +1,435 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leveldb
+
+import (
+	"io/ioutil"
+	"os"
+	"sort"
+	"testing"
+
+	. 
"github.com/smartystreets/goconvey/convey"
+
+	"github.com/google/cayley/graph"
+)
+
+// makeTripleSet returns the fixture graph shared by the tests in this file:
+// eight "follows" edges without provenance and three "status" edges in the
+// "status_graph" provenance.
+func makeTripleSet() []*graph.Triple {
+	tripleSet := []*graph.Triple{
+		graph.MakeTriple("A", "follows", "B", ""),
+		graph.MakeTriple("C", "follows", "B", ""),
+		graph.MakeTriple("C", "follows", "D", ""),
+		graph.MakeTriple("D", "follows", "B", ""),
+		graph.MakeTriple("B", "follows", "F", ""),
+		graph.MakeTriple("F", "follows", "G", ""),
+		graph.MakeTriple("D", "follows", "G", ""),
+		graph.MakeTriple("E", "follows", "F", ""),
+		graph.MakeTriple("B", "status", "cool", "status_graph"),
+		graph.MakeTriple("D", "status", "cool", "status_graph"),
+		graph.MakeTriple("G", "status", "cool", "status_graph"),
+	}
+	return tripleSet
+}
+
+// extractTripleFromIterator drains it and returns the string form of the
+// triple behind every value it yields.
+func extractTripleFromIterator(ts graph.TripleStore, it graph.Iterator) []string {
+	var output []string
+	for {
+		val, ok := it.Next()
+		if !ok {
+			break
+		}
+		output = append(output, ts.GetTriple(val).ToString())
+	}
+	return output
+}
+
+// extractValuesFromIterator drains it and returns the name behind every value
+// it yields.
+func extractValuesFromIterator(ts graph.TripleStore, it graph.Iterator) []string {
+	var output []string
+	for {
+		val, ok := it.Next()
+		if !ok {
+			break
+		}
+		output = append(output, ts.GetNameFor(val))
+	}
+	return output
+}
+
+// TestCreateDatabase checks that a database can be created in a fresh
+// directory and that creation fails on an unusable path.
+func TestCreateDatabase(t *testing.T) {
+
+	Convey("Given a database path", t, func() {
+		tmpDir, err := ioutil.TempDir(os.TempDir(), "cayley_test")
+		if err != nil {
+			t.Fatal("Cannot use ioutil.", err)
+		}
+		// Log only once the directory is known to exist.
+		t.Log(tmpDir)
+
+		Convey("Creates a database", func() {
+			ok := CreateNewLevelDB(tmpDir)
+			So(ok, ShouldBeTrue)
+			Convey("And has good defaults for a new database", func() {
+				ts := NewDefaultLevelDBTripleStore(tmpDir, nil)
+				So(ts, ShouldNotBeNil)
+				So(ts.Size(), ShouldEqual, 0)
+				ts.Close()
+			})
+		})
+
+		Convey("Fails if it cannot create the database", func() {
+			ok := CreateNewLevelDB("/dev/null/some terrible path")
+			So(ok, ShouldBeFalse)
+			So(func() { NewDefaultLevelDBTripleStore("/dev/null/some terrible path", nil) }, ShouldPanic)
+		})
+
+		Reset(func() {
+			os.RemoveAll(tmpDir)
+		})
+
+	})
+
+}
+
+// TestLoadDatabase checks single and bulk loads, per-node sizes, and
+// deletion.
+func TestLoadDatabase(t *testing.T) {
+	var ts *LevelDBTripleStore
+
+	Convey("Given a created database path", t, func() {
+		tmpDir, _ := ioutil.TempDir(os.TempDir(), "cayley_test")
+		t.Log(tmpDir)
+		ok := CreateNewLevelDB(tmpDir)
+		So(ok, ShouldBeTrue)
+		ts = NewDefaultLevelDBTripleStore(tmpDir, nil)
+
+		Convey("Can load a single triple", func() {
+			ts.AddTriple(graph.MakeTriple("Something", "points_to", "Something Else", "context"))
+			So(ts.GetNameFor(ts.GetIdFor("Something")), ShouldEqual, "Something")
+			So(ts.Size(), ShouldEqual, 1)
+		})
+
+		Convey("Can load many triples", func() {
+
+			ts.AddTripleSet(makeTripleSet())
+			So(ts.Size(), ShouldEqual, 11)
+			So(ts.GetSizeFor(ts.GetIdFor("B")), ShouldEqual, 5)
+
+			Convey("Can delete triples", func() {
+				ts.RemoveTriple(graph.MakeTriple("A", "follows", "B", ""))
+				So(ts.Size(), ShouldEqual, 10)
+				So(ts.GetSizeFor(ts.GetIdFor("B")), ShouldEqual, 4)
+			})
+		})
+
+		Reset(func() {
+			ts.Close()
+			os.RemoveAll(tmpDir)
+		})
+
+	})
+
+}
+
+// TestIterator exercises the all-nodes and all-triples iterators.
+func TestIterator(t *testing.T) {
+	var ts *LevelDBTripleStore
+
+	Convey("Given a prepared database", t, func() {
+		tmpDir, _ := ioutil.TempDir(os.TempDir(), "cayley_test")
+		t.Log(tmpDir)
+		defer os.RemoveAll(tmpDir)
+		ok := CreateNewLevelDB(tmpDir)
+		So(ok, ShouldBeTrue)
+		ts = NewDefaultLevelDBTripleStore(tmpDir, nil)
+		ts.AddTripleSet(makeTripleSet())
+		var it graph.Iterator
+
+		Convey("Can create an all iterator for nodes", func() {
+			it = ts.GetNodesAllIterator()
+			So(it, ShouldNotBeNil)
+
+			Convey("Has basics", func() {
+				size, accurate := it.Size()
+				So(size, ShouldBeBetween, 0, 20)
+				So(accurate, ShouldBeFalse)
+				So(it.Type(), ShouldEqual, "all")
+				re_it, ok := it.Optimize()
+				So(ok, ShouldBeFalse)
+				So(re_it, ShouldPointTo, it)
+			})
+
+			Convey("Iterates all nodes", func() {
+				expected := []string{
+					"A",
+					"B",
+					"C",
+					"D",
+					"E",
+					"F",
+					"G",
+					"follows",
+					"status",
+					"cool",
+					"status_graph",
+				}
+				sort.Strings(expected)
+				actual := extractValuesFromIterator(ts, it)
+				sort.Strings(actual)
+				So(actual, ShouldResemble, expected)
+				// A reset iterator must yield the same set again.
+				it.Reset()
+				actual = extractValuesFromIterator(ts, it)
+				sort.Strings(actual)
+				So(actual, ShouldResemble, expected)
+
+			})
+
+			Convey("Contains a couple nodes", func() {
+				So(it.Check(ts.GetIdFor("A")), ShouldBeTrue)
+				So(it.Check(ts.GetIdFor("cool")), ShouldBeTrue)
+				//So(it.Check(ts.GetIdFor("baller")), ShouldBeFalse)
+			})
+
+			Reset(func() {
+				it.Reset()
+			})
+		})
+
+		Convey("Can create an all iterator for edges", func() {
+			it := ts.GetTriplesAllIterator()
+			So(it, ShouldNotBeNil)
+			Convey("Has basics", func() {
+				size, accurate := it.Size()
+				So(size, ShouldBeBetween, 0, 20)
+				So(accurate, ShouldBeFalse)
+				So(it.Type(), ShouldEqual, "all")
+				re_it, ok := it.Optimize()
+				So(ok, ShouldBeFalse)
+				So(re_it, ShouldPointTo, it)
+			})
+
+			Convey("Iterates an edge", func() {
+				edge_val, _ := it.Next()
+				triple := ts.GetTriple(edge_val)
+				set := makeTripleSet()
+				var string_set []string
+				for _, t := range set {
+					string_set = append(string_set, t.ToString())
+				}
+				So(triple.ToString(), ShouldBeIn, string_set)
+			})
+
+			Reset(func() {
+				ts.Close()
+			})
+		})
+	})
+
+}
+
+// TestSetIterator exercises the per-direction triple iterators, alone and
+// combined through an and-iterator.
+func TestSetIterator(t *testing.T) {
+	var ts *LevelDBTripleStore
+	var tmpDir string
+
+	Convey("Given a prepared database", t, func() {
+		tmpDir, _ = ioutil.TempDir(os.TempDir(), "cayley_test")
+		t.Log(tmpDir)
+		defer os.RemoveAll(tmpDir)
+		ok := CreateNewLevelDB(tmpDir)
+		So(ok, ShouldBeTrue)
+		ts = NewDefaultLevelDBTripleStore(tmpDir, nil)
+		ts.AddTripleSet(makeTripleSet())
+		var it graph.Iterator
+
+		Convey("Can create a subject iterator", func() {
+			it = ts.GetTripleIterator("s", ts.GetIdFor("C"))
+
+			Convey("Containing the right things", func() {
+				expected := []string{
+					graph.MakeTriple("C", "follows", "B", "").ToString(),
+					graph.MakeTriple("C", "follows", "D", "").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, it)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+
+			Convey("And checkable", func() {
+				and := graph.NewAndIterator()
+				and.AddSubIterator(ts.GetTriplesAllIterator())
+				and.AddSubIterator(it)
+
+				expected := []string{
+					graph.MakeTriple("C", "follows", "B", "").ToString(),
+					graph.MakeTriple("C", "follows", "D", "").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, and)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+			Reset(func() {
+				it.Reset()
+			})
+
+		})
+
+		Convey("Can create an object iterator", func() {
+			it = ts.GetTripleIterator("o", ts.GetIdFor("F"))
+
+			Convey("Containing the right things", func() {
+				expected := []string{
+					graph.MakeTriple("B", "follows", "F", "").ToString(),
+					graph.MakeTriple("E", "follows", "F", "").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, it)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+
+			Convey("Mutually and-checkable", func() {
+				and := graph.NewAndIterator()
+				and.AddSubIterator(ts.GetTripleIterator("s", ts.GetIdFor("B")))
+				and.AddSubIterator(it)
+
+				expected := []string{
+					graph.MakeTriple("B", "follows", "F", "").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, and)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+
+		})
+
+		Convey("Can create a predicate iterator", func() {
+			it = ts.GetTripleIterator("p", ts.GetIdFor("status"))
+
+			Convey("Containing the right things", func() {
+				expected := []string{
+					graph.MakeTriple("B", "status", "cool", "status_graph").ToString(),
+					graph.MakeTriple("D", "status", "cool", "status_graph").ToString(),
+					graph.MakeTriple("G", "status", "cool", "status_graph").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, it)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+
+		})
+
+		Convey("Can create a provenance iterator", func() {
+			it = ts.GetTripleIterator("c", ts.GetIdFor("status_graph"))
+
+			Convey("Containing the right things", func() {
+				expected := []string{
+					graph.MakeTriple("B", "status", "cool", "status_graph").ToString(),
+					graph.MakeTriple("D", "status", "cool", "status_graph").ToString(),
+					graph.MakeTriple("G", "status", "cool", "status_graph").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, it)
+				sort.Strings(actual)
+				sort.Strings(expected)
+				So(actual, ShouldResemble, expected)
+			})
+
+			Convey("Can be cross-checked", func() {
+				and := graph.NewAndIterator()
+				// Order is important
+				and.AddSubIterator(ts.GetTripleIterator("s", ts.GetIdFor("B")))
+				and.AddSubIterator(it)
+
+				expected := []string{
+					graph.MakeTriple("B", "status", "cool", "status_graph").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, and)
+				So(actual, ShouldResemble, expected)
+			})
+
+			Convey("Can check against other iterators", func() {
+				and := graph.NewAndIterator()
+				// Order is important
+				and.AddSubIterator(it)
+				and.AddSubIterator(ts.GetTripleIterator("s", ts.GetIdFor("B")))
+
+				expected := []string{
+					graph.MakeTriple("B", "status", "cool", "status_graph").ToString(),
+				}
+				actual := extractTripleFromIterator(ts, and)
+				So(actual, ShouldResemble, expected)
+			})
+			Reset(func() {
+				it.Reset()
+			})
+
+		})
+
+		Reset(func() {
+			ts.Close()
+		})
+
+	})
+
+}
+
+// TestOptimize checks that a links-to iterator over a single fixed value is
+// replaced by a leveldb iterator that yields the same triples and tags.
+func TestOptimize(t *testing.T) {
+	var ts *LevelDBTripleStore
+	var lto graph.Iterator
+	var tmpDir string
+
+	Convey("Given a prepared database", t, func() {
+		tmpDir, _ = ioutil.TempDir(os.TempDir(), "cayley_test")
+		t.Log(tmpDir)
+		defer os.RemoveAll(tmpDir)
+		ok := CreateNewLevelDB(tmpDir)
+		So(ok, ShouldBeTrue)
+		ts = NewDefaultLevelDBTripleStore(tmpDir, nil)
+		ts.AddTripleSet(makeTripleSet())
+
+		Convey("With an linksto-fixed pair", func() {
+			fixed := ts.MakeFixed()
+			fixed.AddValue(ts.GetIdFor("F"))
+			fixed.AddTag("internal")
+			lto = graph.NewLinksToIterator(ts, fixed, "o")
+
+			Convey("Creates an appropriate iterator", func() {
+				// Clone before optimizing so the original form can be
+				// compared against the optimized one.
+				oldIt := lto.Clone()
+				newIt, ok := lto.Optimize()
+				So(ok, ShouldBeTrue)
+				So(newIt.Type(), ShouldEqual, "leveldb")
+
+				Convey("Containing the right things", func() {
+					afterOp := extractTripleFromIterator(ts, newIt)
+					beforeOp := extractTripleFromIterator(ts, oldIt)
+					sort.Strings(afterOp)
+					sort.Strings(beforeOp)
+					So(afterOp, ShouldResemble, beforeOp)
+				})
+
+				Convey("With the correct tags", func() {
+					oldIt.Next()
+					newIt.Next()
+					oldResults := make(map[string]graph.TSVal)
+					oldIt.TagResults(&oldResults)
+					newResults := make(map[string]graph.TSVal)
+					// Read the tags of the optimized iterator; reading
+					// oldIt here would compare the old results to themselves.
+					newIt.TagResults(&newResults)
+					So(newResults, ShouldResemble, oldResults)
+				})
+
+			})
+
+		})
+
+	})
+
+}
diff --git a/graph/leveldb/leveldb_triplestore.go b/graph/leveldb/leveldb_triplestore.go
new file mode 100644
index 0000000..edc2e10
--- /dev/null
+++ b/graph/leveldb/leveldb_triplestore.go
@@ -0,0 +1,429 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leveldb
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"hash"
+
+	"github.com/barakmich/glog"
+	"github.com/syndtr/goleveldb/leveldb"
+	leveldb_cache "github.com/syndtr/goleveldb/leveldb/cache"
+	leveldb_opt "github.com/syndtr/goleveldb/leveldb/opt"
+	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+	"github.com/google/cayley/graph"
+)
+
+// DefaultCacheSize is the block cache size, in MiB, used when the
+// "cache_size_mb" option is absent.
+const DefaultCacheSize = 2
+
+// DefaultWriteBufferSize is the write buffer size, in MiB, used when the
+// "write_buffer_mb" option is absent.
+const DefaultWriteBufferSize = 20
+
+// LevelDBTripleStore stores triples and node values in a LevelDB database.
+//
+// Triples are written under three index keys ("sp", "os", "po" followed by
+// the three direction hashes) plus a "cp" key when they carry provenance.
+// Each named value is written under "z" followed by its hash, together with a
+// count of the triples that mention it. The total triple count is persisted
+// under the "__size" key on Close.
+type LevelDBTripleStore struct {
+	dbOpts    *leveldb_opt.Options
+	db        *leveldb.DB
+	path      string
+	open      bool
+	size      int64
+	hasher    hash.Hash
+	writeopts *leveldb_opt.WriteOptions
+	readopts  *leveldb_opt.ReadOptions
+}
+
+// CreateNewLevelDB creates a database at path and writes an initial size
+// record of zero. It reports whether the database could be opened.
+func CreateNewLevelDB(path string) bool {
+	opts := &leveldb_opt.Options{}
+	db, err := leveldb.OpenFile(path, opts)
+	if err != nil {
+		glog.Errorln("Error: couldn't create database", err)
+		return false
+	}
+	ts := &LevelDBTripleStore{}
+	ts.db = db
+	ts.writeopts = &leveldb_opt.WriteOptions{
+		Sync: true,
+	}
+	// Close writes the size record and closes db, so db must not be closed a
+	// second time here.
+	ts.Close()
+	return true
+}
+
+// NewDefaultLevelDBTripleStore opens the existing database at path. The
+// "cache_size_mb" and "write_buffer_mb" options override the defaults. It
+// panics if the database cannot be opened or its size record cannot be read.
+func NewDefaultLevelDBTripleStore(path string, options graph.OptionsDict) *LevelDBTripleStore {
+	var ts LevelDBTripleStore
+	ts.path = path
+	cache_size := DefaultCacheSize
+	if val, ok := options.GetIntKey("cache_size_mb"); ok {
+		cache_size = val
+	}
+	ts.dbOpts = &leveldb_opt.Options{
+		BlockCache: leveldb_cache.NewLRUCache(cache_size * leveldb_opt.MiB),
+	}
+	ts.dbOpts.ErrorIfMissing = true
+
+	write_buffer_mb := DefaultWriteBufferSize
+	if val, ok := options.GetIntKey("write_buffer_mb"); ok {
+		write_buffer_mb = val
+	}
+	ts.dbOpts.WriteBuffer = write_buffer_mb * leveldb_opt.MiB
+	ts.hasher = sha1.New()
+	ts.writeopts = &leveldb_opt.WriteOptions{
+		Sync: false,
+	}
+	ts.readopts = &leveldb_opt.ReadOptions{}
+	db, err := leveldb.OpenFile(ts.path, ts.dbOpts)
+	if err != nil {
+		panic("Error, couldn't open! " + err.Error())
+	}
+	ts.db = db
+	ts.open = true
+	// Load the persisted size before logging so the stats show it.
+	ts.getSize()
+	glog.Infoln(ts.GetStats())
+	return &ts
+}
+
+// GetStats returns the "leveldb.stats" property, when available, followed by
+// the triple count.
+func (ts *LevelDBTripleStore) GetStats() string {
+	out := ""
+	stats, err := ts.db.GetProperty("leveldb.stats")
+	if err == nil {
+		out += fmt.Sprintln("Stats: ", stats)
+	}
+	out += fmt.Sprintln("Size: ", ts.size)
+	return out
+}
+
+// Size returns the number of triples tracked by this store.
+func (ts *LevelDBTripleStore) Size() int64 {
+	return ts.size
+}
+
+// createKeyFor builds an index key: the two letters dir1+dir2 followed by the
+// hashes of the triple's dir1, dir2 and dir3 values.
+func (ts *LevelDBTripleStore) createKeyFor(dir1, dir2, dir3 string, triple *graph.Triple) []byte {
+	key := make([]byte, 0, 2+(ts.hasher.Size()*3))
+	key = append(key, []byte(dir1+dir2)...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir1))...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir2))...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir3))...)
+	return key
+}
+
+// createProvKeyFor builds a provenance index key: "c"+dir1 followed by the
+// hashes of the triple's provenance, dir1, dir2 and dir3 values.
+func (ts *LevelDBTripleStore) createProvKeyFor(dir1, dir2, dir3 string, triple *graph.Triple) []byte {
+	key := make([]byte, 0, 2+(ts.hasher.Size()*4))
+	key = append(key, []byte("c"+dir1)...)
+	key = append(key, ts.convertStringToByteHash(triple.Get("c"))...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir1))...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir2))...)
+	key = append(key, ts.convertStringToByteHash(triple.Get(dir3))...)
+	return key
+}
+
+// createValueKeyFor builds the key of a named value: "z" followed by the hash
+// of s.
+func (ts *LevelDBTripleStore) createValueKeyFor(s string) []byte {
+	key := make([]byte, 0, 1+ts.hasher.Size())
+	key = append(key, []byte("z")...)
+	key = append(key, ts.convertStringToByteHash(s)...)
+	return key
+}
+
+// valueCounts returns how many times each name occurs in t, counting the
+// subject, predicate, object and, when non-empty, the provenance. Collapsing
+// repeats matters because several batched updates of one key would each start
+// from the same stored count.
+func valueCounts(t *graph.Triple) map[string]int {
+	counts := make(map[string]int, 4)
+	counts[t.Get("s")]++
+	counts[t.Get("p")]++
+	counts[t.Get("o")]++
+	if c := t.Get("c"); c != "" {
+		counts[c]++
+	}
+	return counts
+}
+
+// AddTriple writes t and its value counters in one batch and increments the
+// triple count on success.
+func (ts *LevelDBTripleStore) AddTriple(t *graph.Triple) {
+	batch := &leveldb.Batch{}
+	ts.buildWrite(batch, t)
+	err := ts.db.Write(batch, ts.writeopts)
+	if err != nil {
+		glog.Errorf("Couldn't write to DB for triple %s", t.ToString())
+		return
+	}
+	ts.size++
+}
+
+// RemoveTriple deletes t's index entries and decrements its value counters in
+// one batch. It does nothing if t is not present under its "sp" key.
+func (ts *LevelDBTripleStore) RemoveTriple(t *graph.Triple) {
+	spoKey := ts.createKeyFor("s", "p", "o", t)
+	_, err := ts.db.Get(spoKey, ts.readopts)
+	if err != nil && err != leveldb.ErrNotFound {
+		glog.Errorf("Couldn't access DB to confirm deletion")
+		return
+	}
+	if err == leveldb.ErrNotFound {
+		// No such triple in the database, forget about it.
+		return
+	}
+	batch := &leveldb.Batch{}
+	batch.Delete(spoKey)
+	batch.Delete(ts.createKeyFor("o", "s", "p", t))
+	batch.Delete(ts.createKeyFor("p", "o", "s", t))
+	if t.Get("c") != "" {
+		batch.Delete(ts.createProvKeyFor("p", "s", "o", t))
+	}
+	// One update per distinct name, so a name used twice in the triple is
+	// decremented by two rather than by one.
+	for name, count := range valueCounts(t) {
+		ts.UpdateValueKeyBy(name, -count, batch)
+	}
+	err = ts.db.Write(batch, ts.writeopts)
+	if err != nil {
+		glog.Errorf("Couldn't delete triple %s", t.ToString())
+		return
+	}
+	ts.size--
+}
+
+// buildTripleWrite adds the index entries for t to batch. Every entry stores
+// the JSON encoding of the triple.
+func (ts *LevelDBTripleStore) buildTripleWrite(batch *leveldb.Batch, t *graph.Triple) {
+	encoded, err := json.Marshal(*t)
+	if err != nil {
+		glog.Errorf("Couldn't write to buffer for triple %s\n %s\n", t.ToString(), err)
+		return
+	}
+	batch.Put(ts.createKeyFor("s", "p", "o", t), encoded)
+	batch.Put(ts.createKeyFor("o", "s", "p", t), encoded)
+	batch.Put(ts.createKeyFor("p", "o", "s", t), encoded)
+	if t.Get("c") != "" {
+		batch.Put(ts.createProvKeyFor("p", "s", "o", t), encoded)
+	}
+}
+
+// buildWrite adds the index entries and the value counter updates for t to
+// batch, so that both are committed together.
+func (ts *LevelDBTripleStore) buildWrite(batch *leveldb.Batch, t *graph.Triple) {
+	ts.buildTripleWrite(batch, t)
+	for name, count := range valueCounts(t) {
+		ts.UpdateValueKeyBy(name, count, batch)
+	}
+}
+
+// ValueData is the JSON record stored under a value key: the value's name and
+// the number of triples that mention it.
+type ValueData struct {
+	Name string
+	Size int64
+}
+
+// UpdateValueKeyBy adds amount to the counter stored for name, creating the
+// record if needed. When amount is negative and the counter drops to zero or
+// below, the record is deleted instead. The change goes into batch, or is
+// applied to the database immediately when batch is nil.
+func (ts *LevelDBTripleStore) UpdateValueKeyBy(name string, amount int, batch *leveldb.Batch) {
+	value := &ValueData{name, int64(amount)}
+	key := ts.createValueKeyFor(name)
+	b, err := ts.db.Get(key, ts.readopts)
+
+	// Error getting the node from the database.
+	if err != nil && err != leveldb.ErrNotFound {
+		glog.Errorf("Error reading Value %s from the DB\n", name)
+		return
+	}
+
+	// Node exists in the database -- unmarshal and update.
+	if b != nil && err != leveldb.ErrNotFound {
+		err = json.Unmarshal(b, value)
+		if err != nil {
+			glog.Errorln("Error: couldn't reconstruct value ", err)
+			return
+		}
+		value.Size += int64(amount)
+	}
+
+	// Are we deleting something?
+	if amount < 0 && value.Size <= 0 {
+		if batch == nil {
+			if derr := ts.db.Delete(key, ts.writeopts); derr != nil {
+				glog.Errorf("Couldn't delete value %s\n %s", name, derr)
+			}
+		} else {
+			batch.Delete(key)
+		}
+		return
+	}
+
+	// Repackage and rewrite.
+	encoded, err := json.Marshal(value)
+	if err != nil {
+		glog.Errorf("Couldn't write to buffer for value %s\n %s", name, err)
+		return
+	}
+	if batch == nil {
+		if perr := ts.db.Put(key, encoded, ts.writeopts); perr != nil {
+			glog.Errorf("Couldn't write value %s\n %s", name, perr)
+		}
+	} else {
+		batch.Put(key, encoded)
+	}
+}
+
+// AddTripleSet writes every triple in t_s and the aggregated value counter
+// updates in one batch, then adds len(t_s) to the triple count on success.
+func (ts *LevelDBTripleStore) AddTripleSet(t_s []*graph.Triple) {
+	batch := &leveldb.Batch{}
+	newTs := len(t_s)
+	resizeMap := make(map[string]int)
+	for _, t := range t_s {
+		ts.buildTripleWrite(batch, t)
+		resizeMap[t.Sub]++
+		resizeMap[t.Pred]++
+		resizeMap[t.Obj]++
+		if t.Provenance != "" {
+			resizeMap[t.Provenance]++
+		}
+	}
+	for k, v := range resizeMap {
+		ts.UpdateValueKeyBy(k, v, batch)
+	}
+	err := ts.db.Write(batch, ts.writeopts)
+	if err != nil {
+		glog.Errorf("Couldn't write to DB for tripleset")
+		return
+	}
+	ts.size += int64(newTs)
+}
+
+// Close persists the triple count under "__size" as a little-endian int64 and
+// closes the database.
+func (ts *LevelDBTripleStore) Close() {
+	buf := new(bytes.Buffer)
+	err := binary.Write(buf, binary.LittleEndian, ts.size)
+	if err == nil {
+		werr := ts.db.Put([]byte("__size"), buf.Bytes(), ts.writeopts)
+		if werr != nil {
+			glog.Errorf("Couldn't write size before closing!")
+		}
+	} else {
+		glog.Errorf("Couldn't convert size before closing!")
+	}
+	ts.db.Close()
+	ts.open = false
+}
+
+// GetTriple loads the triple stored under index key k. It returns an empty
+// triple if the key is missing, unreadable, or cannot be decoded.
+func (ts *LevelDBTripleStore) GetTriple(k graph.TSVal) *graph.Triple {
+	var triple graph.Triple
+	b, err := ts.db.Get(k.([]byte), ts.readopts)
+	if err != nil && err != leveldb.ErrNotFound {
+		glog.Errorln("Error: couldn't get triple from DB")
+		return &graph.Triple{}
+	}
+	if err == leveldb.ErrNotFound {
+		// No harm, no foul.
+		return &graph.Triple{}
+	}
+	err = json.Unmarshal(b, &triple)
+	if err != nil {
+		glog.Errorln("Error: couldn't reconstruct triple")
+		return &graph.Triple{}
+	}
+	return &triple
+}
+
+// convertStringToByteHash returns the hash of s computed with the store's
+// single hasher, which is reset before each use.
+func (ts *LevelDBTripleStore) convertStringToByteHash(s string) []byte {
+	ts.hasher.Reset()
+	key := make([]byte, 0, ts.hasher.Size())
+	ts.hasher.Write([]byte(s))
+	key = ts.hasher.Sum(key)
+	return key
+}
+
+// GetIdFor returns the value key for the name s. No database lookup is made.
+func (ts *LevelDBTripleStore) GetIdFor(s string) graph.TSVal {
+	return ts.createValueKeyFor(s)
+}
+
+// getValueData loads the record stored under value_key. It returns a zero
+// ValueData if the key is missing, unreadable, or cannot be decoded.
+func (ts *LevelDBTripleStore) getValueData(value_key []byte) ValueData {
+	var out ValueData
+	if glog.V(3) {
+		glog.V(3).Infof("%s %v\n", string(value_key[0]), value_key)
+	}
+	b, err := ts.db.Get(value_key, ts.readopts)
+	if err != nil && err != leveldb.ErrNotFound {
+		glog.Errorln("Error: couldn't get value from DB")
+		return out
+	}
+	if b != nil && err != leveldb.ErrNotFound {
+		err = json.Unmarshal(b, &out)
+		if err != nil {
+			glog.Errorln("Error: couldn't reconstruct value")
+			return ValueData{}
+		}
+	}
+	return out
+}
+
+// GetNameFor returns the name stored under value key k, or "" if k is nil or
+// has no record.
+func (ts *LevelDBTripleStore) GetNameFor(k graph.TSVal) string {
+	if k == nil {
+		glog.V(2).Infoln("k was nil")
+		return ""
+	}
+	return ts.getValueData(k.([]byte)).Name
+}
+
+// GetSizeFor returns the counter stored under value key k, or 0 if k is nil
+// or has no record.
+func (ts *LevelDBTripleStore) GetSizeFor(k graph.TSVal) int64 {
+	if k == nil {
+		return 0
+	}
+	return int64(ts.getValueData(k.([]byte)).Size)
+}
+
+// getSize loads the triple count persisted under "__size". A missing record
+// means a count of zero. It panics if the read itself fails.
+func (ts *LevelDBTripleStore) getSize() {
+	var size int64
+	b, err := ts.db.Get([]byte("__size"), ts.readopts)
+	if err != nil && err != leveldb.ErrNotFound {
+		panic("Couldn't read size " + err.Error())
+	}
+	if err == leveldb.ErrNotFound {
+		// Must be a new database. Cool
+		ts.size = 0
+		return
+	}
+	buf := bytes.NewBuffer(b)
+	err = binary.Read(buf, binary.LittleEndian, &size)
+	if err != nil {
+		glog.Errorln("Error: couldn't parse size")
+	}
+	ts.size = size
+}
+
+// GetApproximateSizeForPrefix returns a rough entry count for the keys that
+// start with pre: the approximate size reported by the database for that key
+// range, divided by 64, plus one. It returns an error for an empty prefix or
+// when the database cannot produce the estimate.
+func (ts *LevelDBTripleStore) GetApproximateSizeForPrefix(pre []byte) (int64, error) {
+	if len(pre) == 0 {
+		return 0, fmt.Errorf("cannot estimate size for an empty prefix")
+	}
+	// The range limit is the prefix with its last byte incremented.
+	limit := make([]byte, len(pre))
+	copy(limit, pre)
+	limit[len(limit)-1]++
+	ranges := make([]leveldb_util.Range, 1)
+	ranges[0].Start = pre
+	ranges[0].Limit = limit
+	sizes, err := ts.db.GetApproximateSizes(ranges)
+	if err != nil {
+		return 0, err
+	}
+	return (int64(sizes[0]) >> 6) + 1, nil
+}
+
+// GetTripleIterator returns an iterator over the triples that have val in
+// direction dir ("s", "p", "o" or "c"). It panics on any other direction.
+func (ts *LevelDBTripleStore) GetTripleIterator(dir string, val graph.TSVal) graph.Iterator {
+	switch dir {
+	case "s":
+		return NewLevelDBIterator("sp", "s", val, ts)
+	case "p":
+		return NewLevelDBIterator("po", "p", val, ts)
+	case "o":
+		return NewLevelDBIterator("os", "o", val, ts)
+	case "c":
+		return NewLevelDBIterator("cp", "c", val, ts)
+	}
+	panic("Notreached " + dir)
+}
+
+// GetNodesAllIterator returns an iterator over every value key.
+func (ts *LevelDBTripleStore) GetNodesAllIterator() graph.Iterator {
+	return NewLevelDBAllIterator("z", "v", ts)
+}
+
+// GetTriplesAllIterator returns an iterator over every key of the "po" index.
+func (ts *LevelDBTripleStore) GetTriplesAllIterator() graph.Iterator {
+	return NewLevelDBAllIterator("po", "p", ts)
+}
+
+// GetTripleDirection returns the value key of the node found in the given
+// direction of the triple key val.
+func (ts *LevelDBTripleStore) GetTripleDirection(val graph.TSVal, direction string) graph.TSVal {
+	v := val.([]uint8)
+	offset := GetPositionFromPrefix(v[0:2], direction, ts)
+	if offset != -1 {
+		return append([]byte("z"), v[offset:offset+ts.hasher.Size()]...)
+	}
+	// The key does not embed this direction: load the triple and convert the
+	// name to a value key so both paths return the same kind of value.
+	return ts.GetIdFor(ts.GetTriple(val).Get(direction))
+}
+
+// compareBytes reports whether two []byte values are equal.
+func compareBytes(a, b graph.TSVal) bool {
+	return bytes.Equal(a.([]uint8), b.([]uint8))
+}
+
+// MakeFixed returns a fixed iterator that compares its values byte-wise.
+func (ts *LevelDBTripleStore) MakeFixed() *graph.FixedIterator {
+	return graph.NewFixedIteratorWithCompare(compareBytes)
+}
diff --git a/graph/leveldb/leveldb_triplestore_iterator_optimize.go b/graph/leveldb/leveldb_triplestore_iterator_optimize.go
new file mode 100644
index 0000000..243afba
--- /dev/null
+++ b/graph/leveldb/leveldb_triplestore_iterator_optimize.go
@@ -0,0 +1,53 @@
+// Copyright 2014 The Cayley Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package leveldb + +import ( + "github.com/google/cayley/graph" +) + +func (ts *LevelDBTripleStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case "linksto": + return ts.optimizeLinksTo(it.(*graph.LinksToIterator)) + + } + return it, false +} + +func (ts *LevelDBTripleStore) optimizeLinksTo(it *graph.LinksToIterator) (graph.Iterator, bool) { + l := it.GetSubIterators() + if l.Len() != 1 { + return it, false + } + primaryIt := l.Front().Value.(graph.Iterator) + if primaryIt.Type() == "fixed" { + size, _ := primaryIt.Size() + if size == 1 { + val, ok := primaryIt.Next() + if !ok { + panic("Sizes lie") + } + newIt := ts.GetTripleIterator(it.Direction(), val) + newIt.CopyTagsFrom(it) + for _, tag := range primaryIt.Tags() { + newIt.AddFixedTag(tag, val) + } + it.Close() + return newIt, true + } + } + return it, false +}