Merge pull request #289 from barakmich/psql

graph: Postgres backend
This commit is contained in:
Barak Michener 2015-10-05 17:20:55 -04:00
commit f7cdec8294
21 changed files with 2251 additions and 15 deletions

13
Godeps/Godeps.json generated
View file

@ -6,10 +6,6 @@
],
"Deps": [
{
"ImportPath": "github.com/pborman/uuid",
"Rev": "ca53cad383cad2479bbba7f7a1a05797ec1386e4"
},
{
"ImportPath": "github.com/badgerodon/peg",
"Rev": "9e5f7f4d07ca576562618c23e8abadda278b684f"
},
@ -31,6 +27,15 @@
"Rev": "b59a38004596b696aca7aa2adccfa68760864d86"
},
{
"ImportPath": "github.com/lib/pq",
"Comment": "go1.0-cutoff-58-g0dad96c",
"Rev": "0dad96c0b94f8dee039aa40467f767467392a0af"
},
{
"ImportPath": "github.com/pborman/uuid",
"Rev": "ca53cad383cad2479bbba7f7a1a05797ec1386e4"
},
{
"ImportPath": "github.com/peterh/liner",
"Rev": "1bb0d1c1a25ed393d8feb09bab039b2b1b1fbced"
},

View file

@ -21,6 +21,7 @@ import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"time"
"github.com/barakmich/glog"
@ -36,6 +37,7 @@ import (
_ "github.com/google/cayley/graph/leveldb"
_ "github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/mongo"
_ "github.com/google/cayley/graph/sql"
// Load writer registry
_ "github.com/google/cayley/writer"
@ -147,6 +149,15 @@ func main() {
os.Args = append(os.Args[:1], os.Args[2:]...)
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
glog.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
var buildString string
if Version != "" {
buildString = fmt.Sprint("Cayley ", Version, " built ", BuildDate)

View file

@ -67,8 +67,10 @@ func (t *Tagger) Fixed() map[string]Value {
}
func (t *Tagger) CopyFrom(src Iterator) {
st := src.Tagger()
t.CopyFromTagger(src.Tagger())
}
func (t *Tagger) CopyFromTagger(st *Tagger) {
t.tags = append(t.tags, st.tags...)
if t.fixedTags == nil {
@ -331,16 +333,16 @@ func DumpStats(it Iterator) StatsContainer {
func ContainsLogIn(it Iterator, val Value) {
if glog.V(4) {
glog.V(4).Infof("%s %d CHECK CONTAINS %d", strings.ToUpper(it.Type().String()), it.UID(), val)
glog.V(4).Infof("%s %d CHECK CONTAINS %v", strings.ToUpper(it.Type().String()), it.UID(), val)
}
}
func ContainsLogOut(it Iterator, val Value, good bool) bool {
if glog.V(4) {
if good {
glog.V(4).Infof("%s %d CHECK CONTAINS %d GOOD", strings.ToUpper(it.Type().String()), it.UID(), val)
glog.V(4).Infof("%s %d CHECK CONTAINS %v GOOD", strings.ToUpper(it.Type().String()), it.UID(), val)
} else {
glog.V(4).Infof("%s %d CHECK CONTAINS %d BAD", strings.ToUpper(it.Type().String()), it.UID(), val)
glog.V(4).Infof("%s %d CHECK CONTAINS %v BAD", strings.ToUpper(it.Type().String()), it.UID(), val)
}
}
return good
@ -355,7 +357,7 @@ func NextLogIn(it Iterator) {
func NextLogOut(it Iterator, val Value, ok bool) bool {
if glog.V(4) {
if ok {
glog.V(4).Infof("%s %d NEXT IS %d", strings.ToUpper(it.Type().String()), it.UID(), val)
glog.V(4).Infof("%s %d NEXT IS %v", strings.ToUpper(it.Type().String()), it.UID(), val)
} else {
glog.V(4).Infof("%s %d NEXT DONE", strings.ToUpper(it.Type().String()), it.UID())
}

View file

@ -103,6 +103,7 @@ func (it *And) Optimize() (graph.Iterator, bool) {
newReplacement, hasOne := it.qs.OptimizeIterator(newAnd)
if hasOne {
newAnd.Close()
glog.V(3).Infoln(it.UID(), "became", newReplacement.UID(), "from quadstore")
return newReplacement, true
}
}

View file

@ -105,6 +105,14 @@ func (it *HasA) Optimize() (graph.Iterator, bool) {
return it.primaryIt, true
}
}
// Ask the graph.QuadStore if we can be replaced. Often times, this is a great
// optimization opportunity (there's a fixed iterator underneath us, for
// example).
newReplacement, hasOne := it.qs.OptimizeIterator(it)
if hasOne {
it.Close()
return newReplacement, true
}
return it, false
}

View file

@ -140,6 +140,7 @@ func (it *Not) Optimize() (graph.Iterator, bool) {
if optimized {
it.primaryIt = optimizedPrimaryIt
}
it.primaryIt = NewMaterialize(it.primaryIt)
return it, false
}

View file

@ -269,6 +269,9 @@ func (qs *QuadStore) ValueOf(name string) graph.Value {
}
func (qs *QuadStore) NameOf(id graph.Value) string {
if id == nil {
return ""
}
return qs.revIDMap[id.(int64)]
}

215
graph/sql/all_iterator.go Normal file
View file

@ -0,0 +1,215 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"database/sql"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
type AllIterator struct {
uid uint64
tags graph.Tagger
qs *QuadStore
dir quad.Direction
val graph.Value
table string
cursor *sql.Rows
result graph.Value
err error
}
func (it *AllIterator) makeCursor() {
var cursor *sql.Rows
var err error
if it.cursor != nil {
it.cursor.Close()
}
if it.table == "quads" {
cursor, err = it.qs.db.Query(`SELECT subject, predicate, object, label FROM quads;`)
if err != nil {
glog.Errorln("Couldn't get cursor from SQL database: %v", err)
cursor = nil
}
} else {
glog.V(4).Infoln("sql: getting node query")
cursor, err = it.qs.db.Query(`SELECT node FROM
(
SELECT subject FROM quads
UNION
SELECT predicate FROM quads
UNION
SELECT object FROM quads
UNION
SELECT label FROM quads
) AS DistinctNodes (node) WHERE node IS NOT NULL;`)
if err != nil {
glog.Errorln("Couldn't get cursor from SQL database: %v", err)
cursor = nil
}
glog.V(4).Infoln("sql: got node query")
}
it.cursor = cursor
}
func NewAllIterator(qs *QuadStore, table string) *AllIterator {
it := &AllIterator{
uid: iterator.NextUID(),
qs: qs,
table: table,
}
return it
}
func (it *AllIterator) UID() uint64 {
return it.uid
}
func (it *AllIterator) Reset() {
it.err = nil
it.Close()
}
func (it *AllIterator) Err() error {
return it.err
}
func (it *AllIterator) Close() error {
if it.cursor != nil {
err := it.cursor.Close()
if err != nil {
return err
}
it.cursor = nil
}
return nil
}
func (it *AllIterator) Tagger() *graph.Tagger {
return &it.tags
}
func (it *AllIterator) TagResults(dst map[string]graph.Value) {
for _, tag := range it.tags.Tags() {
dst[tag] = it.Result()
}
for tag, value := range it.tags.Fixed() {
dst[tag] = value
}
}
func (it *AllIterator) Clone() graph.Iterator {
var m *AllIterator
m = NewAllIterator(it.qs, it.table)
m.tags.CopyFrom(it)
return m
}
func (it *AllIterator) SubIterators() []graph.Iterator {
return nil
}
func (it *AllIterator) Next() bool {
graph.NextLogIn(it)
if it.cursor == nil {
it.makeCursor()
if it.cursor == nil {
return false
}
}
if !it.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := it.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
it.err = err
}
it.cursor.Close()
return false
}
if it.table == "nodes" {
var node string
err := it.cursor.Scan(&node)
if err != nil {
glog.Errorf("Error nexting node iterator: %v", err)
it.err = err
return false
}
it.result = node
return true
}
var q quad.Quad
err := it.cursor.Scan(&q.Subject, &q.Predicate, &q.Object, &q.Label)
if err != nil {
glog.Errorf("Error scanning sql iterator: %v", err)
it.err = err
return false
}
it.result = q
return graph.NextLogOut(it, it.result, true)
}
func (it *AllIterator) Contains(v graph.Value) bool {
graph.ContainsLogIn(it, v)
it.result = v
return graph.ContainsLogOut(it, v, true)
}
func (it *AllIterator) Size() (int64, bool) {
return it.qs.Size(), true
}
func (it *AllIterator) Result() graph.Value {
if it.result == nil {
glog.Fatalln("result was nil", it)
}
return it.result
}
func (it *AllIterator) NextPath() bool {
return false
}
func (it *AllIterator) Type() graph.Type {
return graph.All
}
func (it *AllIterator) Sorted() bool { return false }
func (it *AllIterator) Optimize() (graph.Iterator, bool) { return it, false }
func (it *AllIterator) Describe() graph.Description {
size, _ := it.Size()
return graph.Description{
UID: it.UID(),
Name: "sql/all",
Type: it.Type(),
Size: size,
}
}
func (it *AllIterator) Stats() graph.IteratorStats {
size, _ := it.Size()
return graph.IteratorStats{
ContainsCost: 1,
NextCost: 9999,
Size: size,
}
}

63
graph/sql/lru.go Normal file
View file

@ -0,0 +1,63 @@
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"container/list"
)
// cache implements an LRU cache.
type cache struct {
cache map[string]*list.Element
priority *list.List
maxSize int
}
type kv struct {
key string
value int64
}
func newCache(size int) *cache {
var lru cache
lru.maxSize = size
lru.priority = list.New()
lru.cache = make(map[string]*list.Element)
return &lru
}
func (lru *cache) Put(key string, value int64) {
if _, ok := lru.Get(key); ok {
return
}
if len(lru.cache) == lru.maxSize {
lru.removeOldest()
}
lru.priority.PushFront(kv{key: key, value: value})
lru.cache[key] = lru.priority.Front()
}
func (lru *cache) Get(key string) (int64, bool) {
if element, ok := lru.cache[key]; ok {
lru.priority.MoveToFront(element)
return element.Value.(kv).value, true
}
return 0, false
}
func (lru *cache) removeOldest() {
last := lru.priority.Remove(lru.priority.Back())
delete(lru.cache, last.(kv).key)
}

261
graph/sql/optimizers.go Normal file
View file

@ -0,0 +1,261 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"errors"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
func intersect(a sqlIterator, b sqlIterator, qs *QuadStore) (*SQLIterator, error) {
if anew, ok := a.(*SQLNodeIterator); ok {
if bnew, ok := b.(*SQLNodeIterator); ok {
return intersectNode(anew, bnew, qs)
}
if bnew, ok := b.(*SQLNodeIntersection); ok {
return appendNodeIntersection(bnew, anew, qs)
}
} else if anew, ok := a.(*SQLNodeIntersection); ok {
if bnew, ok := b.(*SQLNodeIterator); ok {
return appendNodeIntersection(anew, bnew, qs)
}
if bnew, ok := b.(*SQLNodeIntersection); ok {
return combineNodeIntersection(anew, bnew, qs)
}
} else if anew, ok := a.(*SQLLinkIterator); ok {
if bnew, ok := b.(*SQLLinkIterator); ok {
return intersectLink(anew, bnew, qs)
}
} else {
return nil, errors.New("Unknown iterator types")
}
return nil, errors.New("Cannot combine SQL iterators of two different types")
}
func intersectNode(a *SQLNodeIterator, b *SQLNodeIterator, qs *QuadStore) (*SQLIterator, error) {
m := &SQLNodeIntersection{
tableName: newTableName(),
nodeIts: []sqlIterator{a, b},
}
m.Tagger().CopyFromTagger(a.Tagger())
m.Tagger().CopyFromTagger(b.Tagger())
it := NewSQLIterator(qs, m)
return it, nil
}
func appendNodeIntersection(a *SQLNodeIntersection, b *SQLNodeIterator, qs *QuadStore) (*SQLIterator, error) {
m := &SQLNodeIntersection{
tableName: newTableName(),
nodeIts: append(a.nodeIts, b),
}
m.Tagger().CopyFromTagger(a.Tagger())
m.Tagger().CopyFromTagger(b.Tagger())
it := NewSQLIterator(qs, m)
return it, nil
}
func combineNodeIntersection(a *SQLNodeIntersection, b *SQLNodeIntersection, qs *QuadStore) (*SQLIterator, error) {
m := &SQLNodeIntersection{
tableName: newTableName(),
nodeIts: append(a.nodeIts, b.nodeIts...),
}
m.Tagger().CopyFromTagger(a.Tagger())
m.Tagger().CopyFromTagger(b.Tagger())
it := NewSQLIterator(qs, m)
return it, nil
}
func intersectLink(a *SQLLinkIterator, b *SQLLinkIterator, qs *QuadStore) (*SQLIterator, error) {
m := &SQLLinkIterator{
tableName: newTableName(),
nodeIts: append(a.nodeIts, b.nodeIts...),
constraints: append(a.constraints, b.constraints...),
tagdirs: append(a.tagdirs, b.tagdirs...),
}
m.Tagger().CopyFromTagger(a.Tagger())
m.Tagger().CopyFromTagger(b.Tagger())
it := NewSQLIterator(qs, m)
return it, nil
}
func hasa(aIn sqlIterator, d quad.Direction, qs *QuadStore) (*SQLIterator, error) {
a, ok := aIn.(*SQLLinkIterator)
if !ok {
return nil, errors.New("Can't take the HASA of a link SQL iterator")
}
out := &SQLNodeIterator{
tableName: newTableName(),
linkIt: sqlItDir{
it: a,
dir: d,
},
}
it := NewSQLIterator(qs, out)
return it, nil
}
func linksto(aIn sqlIterator, d quad.Direction, qs *QuadStore) (*SQLIterator, error) {
var a sqlIterator
a, ok := aIn.(*SQLNodeIterator)
if !ok {
a, ok = aIn.(*SQLNodeIntersection)
if !ok {
return nil, errors.New("Can't take the LINKSTO of a node SQL iterator")
}
}
out := &SQLLinkIterator{
tableName: newTableName(),
nodeIts: []sqlItDir{
sqlItDir{
it: a,
dir: d,
},
},
}
it := NewSQLIterator(qs, out)
return it, nil
}
func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
switch it.Type() {
case graph.LinksTo:
return qs.optimizeLinksTo(it.(*iterator.LinksTo))
case graph.HasA:
return qs.optimizeHasA(it.(*iterator.HasA))
case graph.And:
return qs.optimizeAnd(it.(*iterator.And))
}
return it, false
}
func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) {
subs := it.SubIterators()
if len(subs) != 1 {
return it, false
}
primary := subs[0]
switch primary.Type() {
case graph.Fixed:
size, _ := primary.Size()
if size == 1 {
if !graph.Next(primary) {
panic("unexpected size during optimize")
}
val := primary.Result()
newIt := qs.QuadIterator(it.Direction(), val)
nt := newIt.Tagger()
nt.CopyFrom(it)
for _, tag := range primary.Tagger().Tags() {
nt.AddFixed(tag, val)
}
it.Close()
return newIt, true
}
case sqlType:
p := primary.(*SQLIterator)
newit, err := linksto(p.sql, it.Direction(), qs)
if err != nil {
glog.Errorln(err)
return it, false
}
newit.Tagger().CopyFrom(it)
return newit, true
case graph.All:
linkit := &SQLLinkIterator{
tableName: newTableName(),
size: qs.Size(),
}
for _, t := range primary.Tagger().Tags() {
linkit.tagdirs = append(linkit.tagdirs, tagDir{
dir: it.Direction(),
tag: t,
})
}
for k, v := range primary.Tagger().Fixed() {
linkit.tagger.AddFixed(k, v)
}
linkit.tagger.CopyFrom(it)
newit := NewSQLIterator(qs, linkit)
return newit, true
}
return it, false
}
func (qs *QuadStore) optimizeAnd(it *iterator.And) (graph.Iterator, bool) {
subs := it.SubIterators()
var unusedIts []graph.Iterator
var newit *SQLIterator
newit = nil
changed := false
var err error
for _, it := range subs {
if it.Type() == sqlType {
if newit == nil {
newit = it.(*SQLIterator)
} else {
changed = true
newit, err = intersect(newit.sql, it.(*SQLIterator).sql, qs)
if err != nil {
glog.Error(err)
return it, false
}
}
} else {
unusedIts = append(unusedIts, it)
}
}
if !changed {
return it, false
}
if len(unusedIts) == 0 {
newit.Tagger().CopyFrom(it)
return newit, true
}
newAnd := iterator.NewAnd(qs)
newAnd.Tagger().CopyFrom(it)
newAnd.AddSubIterator(newit)
for _, i := range unusedIts {
newAnd.AddSubIterator(i)
}
return newAnd.Optimize()
}
func (qs *QuadStore) optimizeHasA(it *iterator.HasA) (graph.Iterator, bool) {
subs := it.SubIterators()
if len(subs) != 1 {
return it, false
}
primary := subs[0]
if primary.Type() == sqlType {
p := primary.(*SQLIterator)
newit, err := hasa(p.sql, it.Direction(), qs)
if err != nil {
glog.Errorln(err)
return it, false
}
newit.Tagger().CopyFrom(it)
return newit, true
}
return it, false
}

View file

@ -0,0 +1,128 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"testing"
"github.com/google/cayley/graph"
"github.com/google/cayley/quad"
)
func TestBuildIntersect(t *testing.T) {
a := NewSQLLinkIterator(nil, quad.Subject, "Foo")
b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to")
it, err := intersect(a.sql, b.sql, nil)
if err != nil {
t.Error(err)
}
s, v := it.sql.buildSQL(true, nil)
t.Log(s, v)
}
func TestBuildHasa(t *testing.T) {
a := NewSQLLinkIterator(nil, quad.Subject, "Foo")
a.Tagger().Add("foo")
b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to")
it1, err := intersect(a.sql, b.sql, nil)
if err != nil {
t.Error(err)
}
it2, err := hasa(it1.sql, quad.Object, nil)
if err != nil {
t.Error(err)
}
s, v := it2.sql.buildSQL(true, nil)
t.Log(s, v)
}
func TestBuildLinksTo(t *testing.T) {
a := NewSQLLinkIterator(nil, quad.Subject, "Foo")
b := NewSQLLinkIterator(nil, quad.Predicate, "is_equivalent_to")
it1, err := intersect(a.sql, b.sql, nil)
if err != nil {
t.Error(err)
}
it2, err := hasa(it1.sql, quad.Object, nil)
it2.Tagger().Add("foo")
if err != nil {
t.Error(err)
}
it3, err := linksto(it2.sql, quad.Subject, nil)
if err != nil {
t.Error(err)
}
s, v := it3.sql.buildSQL(true, nil)
t.Log(s, v)
}
func TestInterestingQuery(t *testing.T) {
if *postgres_path == "" {
t.SkipNow()
}
db, err := newQuadStore(*postgres_path, nil)
if err != nil {
t.Fatal(err)
}
qs := db.(*QuadStore)
a := NewSQLLinkIterator(qs, quad.Object, "Humphrey Bogart")
b := NewSQLLinkIterator(qs, quad.Predicate, "name")
it1, err := intersect(a.sql, b.sql, qs)
if err != nil {
t.Error(err)
}
it2, err := hasa(it1.sql, quad.Subject, qs)
if err != nil {
t.Error(err)
}
it2.Tagger().Add("hb")
it3, err := linksto(it2.sql, quad.Object, qs)
if err != nil {
t.Error(err)
}
b = NewSQLLinkIterator(db.(*QuadStore), quad.Predicate, "/film/performance/actor")
it4, err := intersect(it3.sql, b.sql, qs)
if err != nil {
t.Error(err)
}
it5, err := hasa(it4.sql, quad.Subject, qs)
if err != nil {
t.Error(err)
}
it6, err := linksto(it5.sql, quad.Object, qs)
if err != nil {
t.Error(err)
}
b = NewSQLLinkIterator(db.(*QuadStore), quad.Predicate, "/film/film/starring")
it7, err := intersect(it6.sql, b.sql, qs)
if err != nil {
t.Error(err)
}
it8, err := hasa(it7.sql, quad.Subject, qs)
if err != nil {
t.Error(err)
}
s, v := it8.sql.buildSQL(true, nil)
it8.Tagger().Add("id")
t.Log(s, v)
for graph.Next(it8) {
t.Log(it8.Result())
out := make(map[string]graph.Value)
it8.TagResults(out)
for k, v := range out {
t.Log("%s: %v\n", k, v.(string))
}
}
}

341
graph/sql/quadstore.go Normal file
View file

@ -0,0 +1,341 @@
package sql
import (
"crypto/sha1"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"hash"
"sync"
"github.com/lib/pq"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
const QuadStoreType = "sql"
func init() {
graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createSQLTables, nil)
}
var (
hashPool = sync.Pool{
New: func() interface{} { return sha1.New() },
}
hashSize = sha1.Size
)
type QuadStore struct {
db *sql.DB
sqlFlavor string
size int64
lru *cache
noSizes bool
}
func connectSQLTables(addr string, _ graph.Options) (*sql.DB, error) {
// TODO(barakmich): Parse options for more friendly addr, other SQLs.
conn, err := sql.Open("postgres", addr)
if err != nil {
glog.Errorf("Couldn't open database at %s: %#v", addr, err)
return nil, err
}
// "Open may just validate its arguments without creating a connection to the database."
// "To verify that the data source name is valid, call Ping."
// Source: http://golang.org/pkg/database/sql/#Open
if err := conn.Ping(); err != nil {
glog.Errorf("Couldn't open database at %s: %#v", addr, err)
return nil, err
}
return conn, nil
}
func createSQLTables(addr string, options graph.Options) error {
conn, err := connectSQLTables(addr, options)
if err != nil {
return err
}
tx, err := conn.Begin()
if err != nil {
glog.Errorf("Couldn't begin creation transaction: %s", err)
return err
}
quadTable, err := tx.Exec(`
CREATE TABLE quads (
subject TEXT NOT NULL,
predicate TEXT NOT NULL,
object TEXT NOT NULL,
label TEXT,
horizon BIGSERIAL PRIMARY KEY,
id BIGINT,
ts timestamp,
subject_hash TEXT NOT NULL,
predicate_hash TEXT NOT NULL,
object_hash TEXT NOT NULL,
label_hash TEXT,
UNIQUE(subject_hash, predicate_hash, object_hash, label_hash)
);`)
if err != nil {
glog.Errorf("Cannot create quad table: %v", quadTable)
return err
}
factor, factorOk, err := options.IntKey("db_fill_factor")
if !factorOk {
factor = 50
}
var index sql.Result
index, err = tx.Exec(fmt.Sprintf(`
CREATE INDEX spo_index ON quads (subject_hash) WITH (FILLFACTOR = %d);
CREATE INDEX pos_index ON quads (predicate_hash) WITH (FILLFACTOR = %d);
CREATE INDEX osp_index ON quads (object_hash) WITH (FILLFACTOR = %d);
`, factor, factor, factor))
if err != nil {
glog.Errorf("Cannot create indices: %v", index)
return err
}
tx.Commit()
return nil
}
func newQuadStore(addr string, options graph.Options) (graph.QuadStore, error) {
var qs QuadStore
conn, err := connectSQLTables(addr, options)
if err != nil {
return nil, err
}
localOpt, localOptOk, err := options.BoolKey("local_optimize")
if err != nil {
return nil, err
}
qs.db = conn
qs.sqlFlavor = "postgres"
qs.size = -1
qs.lru = newCache(1024)
// Skip size checking by default.
qs.noSizes = true
if localOptOk {
if localOpt {
qs.noSizes = false
}
}
return &qs, nil
}
func hashOf(s string) string {
h := hashPool.Get().(hash.Hash)
h.Reset()
defer hashPool.Put(h)
key := make([]byte, 0, hashSize)
h.Write([]byte(s))
key = h.Sum(key)
return hex.EncodeToString(key)
}
func (qs *QuadStore) copyFrom(tx *sql.Tx, in []graph.Delta) error {
stmt, err := tx.Prepare(pq.CopyIn("quads", "subject", "predicate", "object", "label", "id", "ts", "subject_hash", "predicate_hash", "object_hash", "label_hash"))
if err != nil {
return err
}
for _, d := range in {
_, err := stmt.Exec(
d.Quad.Subject,
d.Quad.Predicate,
d.Quad.Object,
d.Quad.Label,
d.ID.Int(),
d.Timestamp,
hashOf(d.Quad.Subject),
hashOf(d.Quad.Predicate),
hashOf(d.Quad.Object),
hashOf(d.Quad.Label),
)
if err != nil {
glog.Errorf("couldn't prepare COPY statement: %v", err)
return err
}
}
_, err = stmt.Exec()
if err != nil {
return err
}
return stmt.Close()
}
func (qs *QuadStore) runTxPostgres(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
allAdds := true
for _, d := range in {
if d.Action != graph.Add {
allAdds = false
}
}
if allAdds {
return qs.copyFrom(tx, in)
}
insert, err := tx.Prepare(`INSERT INTO quads(subject, predicate, object, label, id, ts, subject_hash, predicate_hash, object_hash, label_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`)
defer insert.Close()
if err != nil {
glog.Errorf("Cannot prepare insert statement: %v", err)
return err
}
for _, d := range in {
switch d.Action {
case graph.Add:
_, err := insert.Exec(
d.Quad.Subject,
d.Quad.Predicate,
d.Quad.Object,
d.Quad.Label,
d.ID.Int(),
d.Timestamp,
hashOf(d.Quad.Subject),
hashOf(d.Quad.Predicate),
hashOf(d.Quad.Object),
hashOf(d.Quad.Label),
)
if err != nil {
glog.Errorf("couldn't prepare INSERT statement: %v", err)
return err
}
case graph.Delete:
result, err := tx.Exec(`DELETE FROM quads WHERE subject=$1 and predicate=$2 and object=$3 and label=$4;`,
d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label)
if err != nil {
glog.Errorf("couldn't exec DELETE statement: %v", err)
}
affected, err := result.RowsAffected()
if err != nil {
glog.Errorf("couldn't get DELETE RowsAffected: %v", err)
}
if affected != 1 && !opts.IgnoreMissing {
return errors.New("deleting non-existent triple; rolling back")
}
default:
panic("unknown action")
}
}
return nil
}
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, opts graph.IgnoreOpts) error {
// TODO(barakmich): Support more ignoreOpts? "ON CONFLICT IGNORE"
tx, err := qs.db.Begin()
if err != nil {
glog.Errorf("couldn't begin write transaction: %v", err)
return err
}
switch qs.sqlFlavor {
case "postgres":
err = qs.runTxPostgres(tx, in, opts)
if err != nil {
tx.Rollback()
return err
}
default:
panic("no support for flavor: " + qs.sqlFlavor)
}
return tx.Commit()
}
func (qs *QuadStore) Quad(val graph.Value) quad.Quad {
return val.(quad.Quad)
}
func (qs *QuadStore) QuadIterator(d quad.Direction, val graph.Value) graph.Iterator {
return NewSQLLinkIterator(qs, d, val.(string))
}
func (qs *QuadStore) NodesAllIterator() graph.Iterator {
return NewAllIterator(qs, "nodes")
}
func (qs *QuadStore) QuadsAllIterator() graph.Iterator {
return NewAllIterator(qs, "quads")
}
func (qs *QuadStore) ValueOf(s string) graph.Value {
return s
}
func (qs *QuadStore) NameOf(v graph.Value) string {
if v == nil {
glog.V(2).Info("NameOf was nil")
return ""
}
return v.(string)
}
func (qs *QuadStore) Size() int64 {
// TODO(barakmich): Sync size with writes.
if qs.size != -1 {
return qs.size
}
c := qs.db.QueryRow("SELECT COUNT(*) FROM quads;")
err := c.Scan(&qs.size)
if err != nil {
glog.Errorf("Couldn't execute COUNT: %v", err)
return 0
}
return qs.size
}
func (qs *QuadStore) Horizon() graph.PrimaryKey {
var horizon int64
err := qs.db.QueryRow("SELECT horizon FROM quads ORDER BY horizon DESC LIMIT 1;").Scan(&horizon)
if err != nil {
glog.Errorf("Couldn't execute horizon: %v", err)
return graph.NewSequentialKey(0)
}
return graph.NewSequentialKey(horizon)
}
func (qs *QuadStore) FixedIterator() graph.FixedIterator {
return iterator.NewFixed(iterator.Identity)
}
func (qs *QuadStore) Close() {
qs.db.Close()
}
func (qs *QuadStore) QuadDirection(in graph.Value, d quad.Direction) graph.Value {
q := in.(quad.Quad)
return q.Get(d)
}
func (qs *QuadStore) Type() string {
return QuadStoreType
}
func (qs *QuadStore) sizeForIterator(isAll bool, dir quad.Direction, val string) int64 {
var err error
if isAll {
return qs.Size()
}
if qs.noSizes {
if dir == quad.Predicate {
return (qs.Size() / 100) + 1
}
return (qs.Size() / 1000) + 1
}
if val, ok := qs.lru.Get(val + string(dir.Prefix())); ok {
return val
}
var size int64
glog.V(4).Infoln("sql: getting size for select %s, %s", dir.String(), val)
err = qs.db.QueryRow(
fmt.Sprintf("SELECT count(*) FROM quads WHERE %s_hash = $1;", dir.String()), hashOf(val)).Scan(&size)
if err != nil {
glog.Errorln("Error getting size from SQL database: %v", err)
return 0
}
qs.lru.Put(val+string(dir.Prefix()), size)
return size
}

348
graph/sql/sql_iterator.go Normal file
View file

@ -0,0 +1,348 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"database/sql"
"fmt"
"strings"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
var sqlType graph.Type
func init() {
sqlType = graph.RegisterIterator("sql")
}
type SQLIterator struct {
uid uint64
qs *QuadStore
cursor *sql.Rows
err error
sql sqlIterator
result map[string]string
resultIndex int
resultList [][]string
resultNext [][]string
cols []string
}
func (it *SQLIterator) Clone() graph.Iterator {
m := &SQLIterator{
uid: iterator.NextUID(),
qs: it.qs,
sql: it.sql.sqlClone(),
}
return m
}
func (it *SQLIterator) UID() uint64 {
return it.uid
}
func (it *SQLIterator) Reset() {
it.err = nil
it.Close()
}
func (it *SQLIterator) Err() error {
return it.err
}
func (it *SQLIterator) Close() error {
if it.cursor != nil {
err := it.cursor.Close()
if err != nil {
return err
}
it.cursor = nil
}
return nil
}
func (it *SQLIterator) Tagger() *graph.Tagger {
return it.sql.Tagger()
}
func (it *SQLIterator) Result() graph.Value {
return it.sql.Result()
}
func (it *SQLIterator) TagResults(dst map[string]graph.Value) {
for tag, value := range it.result {
if tag == "__execd" {
for _, tag := range it.Tagger().Tags() {
dst[tag] = value
}
continue
}
dst[tag] = value
}
for tag, value := range it.Tagger().Fixed() {
dst[tag] = value
}
}
func (it *SQLIterator) Type() graph.Type {
return sqlType
}
func (it *SQLIterator) SubIterators() []graph.Iterator {
return nil
}
func (it *SQLIterator) Sorted() bool { return false }
func (it *SQLIterator) Optimize() (graph.Iterator, bool) { return it, false }
func (it *SQLIterator) Size() (int64, bool) {
return it.sql.Size(it.qs)
}
func (it *SQLIterator) Describe() graph.Description {
size, _ := it.Size()
return graph.Description{
UID: it.UID(),
Name: it.sql.Describe(),
Type: it.Type(),
Size: size,
}
}
func (it *SQLIterator) Stats() graph.IteratorStats {
size, _ := it.Size()
return graph.IteratorStats{
ContainsCost: 1,
NextCost: 5,
Size: size,
}
}
func (it *SQLIterator) NextPath() bool {
it.resultIndex += 1
if it.resultIndex >= len(it.resultList) {
return false
}
it.buildResult(it.resultIndex)
return true
}
func (it *SQLIterator) Next() bool {
var err error
graph.NextLogIn(it)
if it.cursor == nil {
err = it.makeCursor(true, nil)
if err != nil {
glog.Errorf("Couldn't make query: %v", err)
it.err = err
return false
}
it.cols, err = it.cursor.Columns()
if err != nil {
glog.Errorf("Couldn't get columns")
it.err = err
it.cursor.Close()
return false
}
// iterate the first one
if !it.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := it.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
it.err = err
}
it.cursor.Close()
return false
}
s, err := scan(it.cursor, len(it.cols))
if err != nil {
it.err = err
it.cursor.Close()
return false
}
it.resultNext = append(it.resultNext, s)
}
if it.resultList != nil && it.resultNext == nil {
// We're on something and there's no next
return false
}
it.resultList = it.resultNext
it.resultNext = nil
it.resultIndex = 0
for {
if !it.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := it.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
it.err = err
}
it.cursor.Close()
break
}
s, err := scan(it.cursor, len(it.cols))
if err != nil {
it.err = err
it.cursor.Close()
return false
}
if it.sql.sameTopResult(it.resultList[0], s) {
it.resultList = append(it.resultList, s)
} else {
it.resultNext = append(it.resultNext, s)
break
}
}
if len(it.resultList) == 0 {
return graph.NextLogOut(it, nil, false)
}
it.buildResult(0)
return graph.NextLogOut(it, it.Result(), true)
}
func (it *SQLIterator) Contains(v graph.Value) bool {
var err error
if ok, res := it.sql.quickContains(v); ok {
return res
}
err = it.makeCursor(false, v)
if err != nil {
glog.Errorf("Couldn't make query: %v", err)
it.err = err
if it.cursor != nil {
it.cursor.Close()
}
return false
}
it.cols, err = it.cursor.Columns()
if err != nil {
glog.Errorf("Couldn't get columns")
it.err = err
it.cursor.Close()
return false
}
it.resultList = nil
for {
if !it.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := it.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
it.err = err
}
it.cursor.Close()
break
}
s, err := scan(it.cursor, len(it.cols))
if err != nil {
it.err = err
it.cursor.Close()
return false
}
it.resultList = append(it.resultList, s)
}
it.cursor.Close()
it.cursor = nil
if len(it.resultList) != 0 {
it.resultIndex = 0
it.buildResult(0)
return true
}
return false
}
func scan(cursor *sql.Rows, nCols int) ([]string, error) {
pointers := make([]interface{}, nCols)
container := make([]string, nCols)
for i, _ := range pointers {
pointers[i] = &container[i]
}
err := cursor.Scan(pointers...)
if err != nil {
glog.Errorf("Error scanning iterator: %v", err)
return nil, err
}
return container, nil
}
func (it *SQLIterator) buildResult(i int) {
it.result = it.sql.buildResult(it.resultList[i], it.cols)
}
func (it *SQLIterator) makeCursor(next bool, value graph.Value) error {
if it.cursor != nil {
it.cursor.Close()
}
var q string
var values []string
q, values = it.sql.buildSQL(next, value)
q = convertToPostgres(q, values)
ivalues := make([]interface{}, 0, len(values))
for _, v := range values {
ivalues = append(ivalues, v)
}
cursor, err := it.qs.db.Query(q, ivalues...)
if err != nil {
glog.Errorf("Couldn't get cursor from SQL database: %v", err)
cursor = nil
return err
}
it.cursor = cursor
return nil
}
func convertToPostgres(query string, values []string) string {
for i := 1; i <= len(values); i++ {
query = strings.Replace(query, "?", fmt.Sprintf("$%d", i), 1)
}
return query
}
func NewSQLLinkIterator(qs *QuadStore, d quad.Direction, val string) *SQLIterator {
l := &SQLIterator{
uid: iterator.NextUID(),
qs: qs,
sql: &SQLLinkIterator{
constraints: []constraint{
constraint{
dir: d,
vals: []string{val},
},
},
tableName: newTableName(),
size: 0,
},
}
return l
}
func NewSQLIterator(qs *QuadStore, sql sqlIterator) *SQLIterator {
l := &SQLIterator{
uid: iterator.NextUID(),
qs: qs,
sql: sql,
}
return l
}

View file

@ -0,0 +1,310 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"fmt"
"strings"
"sync/atomic"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/quad"
)
var sqlTableID uint64
func init() {
atomic.StoreUint64(&sqlTableID, 0)
}
func newTableName() string {
id := atomic.AddUint64(&sqlTableID, 1)
return fmt.Sprintf("t_%d", id)
}
type constraint struct {
dir quad.Direction
vals []string
}
type tagDir struct {
tag string
dir quad.Direction
table string
justLocal bool
}
func (t tagDir) String() string {
if t.dir == quad.Any {
if t.justLocal {
return fmt.Sprintf("%s.__execd as \"%s\", %s.__execd_hash as %s_hash", t.table, t.tag, t.table, t.tag)
}
return fmt.Sprintf("%s.\"%s\" as \"%s\", %s.%s_hash as %s_hash", t.table, t.tag, t.tag, t.table, t.tag, t.tag)
}
return fmt.Sprintf("%s.%s as \"%s\", %s.%s_hash as %s_hash", t.table, t.dir, t.tag, t.table, t.dir, t.tag)
}
type tableDef struct {
table string
name string
values []string
}
type sqlItDir struct {
dir quad.Direction
it sqlIterator
}
type sqlIterator interface {
sqlClone() sqlIterator
buildSQL(next bool, val graph.Value) (string, []string)
getTables() []tableDef
getTags() []tagDir
buildWhere() (string, []string)
tableID() tagDir
quickContains(graph.Value) (ok bool, result bool)
buildResult(result []string, cols []string) map[string]string
sameTopResult(target []string, test []string) bool
Result() graph.Value
Size(*QuadStore) (int64, bool)
Describe() string
Type() sqlQueryType
Tagger() *graph.Tagger
}
type SQLLinkIterator struct {
tagger graph.Tagger
nodeIts []sqlItDir
constraints []constraint
tableName string
size int64
tagdirs []tagDir
resultQuad quad.Quad
}
func (l *SQLLinkIterator) sqlClone() sqlIterator {
m := &SQLLinkIterator{
tableName: l.tableName,
size: l.size,
constraints: make([]constraint, len(l.constraints)),
tagdirs: make([]tagDir, len(l.tagdirs)),
}
for _, i := range l.nodeIts {
m.nodeIts = append(m.nodeIts, sqlItDir{
dir: i.dir,
it: i.it.sqlClone(),
})
}
copy(m.constraints, l.constraints)
copy(m.tagdirs, l.tagdirs)
m.tagger.CopyFromTagger(l.Tagger())
return m
}
func (l *SQLLinkIterator) Tagger() *graph.Tagger {
return &l.tagger
}
func (l *SQLLinkIterator) Result() graph.Value {
return l.resultQuad
}
func (l *SQLLinkIterator) Size(qs *QuadStore) (int64, bool) {
if l.size != 0 {
return l.size, true
}
if len(l.constraints) > 0 {
l.size = qs.sizeForIterator(false, l.constraints[0].dir, l.constraints[0].vals[0])
} else if len(l.nodeIts) > 1 {
subsize, _ := l.nodeIts[0].it.(*SQLNodeIterator).Size(qs)
return subsize * 20, false
} else {
return qs.Size(), false
}
return l.size, true
}
func (l *SQLLinkIterator) Describe() string {
s, _ := l.buildSQL(true, nil)
return fmt.Sprintf("SQL_LINK_QUERY: %s", s)
}
func (l *SQLLinkIterator) Type() sqlQueryType {
return link
}
func (l *SQLLinkIterator) quickContains(v graph.Value) (bool, bool) {
for _, c := range l.constraints {
none := true
desired := v.(quad.Quad).Get(c.dir)
for _, s := range c.vals {
if s == desired {
none = false
break
}
}
if none {
return true, false
}
}
if len(l.nodeIts) == 0 {
return true, true
}
return false, false
}
func (l *SQLLinkIterator) buildResult(result []string, cols []string) map[string]string {
var q quad.Quad
q.Subject = result[0]
q.Predicate = result[1]
q.Object = result[2]
q.Label = result[3]
l.resultQuad = q
m := make(map[string]string)
for i, c := range cols[4:] {
m[c] = result[i+4]
}
return m
}
func (l *SQLLinkIterator) getTables() []tableDef {
out := []tableDef{tableDef{table: "quads", name: l.tableName}}
for _, i := range l.nodeIts {
out = append(out, i.it.getTables()...)
}
return out
}
func (l *SQLLinkIterator) getTags() []tagDir {
var out []tagDir
for _, tag := range l.tagger.Tags() {
out = append(out, tagDir{
dir: quad.Any,
table: l.tableName,
tag: tag,
})
}
for _, tag := range l.tagdirs {
out = append(out, tagDir{
dir: tag.dir,
table: l.tableName,
tag: tag.tag,
})
}
for _, i := range l.nodeIts {
out = append(out, i.it.getTags()...)
}
return out
}
func (l *SQLLinkIterator) buildWhere() (string, []string) {
var q []string
var vals []string
for _, c := range l.constraints {
q = append(q, fmt.Sprintf("%s.%s_hash = ?", l.tableName, c.dir))
vals = append(vals, hashOf(c.vals[0]))
}
for _, i := range l.nodeIts {
t := i.it.tableID()
dir := t.dir.String()
if t.dir == quad.Any {
dir = t.tag
}
q = append(q, fmt.Sprintf("%s.%s_hash = %s.%s_hash", l.tableName, i.dir, t.table, dir))
}
for _, i := range l.nodeIts {
s, v := i.it.buildWhere()
q = append(q, s)
vals = append(vals, v...)
}
query := strings.Join(q, " AND ")
return query, vals
}
func (l *SQLLinkIterator) tableID() tagDir {
return tagDir{
dir: quad.Any,
table: l.tableName,
}
}
func (l *SQLLinkIterator) buildSQL(next bool, val graph.Value) (string, []string) {
query := "SELECT "
t := []string{
fmt.Sprintf("%s.subject", l.tableName),
fmt.Sprintf("%s.predicate", l.tableName),
fmt.Sprintf("%s.object", l.tableName),
fmt.Sprintf("%s.label", l.tableName),
}
for _, v := range l.getTags() {
t = append(t, v.String())
}
query += strings.Join(t, ", ")
query += " FROM "
t = []string{}
var values []string
for _, k := range l.getTables() {
values = append(values, k.values...)
t = append(t, fmt.Sprintf("%s as %s", k.table, k.name))
}
query += strings.Join(t, ", ")
constraint, wherevalues := l.buildWhere()
if constraint != "" {
query += " WHERE "
}
values = append(values, wherevalues...)
if !next {
v := val.(quad.Quad)
if constraint != "" {
constraint += " AND "
} else {
constraint += " WHERE "
}
t = []string{
fmt.Sprintf("%s.subject_hash = ?", l.tableName),
fmt.Sprintf("%s.predicate_hash = ?", l.tableName),
fmt.Sprintf("%s.object_hash = ?", l.tableName),
fmt.Sprintf("%s.label_hash = ?", l.tableName),
}
constraint += strings.Join(t, " AND ")
values = append(values, hashOf(v.Subject))
values = append(values, hashOf(v.Predicate))
values = append(values, hashOf(v.Object))
values = append(values, hashOf(v.Label))
}
query += constraint
query += ";"
if glog.V(4) {
dstr := query
for i := 1; i <= len(values); i++ {
dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1)
}
glog.V(4).Infoln(dstr)
}
return query, values
}
func (l *SQLLinkIterator) sameTopResult(target []string, test []string) bool {
return target[0] == test[0] && target[1] == test[1] && target[2] == test[2] && target[3] == test[3]
}

View file

@ -0,0 +1,92 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"flag"
"fmt"
"testing"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
var postgres_path = flag.String("postgres_path", "", "Path to running DB")
func TestSQLLink(t *testing.T) {
it := NewSQLLinkIterator(nil, quad.Object, "cool")
s, v := it.sql.buildSQL(true, nil)
t.Log(s, v)
}
func TestSQLLinkIteration(t *testing.T) {
if *postgres_path == "" {
t.SkipNow()
}
db, err := newQuadStore(*postgres_path, nil)
qs := db.(*QuadStore)
if err != nil {
t.Fatal(err)
}
it := NewSQLLinkIterator(qs, quad.Object, "Humphrey Bogart")
for graph.Next(it) {
fmt.Println(it.Result())
}
it = NewSQLLinkIterator(qs, quad.Subject, "/en/casablanca_1942")
s, v := it.sql.buildSQL(true, nil)
t.Log(s, v)
c := 0
for graph.Next(it) {
fmt.Println(it.Result())
c += 1
}
if c != 18 {
t.Errorf("Not enough results, got %d expected 18", c)
}
}
func TestSQLNodeIteration(t *testing.T) {
if *postgres_path == "" {
t.SkipNow()
}
db, err := newQuadStore(*postgres_path, nil)
if err != nil {
t.Fatal(err)
}
link := NewSQLLinkIterator(db.(*QuadStore), quad.Object, "/en/humphrey_bogart")
it := &SQLIterator{
uid: iterator.NextUID(),
qs: db.(*QuadStore),
sql: &SQLNodeIterator{
tableName: newTableName(),
linkIt: sqlItDir{
it: link.sql,
dir: quad.Subject,
},
},
}
s, v := it.sql.buildSQL(true, nil)
t.Log(s, v)
c := 0
for graph.Next(it) {
t.Log(it.Result())
c += 1
}
if c != 56 {
t.Errorf("Not enough results, got %d expected 56", c)
}
}

View file

@ -0,0 +1,211 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"fmt"
"strings"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/quad"
)
type SQLNodeIntersection struct {
tableName string
nodeIts []sqlIterator
nodetables []string
size int64
tagger graph.Tagger
result string
}
func (n *SQLNodeIntersection) sqlClone() sqlIterator {
m := &SQLNodeIntersection{
tableName: n.tableName,
size: n.size,
}
for _, i := range n.nodeIts {
m.nodeIts = append(m.nodeIts, i.sqlClone())
}
m.tagger.CopyFromTagger(n.Tagger())
return m
}
func (n *SQLNodeIntersection) Tagger() *graph.Tagger {
return &n.tagger
}
func (n *SQLNodeIntersection) Result() graph.Value {
return n.result
}
func (n *SQLNodeIntersection) Type() sqlQueryType {
return nodeIntersect
}
func (n *SQLNodeIntersection) Size(qs *QuadStore) (int64, bool) {
return qs.Size() / int64(len(n.nodeIts)+1), true
}
func (n *SQLNodeIntersection) Describe() string {
s, _ := n.buildSQL(true, nil)
return fmt.Sprintf("SQL_NODE_INTERSECTION: %s", s)
}
func (n *SQLNodeIntersection) buildResult(result []string, cols []string) map[string]string {
m := make(map[string]string)
for i, c := range cols {
if strings.HasSuffix(c, "_hash") {
continue
}
if c == "__execd" {
n.result = result[i]
}
m[c] = result[i]
}
return m
}
func (n *SQLNodeIntersection) makeNodeTableNames() {
if n.nodetables != nil {
return
}
n.nodetables = make([]string, len(n.nodeIts))
for i, _ := range n.nodetables {
n.nodetables[i] = newNodeTableName()
}
}
func (n *SQLNodeIntersection) getTables() []tableDef {
if len(n.nodeIts) == 0 {
panic("Combined no subnode queries")
}
return n.buildSubqueries()
}
func (n *SQLNodeIntersection) buildSubqueries() []tableDef {
var out []tableDef
n.makeNodeTableNames()
for i, it := range n.nodeIts {
var td tableDef
var table string
table, td.values = it.buildSQL(true, nil)
td.table = fmt.Sprintf("\n(%s)", table[:len(table)-1])
td.name = n.nodetables[i]
out = append(out, td)
}
return out
}
func (n *SQLNodeIntersection) tableID() tagDir {
n.makeNodeTableNames()
return tagDir{
table: n.nodetables[0],
dir: quad.Any,
tag: "__execd",
}
}
func (n *SQLNodeIntersection) getLocalTags() []tagDir {
myTag := n.tableID()
var out []tagDir
for _, tag := range n.tagger.Tags() {
out = append(out, tagDir{
dir: myTag.dir,
table: myTag.table,
tag: tag,
justLocal: true,
})
}
return out
}
func (n *SQLNodeIntersection) getTags() []tagDir {
out := n.getLocalTags()
n.makeNodeTableNames()
for i, it := range n.nodeIts {
for _, v := range it.getTags() {
out = append(out, tagDir{
tag: v.tag,
dir: quad.Any,
table: n.nodetables[i],
})
}
}
return out
}
func (n *SQLNodeIntersection) buildWhere() (string, []string) {
var q []string
var vals []string
for _, tb := range n.nodetables[1:] {
q = append(q, fmt.Sprintf("%s.__execd_hash = %s.__execd_hash", n.nodetables[0], tb))
}
query := strings.Join(q, " AND ")
return query, vals
}
func (n *SQLNodeIntersection) buildSQL(next bool, val graph.Value) (string, []string) {
topData := n.tableID()
tags := []tagDir{topData}
tags = append(tags, n.getTags()...)
query := "SELECT "
var t []string
for _, v := range tags {
t = append(t, v.String())
}
query += strings.Join(t, ", ")
query += " FROM "
t = []string{}
var values []string
for _, k := range n.getTables() {
values = append(values, k.values...)
t = append(t, fmt.Sprintf("%s as %s", k.table, k.name))
}
query += strings.Join(t, ", ")
query += " WHERE "
constraint, wherevalues := n.buildWhere()
values = append(values, wherevalues...)
if !next {
v := val.(string)
if constraint != "" {
constraint += " AND "
}
constraint += fmt.Sprintf("%s.%s_hash = ?", topData.table, topData.dir)
values = append(values, hashOf(v))
}
query += constraint
query += ";"
if glog.V(4) {
dstr := query
for i := 1; i <= len(values); i++ {
dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1)
}
glog.V(4).Infoln(dstr)
}
return query, values
}
func (n *SQLNodeIntersection) sameTopResult(target []string, test []string) bool {
return target[0] == test[0]
}
func (n *SQLNodeIntersection) quickContains(_ graph.Value) (bool, bool) { return false, false }

View file

@ -0,0 +1,213 @@
// Copyright 2015 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
import (
"fmt"
"strings"
"sync/atomic"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/quad"
)
var sqlNodeTableID uint64
type sqlQueryType int
const (
node sqlQueryType = iota
link
nodeIntersect
)
func init() {
atomic.StoreUint64(&sqlNodeTableID, 0)
}
func newNodeTableName() string {
id := atomic.AddUint64(&sqlNodeTableID, 1)
return fmt.Sprintf("n_%d", id)
}
type SQLNodeIterator struct {
tableName string
linkIt sqlItDir
size int64
tagger graph.Tagger
result string
}
func (n *SQLNodeIterator) sqlClone() sqlIterator {
m := &SQLNodeIterator{
tableName: n.tableName,
size: n.size,
linkIt: sqlItDir{
dir: n.linkIt.dir,
it: n.linkIt.it.sqlClone(),
},
}
m.tagger.CopyFromTagger(n.Tagger())
return m
}
func (n *SQLNodeIterator) Tagger() *graph.Tagger {
return &n.tagger
}
func (n *SQLNodeIterator) Result() graph.Value {
return n.result
}
func (n *SQLNodeIterator) Type() sqlQueryType {
return node
}
func (n *SQLNodeIterator) Size(qs *QuadStore) (int64, bool) {
return qs.Size() / 2, true
}
func (n *SQLNodeIterator) Describe() string {
s, _ := n.buildSQL(true, nil)
return fmt.Sprintf("SQL_NODE_QUERY: %s", s)
}
func (n *SQLNodeIterator) buildResult(result []string, cols []string) map[string]string {
m := make(map[string]string)
for i, c := range cols {
if strings.HasSuffix(c, "_hash") {
continue
}
if c == "__execd" {
n.result = result[i]
}
m[c] = result[i]
}
return m
}
func (n *SQLNodeIterator) getTables() []tableDef {
var out []tableDef
if n.linkIt.it != nil {
out = n.linkIt.it.getTables()
}
if len(out) == 0 {
out = append(out, tableDef{table: "quads", name: n.tableName})
}
return out
}
func (n *SQLNodeIterator) tableID() tagDir {
if n.linkIt.it != nil {
return tagDir{
table: n.linkIt.it.tableID().table,
dir: n.linkIt.dir,
tag: "__execd",
}
}
return tagDir{
table: n.tableName,
dir: quad.Any,
tag: "__execd",
}
}
func (n *SQLNodeIterator) getLocalTags() []tagDir {
myTag := n.tableID()
var out []tagDir
for _, tag := range n.tagger.Tags() {
out = append(out, tagDir{
dir: myTag.dir,
table: myTag.table,
tag: tag,
justLocal: true,
})
}
return out
}
func (n *SQLNodeIterator) getTags() []tagDir {
out := n.getLocalTags()
if n.linkIt.it != nil {
out = append(out, n.linkIt.it.getTags()...)
}
return out
}
func (n *SQLNodeIterator) buildWhere() (string, []string) {
var q []string
var vals []string
if n.linkIt.it != nil {
s, v := n.linkIt.it.buildWhere()
q = append(q, s)
vals = append(vals, v...)
}
query := strings.Join(q, " AND ")
return query, vals
}
func (n *SQLNodeIterator) buildSQL(next bool, val graph.Value) (string, []string) {
topData := n.tableID()
tags := []tagDir{topData}
tags = append(tags, n.getTags()...)
query := "SELECT "
var t []string
for _, v := range tags {
t = append(t, v.String())
}
query += strings.Join(t, ", ")
query += " FROM "
t = []string{}
var values []string
for _, k := range n.getTables() {
values = append(values, k.values...)
t = append(t, fmt.Sprintf("%s as %s", k.table, k.name))
}
query += strings.Join(t, ", ")
query += " WHERE "
constraint, wherevalues := n.buildWhere()
values = append(values, wherevalues...)
if !next {
v := val.(string)
if constraint != "" {
constraint += " AND "
}
constraint += fmt.Sprintf("%s.%s_hash = ?", topData.table, topData.dir)
values = append(values, hashOf(v))
}
query += constraint
query += ";"
if glog.V(4) {
dstr := query
for i := 1; i <= len(values); i++ {
dstr = strings.Replace(dstr, "?", fmt.Sprintf("'%s'", values[i-1]), 1)
}
glog.V(4).Infoln(dstr)
}
return query, values
}
func (n *SQLNodeIterator) sameTopResult(target []string, test []string) bool {
return target[0] == test[0]
}
func (n *SQLNodeIterator) quickContains(_ graph.Value) (bool, bool) { return false, false }

View file

@ -37,19 +37,23 @@ import (
_ "github.com/google/cayley/graph/leveldb"
_ "github.com/google/cayley/graph/memstore"
_ "github.com/google/cayley/graph/mongo"
_ "github.com/google/cayley/graph/sql"
// Load writer registry
_ "github.com/google/cayley/writer"
)
var backend = flag.String("backend", "memstore", "Which backend to test. Loads test data to /tmp if not present.")
var backendPath = flag.String("backend_path", "", "Path to the chosen backend. Will have sane testing defaults if not specified")
var benchmarkQueries = []struct {
message string
long bool
query string
tag string
expect []interface{}
// for testing
skip bool
expect []interface{}
}{
// Easy one to get us started. How quick is the most straightforward retrieval?
{
@ -422,6 +426,7 @@ var (
)
func prepare(t testing.TB) {
var remote bool
cfg.DatabaseType = *backend
switch *backend {
case "memstore":
@ -436,14 +441,21 @@ func prepare(t testing.TB) {
cfg.DatabaseOptions = map[string]interface{}{
"database_name": "cayley_test", // provide a default test database
}
remote = true
case "sql":
cfg.DatabasePath = "postgres://localhost/cayley_test"
remote = true
default:
t.Fatalf("Untestable backend store %s", *backend)
}
if *backendPath != "" {
cfg.DatabasePath = *backendPath
}
var err error
create.Do(func() {
needsLoad := true
if graph.IsPersistent(cfg.DatabaseType) {
if graph.IsPersistent(cfg.DatabaseType) && !remote {
if _, err := os.Stat(cfg.DatabasePath); os.IsNotExist(err) {
err = db.Init(cfg)
if err != nil {
@ -459,7 +471,7 @@ func prepare(t testing.TB) {
t.Fatalf("Failed to open %q: %v", cfg.DatabasePath, err)
}
if needsLoad {
if needsLoad && !remote {
err = internal.Load(handle.QuadWriter, cfg, "../data/30kmoviedata.nq.gz", "cquad")
if err != nil {
t.Fatalf("Failed to load %q: %v", cfg.DatabasePath, err)
@ -524,6 +536,11 @@ func checkQueries(t *testing.T) {
if testing.Short() && test.long {
continue
}
if test.skip {
continue
}
tInit := time.Now()
t.Logf("Now testing %s ", test.message)
ses := gremlin.NewSession(handle.QuadStore, cfg.Timeout, true)
_, err := ses.Parse(test.query)
if err != nil {
@ -552,6 +569,7 @@ func checkQueries(t *testing.T) {
t.Error("Query timed out: skipping validation.")
continue
}
t.Logf("(%v)\n", time.Since(tInit))
if len(got) != len(test.expect) {
t.Errorf("Unexpected number of results, got:%d expect:%d on %s.", len(got), len(test.expect), test.message)

View file

@ -161,7 +161,10 @@ func Repl(h *graph.Handle, queryLanguage string, cfg *config.Config) error {
fmt.Printf("Error: not a valid quad: %v\n", err)
continue
}
h.QuadWriter.RemoveQuad(quad)
err = h.QuadWriter.RemoveQuad(quad)
if err != nil {
fmt.Printf("error deleting: %v\n", err)
}
continue
case "exit":

View file

@ -66,6 +66,8 @@ const (
Label
)
var Directions = []Direction{Subject, Predicate, Object, Label}
func (d Direction) Prefix() byte {
switch d {
case Any:

View file

@ -281,9 +281,9 @@ func (wk *worker) runIterator(it graph.Iterator) {
if glog.V(2) {
b, err := json.MarshalIndent(it.Describe(), "", " ")
if err != nil {
glog.Infof("failed to format description: %v", err)
glog.V(2).Infof("failed to format description: %v", err)
} else {
glog.Infof("%s", b)
glog.V(2).Infof("%s", b)
}
}
for {