convert leveldb to log-structure

This commit is contained in:
Barak Michener 2014-08-05 14:17:38 -04:00
parent dcb495d145
commit d4e5eead32
4 changed files with 177 additions and 140 deletions

View file

@ -16,9 +16,11 @@ package leveldb
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"github.com/barakmich/glog"
ldbit "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@ -41,7 +43,7 @@ type Iterator struct {
result graph.Value
}
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator {
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator {
vb := value.([]byte)
p := make([]byte, 0, 2+qs.hasher.Size())
p = append(p, []byte(prefix)...)
@ -65,10 +67,10 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS
ok := it.iter.Seek(it.nextPrefix)
if !ok {
// FIXME(kortschak) What are the semantics here? Is this iterator usable?
// If not, we should return nil *Iterator and an error.
it.open = false
it.iter.Release()
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
return &iterator.Null{}
}
return &it
@ -106,7 +108,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
func (it *Iterator) Clone() graph.Iterator {
out := NewIterator(it.originalPrefix, it.dir, it.checkId, it.qs)
out.tags.CopyFrom(it)
out.Tagger().CopyFrom(it)
return out
}
@ -117,6 +119,12 @@ func (it *Iterator) Close() {
}
}
func (it *Iterator) isLiveValue(val []byte) bool {
var entry IndexEntry
json.Unmarshal(val, &entry)
return len(entry.History)%2 != 0
}
func (it *Iterator) Next() (graph.Value, bool) {
if it.iter == nil {
it.result = nil
@ -132,6 +140,9 @@ func (it *Iterator) Next() (graph.Value, bool) {
return nil, false
}
if bytes.HasPrefix(it.iter.Key(), it.nextPrefix) {
if !it.isLiveValue(it.iter.Value()) {
return it.Next()
}
out := make([]byte, len(it.iter.Key()))
copy(out, it.iter.Key())
it.result = out
@ -173,7 +184,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return 2*qs.hasher.Size() + 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("po")) {
@ -185,7 +196,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return qs.hasher.Size() + 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("os")) {
@ -197,7 +208,7 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int {
case quad.Object:
return 2
case quad.Label:
return -1
return 3*qs.hasher.Size() + 2
}
}
if bytes.Equal(prefix, []byte("cp")) {
@ -221,16 +232,13 @@ func (it *Iterator) Contains(v graph.Value) bool {
return false
}
offset := PositionOf(val[0:2], it.dir, it.qs)
if offset != -1 {
if bytes.HasPrefix(val[offset:], it.checkId[1:]) {
return true
}
} else {
nameForDir := it.qs.Quad(v).Get(it.dir)
hashForDir := it.qs.ValueOf(nameForDir).([]byte)
if bytes.Equal(hashForDir, it.checkId) {
return true
if bytes.HasPrefix(val[offset:], it.checkId[1:]) {
data, err := it.qs.db.Get(val, it.ro)
if err != nil {
glog.Error("Couldn't get data for key ", val, " in iterator ", it.UID(), " failing Contains.")
return false
}
return it.isLiveValue(data)
}
return false
}