diff --git a/.travis.yml b/.travis.yml index b350943..efb83a3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ language: go go: - - 1.2 - 1.3 + - 1.3.1 - tip install: diff --git a/graph/iterator/hasa_iterator.go b/graph/iterator/hasa_iterator.go index 99dec4f..b88f58b 100644 --- a/graph/iterator/hasa_iterator.go +++ b/graph/iterator/hasa_iterator.go @@ -202,8 +202,7 @@ func (it *HasA) Next() bool { return graph.NextLogOut(it, 0, false) } tID := it.primaryIt.Result() - name := it.ts.Quad(tID).Get(it.dir) - val := it.ts.ValueOf(name) + val := it.ts.TripleDirection(tID, it.dir) it.result = val return graph.NextLogOut(it, val, true) } diff --git a/graph/iterator/mock_ts_test.go b/graph/iterator/mock_ts_test.go index 4202ffa..4b186ac 100644 --- a/graph/iterator/mock_ts_test.go +++ b/graph/iterator/mock_ts_test.go @@ -36,7 +36,7 @@ func (qs *store) ValueOf(s string) graph.Value { return nil } -func (qs *store) ApplyDeltas([]*graph.Delta) error { return nil } +func (qs *store) ApplyDeltas([]graph.Delta) error { return nil } func (qs *store) Quad(graph.Value) quad.Quad { return quad.Quad{} } diff --git a/graph/leveldb/iterator.go b/graph/leveldb/iterator.go index d3c5ea5..dbc70be 100644 --- a/graph/leveldb/iterator.go +++ b/graph/leveldb/iterator.go @@ -43,9 +43,9 @@ type Iterator struct { result graph.Value } -func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) graph.Iterator { +func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleStore) *Iterator { vb := value.(Token) - p := make([]byte, 0, 2+qs.hasher.Size()) + p := make([]byte, 0, 2+qs.hasherSize) p = append(p, []byte(prefix)...) p = append(p, []byte(vb[1:])...) @@ -70,7 +70,6 @@ func NewIterator(prefix string, d quad.Direction, value graph.Value, qs *TripleS it.open = false it.iter.Release() glog.Error("Opening LevelDB iterator couldn't seek to location ", it.nextPrefix) - return &iterator.Null{} } return &it @@ -108,7 +107,7 @@ func (it *Iterator) TagResults(dst map[string]graph.Value) { func (it *Iterator) Clone() graph.Iterator { out := NewIterator(it.originalPrefix, it.dir, Token(it.checkId), it.qs) - out.Tagger().CopyFrom(it) + out.tags.CopyFrom(it) return out } @@ -180,45 +179,45 @@ func PositionOf(prefix []byte, d quad.Direction, qs *TripleStore) int { case quad.Subject: return 2 case quad.Predicate: - return qs.hasher.Size() + 2 + return qs.hasherSize + 2 case quad.Object: - return 2*qs.hasher.Size() + 2 + return 2*qs.hasherSize + 2 case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3*qs.hasherSize + 2 } } if bytes.Equal(prefix, []byte("po")) { switch d { case quad.Subject: - return 2*qs.hasher.Size() + 2 + return 2*qs.hasherSize + 2 case quad.Predicate: return 2 case quad.Object: - return qs.hasher.Size() + 2 + return qs.hasherSize + 2 case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3*qs.hasherSize + 2 } } if bytes.Equal(prefix, []byte("os")) { switch d { case quad.Subject: - return qs.hasher.Size() + 2 + return qs.hasherSize + 2 case quad.Predicate: - return 2*qs.hasher.Size() + 2 + return 2*qs.hasherSize + 2 case quad.Object: return 2 case quad.Label: - return 3*qs.hasher.Size() + 2 + return 3*qs.hasherSize + 2 } } if bytes.Equal(prefix, []byte("cp")) { switch d { case quad.Subject: - return 2*qs.hasher.Size() + 2 + return 2*qs.hasherSize + 2 case quad.Predicate: - return qs.hasher.Size() + 2 + return qs.hasherSize + 2 case quad.Object: - return 3*qs.hasher.Size() + 2 + return 3*qs.hasherSize + 2 case quad.Label: return 2 } diff --git a/graph/leveldb/triplestore.go b/graph/leveldb/triplestore.go index 9648d1c..8d36a33 100644 --- a/graph/leveldb/triplestore.go +++ b/graph/leveldb/triplestore.go @@ -50,15 +50,17 @@ func (t Token) Key() interface{} { } type TripleStore struct { - dbOpts *opt.Options - db *leveldb.DB - path string - open bool - size int64 - horizon int64 - hasher hash.Hash - writeopts *opt.WriteOptions - readopts *opt.ReadOptions + dbOpts *opt.Options + db *leveldb.DB + path string + open bool + size int64 + horizon int64 + hasher hash.Hash + hasherSize int + makeHasher func() hash.Hash + writeopts *opt.WriteOptions + readopts *opt.ReadOptions } func createNewLevelDB(path string, _ graph.Options) error { @@ -96,7 +98,8 @@ func newTripleStore(path string, options graph.Options) (graph.TripleStore, erro write_buffer_mb = val } qs.dbOpts.WriteBuffer = write_buffer_mb * opt.MiB - qs.hasher = sha1.New() + qs.hasherSize = sha1.Size + qs.makeHasher = sha1.New qs.writeopts = &opt.WriteOptions{ Sync: false, } @@ -133,7 +136,7 @@ func (qs *TripleStore) Horizon() int64 { return qs.horizon } -func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { +func (qa *TripleStore) createDeltaKeyFor(d graph.Delta) []byte { key := make([]byte, 0, 19) key = append(key, 'd') key = append(key, []byte(fmt.Sprintf("%018x", d.ID))...) @@ -141,20 +144,22 @@ func (qa *TripleStore) createDeltaKeyFor(d *graph.Delta) []byte { } func (qs *TripleStore) createKeyFor(d [4]quad.Direction, triple quad.Quad) []byte { - key := make([]byte, 0, 2+(qs.hasher.Size()*4)) + hasher := qs.makeHasher() + key := make([]byte, 0, 2+(qs.hasherSize*3)) // TODO(kortschak) Remove dependence on String() method. key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[0]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[1]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[2]))...) - key = append(key, qs.convertStringToByteHash(triple.Get(d[3]))...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[0]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[1]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[2]), hasher)...) + key = append(key, qs.convertStringToByteHash(triple.Get(d[3]), hasher)...) return key } func (qs *TripleStore) createValueKeyFor(s string) []byte { - key := make([]byte, 0, 1+qs.hasher.Size()) + hasher := qs.makeHasher() + key := make([]byte, 0, 1+qs.hasherSize) key = append(key, []byte("z")...) - key = append(key, qs.convertStringToByteHash(s)...) + key = append(key, qs.convertStringToByteHash(s, hasher)...) return key } @@ -171,7 +176,7 @@ var ( cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object} ) -func (qs *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { +func (qs *TripleStore) ApplyDeltas(deltas []graph.Delta) error { batch := &leveldb.Batch{} resizeMap := make(map[string]int64) size_change := int64(0) @@ -341,11 +346,11 @@ func (qs *TripleStore) Quad(k graph.Value) quad.Quad { return triple } -func (qs *TripleStore) convertStringToByteHash(s string) []byte { - qs.hasher.Reset() - key := make([]byte, 0, qs.hasher.Size()) - qs.hasher.Write([]byte(s)) - key = qs.hasher.Sum(key) +func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) []byte { + hasher.Reset() + key := make([]byte, 0, qs.hasherSize) + hasher.Write([]byte(s)) + key = hasher.Sum(key) return key } @@ -462,7 +467,7 @@ func (qs *TripleStore) TripleDirection(val graph.Value, d quad.Direction) graph. v := val.(Token) offset := PositionOf(v[0:2], d, qs) if offset != -1 { - return Token(append([]byte("z"), v[offset:offset+qs.hasher.Size()]...)) + return Token(append([]byte("z"), v[offset:offset+qs.hasherSize]...)) } else { return Token(qs.Quad(val).Get(d)) } diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index eb01f78..6717a13 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -35,17 +35,17 @@ func NewMemstoreNodesAllIterator(ts *TripleStore) *NodesAllIterator { } // No subiterators. -func (nit *NodesAllIterator) SubIterators() []graph.Iterator { +func (it *NodesAllIterator) SubIterators() []graph.Iterator { return nil } -func (nit *NodesAllIterator) Next() bool { - if !nit.Int64.Next() { +func (it *NodesAllIterator) Next() bool { + if !it.Int64.Next() { return false } - _, ok := nit.ts.revIdMap[nit.Int64.Result().(int64)] + _, ok := it.ts.revIdMap[it.Int64.Result().(int64)] if !ok { - return nit.Next() + return it.Next() } return true } diff --git a/graph/memstore/triplestore.go b/graph/memstore/triplestore.go index afe1b5f..da03810 100644 --- a/graph/memstore/triplestore.go +++ b/graph/memstore/triplestore.go @@ -94,7 +94,7 @@ func newTripleStore() *TripleStore { } } -func (ts *TripleStore) ApplyDeltas(deltas []*graph.Delta) error { +func (ts *TripleStore) ApplyDeltas(deltas []graph.Delta) error { for _, d := range deltas { var err error if d.Action == graph.Add { @@ -144,12 +144,12 @@ func (ts *TripleStore) indexOf(t quad.Quad) (int64, bool) { return 0, false } -func (ts *TripleStore) AddDelta(d *graph.Delta) error { +func (ts *TripleStore) AddDelta(d graph.Delta) error { if _, exists := ts.indexOf(d.Quad); exists { return graph.ErrQuadExists } qid := ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: *d}) + ts.log = append(ts.log, LogEntry{Delta: d}) ts.size++ ts.quadIdCounter++ @@ -178,14 +178,14 @@ func (ts *TripleStore) AddDelta(d *graph.Delta) error { return nil } -func (ts *TripleStore) RemoveDelta(d *graph.Delta) error { +func (ts *TripleStore) RemoveDelta(d graph.Delta) error { prevQuadID, exists := ts.indexOf(d.Quad) if !exists { return graph.ErrQuadNotExist } quadID := ts.quadIdCounter - ts.log = append(ts.log, LogEntry{Delta: *d}) + ts.log = append(ts.log, LogEntry{Delta: d}) ts.log[prevQuadID].DeletedBy = quadID ts.size-- ts.quadIdCounter++ diff --git a/graph/mongo/iterator.go b/graph/mongo/iterator.go index 1495eca..3ba695d 100644 --- a/graph/mongo/iterator.go +++ b/graph/mongo/iterator.go @@ -45,17 +45,7 @@ type Iterator struct { func NewIterator(qs *TripleStore, collection string, d quad.Direction, val graph.Value) *Iterator { name := qs.NameOf(val) - var constraint bson.M - switch d { - case quad.Subject: - constraint = bson.M{"Subject": name} - case quad.Predicate: - constraint = bson.M{"Predicate": name} - case quad.Object: - constraint = bson.M{"Object": name} - case quad.Label: - constraint = bson.M{"Label": name} - } + constraint := bson.M{d.String(): name} size, err := qs.db.C(collection).Find(constraint).Count() if err != nil { @@ -187,13 +177,13 @@ func (it *Iterator) Contains(v graph.Value) bool { case quad.Subject: offset = 0 case quad.Predicate: - offset = (it.qs.hasher.Size() * 2) + offset = (it.qs.hasherSize * 2) case quad.Object: - offset = (it.qs.hasher.Size() * 2) * 2 + offset = (it.qs.hasherSize * 2) * 2 case quad.Label: - offset = (it.qs.hasher.Size() * 2) * 3 + offset = (it.qs.hasherSize * 2) * 3 } - val := v.(string)[offset : it.qs.hasher.Size()*2+offset] + val := v.(string)[offset : it.qs.hasherSize*2+offset] if val == it.hash { it.result = v return graph.ContainsLogOut(it, v, true) diff --git a/graph/mongo/triplestore.go b/graph/mongo/triplestore.go index 7165900..294ba91 100644 --- a/graph/mongo/triplestore.go +++ b/graph/mongo/triplestore.go @@ -35,10 +35,11 @@ func init() { const DefaultDBName = "cayley" type TripleStore struct { - session *mgo.Session - db *mgo.Database - hasher hash.Hash - idCache *IDLru + session *mgo.Session + db *mgo.Database + hasherSize int + makeHasher func() hash.Hash + idCache *IDLru } func createNewMongoGraph(addr string, options graph.Options) error { @@ -53,18 +54,18 @@ func createNewMongoGraph(addr string, options graph.Options) error { } db := conn.DB(dbName) indexOpts := mgo.Index{ - Key: []string{"Sub"}, + Key: []string{"subject"}, Unique: false, DropDups: false, Background: true, Sparse: true, } db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Pred"} + indexOpts.Key = []string{"predicate"} db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Obj"} + indexOpts.Key = []string{"object"} db.C("quads").EnsureIndex(indexOpts) - indexOpts.Key = []string{"Label"} + indexOpts.Key = []string{"label"} db.C("quads").EnsureIndex(indexOpts) logOpts := mgo.Index{ Key: []string{"LogID"}, @@ -90,24 +91,26 @@ func newTripleStore(addr string, options graph.Options) (graph.TripleStore, erro } qs.db = conn.DB(dbName) qs.session = conn - qs.hasher = sha1.New() + qs.hasherSize = sha1.Size + qs.makeHasher = sha1.New qs.idCache = NewIDLru(1 << 16) return &qs, nil } -func (qs *TripleStore) getIdForTriple(t quad.Quad) string { - id := qs.ConvertStringToByteHash(t.Subject) - id += qs.ConvertStringToByteHash(t.Predicate) - id += qs.ConvertStringToByteHash(t.Object) - id += qs.ConvertStringToByteHash(t.Label) +func (qs *TripleStore) getIdForQuad(t quad.Quad) string { + hasher := qs.makeHasher() + id := qs.convertStringToByteHash(t.Subject, hasher) + id += qs.convertStringToByteHash(t.Predicate, hasher) + id += qs.convertStringToByteHash(t.Object, hasher) + id += qs.convertStringToByteHash(t.Label, hasher) return id } -func (qs *TripleStore) ConvertStringToByteHash(s string) string { - qs.hasher.Reset() - key := make([]byte, 0, qs.hasher.Size()) - qs.hasher.Write([]byte(s)) - key = qs.hasher.Sum(key) +func (qs *TripleStore) convertStringToByteHash(s string, hasher hash.Hash) string { + hasher.Reset() + key := make([]byte, 0, qs.hasherSize) + hasher.Write([]byte(s)) + key = hasher.Sum(key) return hex.EncodeToString(key) } @@ -144,26 +147,20 @@ func (qs *TripleStore) updateNodeBy(node_name string, inc int) error { return err } -func (qs *TripleStore) updateTriple(t quad.Quad, id int64, proc graph.Procedure) error { +func (qs *TripleStore) updateQuad(q quad.Quad, id int64, proc graph.Procedure) error { var setname string if proc == graph.Add { setname = "Added" } else if proc == graph.Delete { setname = "Deleted" } - tripledoc := bson.M{ - "Subject": t.Subject, - "Predicate": t.Predicate, - "Object": t.Object, - "Label": t.Label, - } upsert := bson.M{ - "$setOnInsert": tripledoc, + "$setOnInsert": q, "$push": bson.M{ setname: id, }, } - _, err := qs.db.C("quads").UpsertId(qs.getIdForTriple(t), upsert) + _, err := qs.db.C("quads").UpsertId(qs.getIdForQuad(q), upsert) if err != nil { glog.Errorf("Error: %v", err) } @@ -189,7 +186,7 @@ func (qs *TripleStore) checkValid(key string) bool { return true } -func (qs *TripleStore) updateLog(d *graph.Delta) error { +func (qs *TripleStore) updateLog(d graph.Delta) error { var action string if d.Action == graph.Add { action = "Add" @@ -199,7 +196,7 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { entry := MongoLogEntry{ LogID: d.ID, Action: action, - Key: qs.getIdForTriple(d.Quad), + Key: qs.getIdForQuad(d.Quad), Timestamp: d.Timestamp.UnixNano(), } err := qs.db.C("log").Insert(entry) @@ -209,12 +206,12 @@ func (qs *TripleStore) updateLog(d *graph.Delta) error { return err } -func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { +func (qs *TripleStore) ApplyDeltas(in []graph.Delta) error { qs.session.SetSafe(nil) ids := make(map[string]int) // Pre-check the existence condition. for _, d := range in { - key := qs.getIdForTriple(d.Quad) + key := qs.getIdForQuad(d.Quad) switch d.Action { case graph.Add: if qs.checkValid(key) { @@ -236,7 +233,7 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { } } for _, d := range in { - err := qs.updateTriple(d.Quad, d.ID, d.Action) + err := qs.updateQuad(d.Quad, d.ID, d.Action) if err != nil { return err } @@ -264,17 +261,12 @@ func (qs *TripleStore) ApplyDeltas(in []*graph.Delta) error { } func (qs *TripleStore) Quad(val graph.Value) quad.Quad { - var bsonDoc bson.M - err := qs.db.C("quads").FindId(val.(string)).One(&bsonDoc) + var q quad.Quad + err := qs.db.C("quads").FindId(val.(string)).One(&q) if err != nil { glog.Errorf("Error: Couldn't retrieve quad %s %v", val, err) } - return quad.Quad{ - bsonDoc["Subject"].(string), - bsonDoc["Predicate"].(string), - bsonDoc["Object"].(string), - bsonDoc["Label"].(string), - } + return q } func (qs *TripleStore) TripleIterator(d quad.Direction, val graph.Value) graph.Iterator { @@ -290,7 +282,8 @@ func (qs *TripleStore) TriplesAllIterator() graph.Iterator { } func (qs *TripleStore) ValueOf(s string) graph.Value { - return qs.ConvertStringToByteHash(s) + h := qs.makeHasher() + return qs.convertStringToByteHash(s, h) } func (qs *TripleStore) NameOf(v graph.Value) string { @@ -348,13 +341,13 @@ func (qs *TripleStore) TripleDirection(in graph.Value, d quad.Direction) graph.V case quad.Subject: offset = 0 case quad.Predicate: - offset = (qs.hasher.Size() * 2) + offset = (qs.hasherSize * 2) case quad.Object: - offset = (qs.hasher.Size() * 2) * 2 + offset = (qs.hasherSize * 2) * 2 case quad.Label: - offset = (qs.hasher.Size() * 2) * 3 + offset = (qs.hasherSize * 2) * 3 } - val := in.(string)[offset : qs.hasher.Size()*2+offset] + val := in.(string)[offset : qs.hasherSize*2+offset] return val } diff --git a/graph/quadwriter.go b/graph/quadwriter.go index dddc19a..38ae137 100644 --- a/graph/quadwriter.go +++ b/graph/quadwriter.go @@ -53,8 +53,10 @@ func (h *Handle) Close() { h.QuadWriter.Close() } -var ErrQuadExists = errors.New("Quad exists") -var ErrQuadNotExist = errors.New("Quad doesn't exist") +var ( + ErrQuadExists = errors.New("Quad exists") + ErrQuadNotExist = errors.New("Quad doesn't exist") +) type QuadWriter interface { // Add a quad to the store. diff --git a/graph/triplestore.go b/graph/triplestore.go index 3f884ff..df85124 100644 --- a/graph/triplestore.go +++ b/graph/triplestore.go @@ -44,7 +44,7 @@ type Value interface{} type TripleStore interface { // The only way in is through building a transaction, which // is done by a replication strategy. - ApplyDeltas([]*Delta) error + ApplyDeltas([]Delta) error // Given an opaque token, returns the triple for that token from the store. Quad(Value) quad.Quad diff --git a/writer/single.go b/writer/single.go index 4a3a787..d8c3382 100644 --- a/writer/single.go +++ b/writer/single.go @@ -50,8 +50,8 @@ func (s *Single) AcquireNextID() int64 { } func (s *Single) AddQuad(q quad.Quad) error { - deltas := make([]*graph.Delta, 1) - deltas[0] = &graph.Delta{ + deltas := make([]graph.Delta, 1) + deltas[0] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Add, @@ -61,9 +61,9 @@ func (s *Single) AddQuad(q quad.Quad) error { } func (s *Single) AddQuadSet(set []quad.Quad) error { - deltas := make([]*graph.Delta, len(set)) + deltas := make([]graph.Delta, len(set)) for i, q := range set { - deltas[i] = &graph.Delta{ + deltas[i] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Add, @@ -75,8 +75,8 @@ func (s *Single) AddQuadSet(set []quad.Quad) error { } func (s *Single) RemoveQuad(q quad.Quad) error { - deltas := make([]*graph.Delta, 1) - deltas[0] = &graph.Delta{ + deltas := make([]graph.Delta, 1) + deltas[0] = graph.Delta{ ID: s.AcquireNextID(), Quad: q, Action: graph.Delete,