comments and concretized deltas
This commit is contained in:
parent
fe0569c9d4
commit
f967b36f84
9 changed files with 31 additions and 35 deletions
|
|
@ -36,7 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil }
|
func (qs *store) ApplyDeltas([]graph.Delta) error { return nil }
|
||||||
|
|
||||||
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
|
func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ type Iterator struct {
|
||||||
result graph.Value
|
result graph.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator {
|
func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator {
|
||||||
vb := value.(Token)
|
vb := value.(Token)
|
||||||
p := make([]byte, 0, 2+qs.hasherSize)
|
p := make([]byte, 0, 2+qs.hasherSize)
|
||||||
p = append(p, []byte(prefix)...)
|
p = append(p, []byte(prefix)...)
|
||||||
|
|
@ -70,7 +70,6 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS
|
||||||
it.open = false
|
it.open = false
|
||||||
it.iter.Release()
|
it.iter.Release()
|
||||||
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
|
glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix)
|
||||||
return &iterator.Null{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &it
|
return &it
|
||||||
|
|
@ -108,7 +107,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) {
|
||||||
|
|
||||||
func (it *Iterator) Clone() graph.Iterator {
|
func (it *Iterator) Clone() graph.Iterator {
|
||||||
out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
|
out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs)
|
||||||
out.Tagger().CopyFrom(it)
|
out.tags.CopyFrom(it)
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,7 @@ func (qs *TripleStore) Horizon() int64 {
|
||||||
return qs.horizon
|
return qs.horizon
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte {
|
func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte {
|
||||||
key := make([]byte, 0, 19)
|
key := make([]byte, 0, 19)
|
||||||
key = append(key, 'd')
|
key = append(key, 'd')
|
||||||
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
|
key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...)
|
||||||
|
|
@ -176,7 +176,7 @@ var (
|
||||||
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
|
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
|
func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
|
||||||
batch := &leveldb.Batch{}
|
batch := &leveldb.Batch{}
|
||||||
resizeMap := make(map[string]int64)
|
resizeMap := make(map[string]int64)
|
||||||
size_change := int64(0)
|
size_change := int64(0)
|
||||||
|
|
|
||||||
|
|
@ -35,17 +35,17 @@ func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// No subiterators.
|
// No subiterators.
|
||||||
func (nit *NodesAllIterator) SubIterators() []graph.Iterator {
|
func (it *NodesAllIterator) SubIterators() []graph.Iterator {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nit *NodesAllIterator) Next() bool {
|
func (it *NodesAllIterator) Next() bool {
|
||||||
if !nit.Int64.Next() {
|
if !it.Int64.Next() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
_, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)]
|
_, ok := it.ts.revIdMap[it.Int64.Result().(int64)]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nit.Next()
|
return it.Next()
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,7 @@ func newTripleStore() *TripleStore {
|
||||||
return &ts
|
return &ts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error {
|
func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error {
|
||||||
for _, d := range deltas {
|
for _, d := range deltas {
|
||||||
var err error
|
var err error
|
||||||
if d.Action == graph.Add {
|
if d.Action == graph.Add {
|
||||||
|
|
@ -154,13 +154,13 @@ func (ts *TripleStore) quadExists(t quad.Quad) (bool, int64) {
|
||||||
return false, 0
|
return false, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) AddDelta(d *graph.Delta) error {
|
func (ts *TripleStore) AddDelta(d graph.Delta) error {
|
||||||
if exists, _ := ts.quadExists(d.Quad); exists {
|
if exists, _ := ts.quadExists(d.Quad); exists {
|
||||||
return graph.ErrQuadExists
|
return graph.ErrQuadExists
|
||||||
}
|
}
|
||||||
var quadID int64
|
var quadID int64
|
||||||
quadID = ts.quadIdCounter
|
quadID = ts.quadIdCounter
|
||||||
ts.log = append(ts.log, LogEntry{Delta: *d})
|
ts.log = append(ts.log, LogEntry{Delta: d})
|
||||||
ts.size++
|
ts.size++
|
||||||
ts.quadIdCounter++
|
ts.quadIdCounter++
|
||||||
|
|
||||||
|
|
@ -189,7 +189,7 @@ func (ts *TripleStore) AddDelta(d *graph.Delta) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TripleStore) RemoveDelta(d *graph.Delta) error {
|
func (ts *TripleStore) RemoveDelta(d graph.Delta) error {
|
||||||
var prevQuadID int64
|
var prevQuadID int64
|
||||||
var exists bool
|
var exists bool
|
||||||
prevQuadID = 0
|
prevQuadID = 0
|
||||||
|
|
@ -199,7 +199,7 @@ func (ts *TripleStore) RemoveDelta(d *graph.Delta) error {
|
||||||
|
|
||||||
var quadID int64
|
var quadID int64
|
||||||
quadID = ts.quadIdCounter
|
quadID = ts.quadIdCounter
|
||||||
ts.log = append(ts.log, LogEntry{Delta: *d})
|
ts.log = append(ts.log, LogEntry{Delta: d})
|
||||||
ts.log[prevQuadID].DeletedBy = quadID
|
ts.log[prevQuadID].DeletedBy = quadID
|
||||||
ts.size--
|
ts.size--
|
||||||
ts.quadIdCounter++
|
ts.quadIdCounter++
|
||||||
|
|
|
||||||
|
|
@ -192,7 +192,7 @@ func (qs *TripleStore) checkValid(key string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
func (qs *TripleStore) updateLog(d graph.Delta) error {
|
||||||
var action string
|
var action string
|
||||||
if d.Action == graph.Add {
|
if d.Action == graph.Add {
|
||||||
action = "Add"
|
action = "Add"
|
||||||
|
|
@ -212,7 +212,7 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error {
|
||||||
qs.session.SetSafe(nil)
|
qs.session.SetSafe(nil)
|
||||||
ids := make(map[string]int)
|
ids := make(map[string]int)
|
||||||
// Pre-check the existence condition.
|
// Pre-check the existence condition.
|
||||||
|
|
@ -267,17 +267,12 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
func (qs *TripleStore) Quad(val graph.Value) quad.Quad {
|
||||||
var bsonDoc bson.M
|
var q quad.Quad
|
||||||
err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc)
|
err := qs.db.C("quads").FindId(val.(string)).One(&q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
|
glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err)
|
||||||
}
|
}
|
||||||
return quad.Quad{
|
return q
|
||||||
bsonDoc["Subject"].(string),
|
|
||||||
bsonDoc["Predicate"].(string),
|
|
||||||
bsonDoc["Object"].(string),
|
|
||||||
bsonDoc["Label"].(string),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
|
func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator {
|
||||||
|
|
|
||||||
|
|
@ -53,8 +53,10 @@ func (h *Handle) Close() {
|
||||||
h.QuadWriter.Close()
|
h.QuadWriter.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrQuadExists = errors.New("Quad exists")
|
var (
|
||||||
var ErrQuadNotExist = errors.New("Quad doesn't exist")
|
ErrQuadExists = errors.New("Quad exists")
|
||||||
|
ErrQuadNotExist = errors.New("Quad doesn't exist")
|
||||||
|
)
|
||||||
|
|
||||||
type QuadWriter interface {
|
type QuadWriter interface {
|
||||||
// Add a quad to the store.
|
// Add a quad to the store.
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ type Value interface{}
|
||||||
type TripleStore interface {
|
type TripleStore interface {
|
||||||
// The only way in is through building a transaction, which
|
// The only way in is through building a transaction, which
|
||||||
// is done by a replication strategy.
|
// is done by a replication strategy.
|
||||||
ApplyDeltas([]*Delta) error
|
ApplyDeltas([]Delta) error
|
||||||
|
|
||||||
// Given an opaque token, returns the triple for that token from the store.
|
// Given an opaque token, returns the triple for that token from the store.
|
||||||
Quad(Value) quad.Quad
|
Quad(Value) quad.Quad
|
||||||
|
|
|
||||||
|
|
@ -50,8 +50,8 @@ func (s *Single) AcquireNextID() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) AddQuad(q quad.Quad) error {
|
func (s *Single) AddQuad(q quad.Quad) error {
|
||||||
deltas := make([]*graph.Delta, 1)
|
deltas := make([]graph.Delta, 1)
|
||||||
deltas[0] = &graph.Delta{
|
deltas[0] = graph.Delta{
|
||||||
ID: s.AcquireNextID(),
|
ID: s.AcquireNextID(),
|
||||||
Quad: q,
|
Quad: q,
|
||||||
Action: graph.Add,
|
Action: graph.Add,
|
||||||
|
|
@ -61,9 +61,9 @@ func (s *Single) AddQuad(q quad.Quad) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) AddQuadSet(set []quad.Quad) error {
|
func (s *Single) AddQuadSet(set []quad.Quad) error {
|
||||||
deltas := make([]*graph.Delta, len(set))
|
deltas := make([]graph.Delta, len(set))
|
||||||
for i, q := range set {
|
for i, q := range set {
|
||||||
deltas[i] = &graph.Delta{
|
deltas[i] = graph.Delta{
|
||||||
ID: s.AcquireNextID(),
|
ID: s.AcquireNextID(),
|
||||||
Quad: q,
|
Quad: q,
|
||||||
Action: graph.Add,
|
Action: graph.Add,
|
||||||
|
|
@ -75,8 +75,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Single) RemoveQuad(q quad.Quad) error {
|
func (s *Single) RemoveQuad(q quad.Quad) error {
|
||||||
deltas := make([]*graph.Delta, 1)
|
deltas := make([]graph.Delta, 1)
|
||||||
deltas[0] = &graph.Delta{
|
deltas[0] = graph.Delta{
|
||||||
ID: s.AcquireNextID(),
|
ID: s.AcquireNextID(),
|
||||||
Quad: q,
|
Quad: q,
|
||||||
Action: graph.Delete,
|
Action: graph.Delete,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue