refactor to SQL builder iterators and standard iterator wrapper

This commit is contained in:
Barak Michener 2015-07-29 15:56:15 -04:00
parent b754810c6e
commit 3e02bb2b71
5 changed files with 453 additions and 593 deletions

View file

@ -15,22 +15,18 @@
package sql
import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
var sqlLinkType graph.Type
var sqlTableID uint64
func init() {
sqlLinkType = graph.RegisterIterator("sqllink")
atomic.StoreUint64(&sqlTableID, 0)
}
@ -73,59 +69,39 @@ type sqlItDir struct {
}
type sqlIterator interface {
buildSQL(next bool, val graph.Value) (string, []string)
sqlClone() sqlIterator
buildSQL(next bool, val graph.Value) (string, []string)
getTables() []tableDef
getTags() []tagDir
buildWhere() (string, []string)
tableID() tagDir
quickContains(graph.Value) (ok bool, result bool)
buildResult(result []string, cols []string) map[string]string
sameTopResult(target []string, test []string) bool
Result() graph.Value
Size(*QuadStore) (int64, bool)
Describe() string
Type() sqlQueryType
Tagger() *graph.Tagger
}
type SQLLinkIterator struct {
uid uint64
qs *QuadStore
tagger graph.Tagger
err error
cursor *sql.Rows
nodeIts []sqlItDir
constraints []constraint
tableName string
size int64
tagdirs []tagDir
result map[string]string
resultIndex int
resultList [][]string
resultNext [][]string
cols []string
resultQuad quad.Quad
}
func NewSQLLinkIterator(qs *QuadStore, d quad.Direction, val string) *SQLLinkIterator {
l := &SQLLinkIterator{
uid: iterator.NextUID(),
qs: qs,
constraints: []constraint{
constraint{
dir: d,
vals: []string{val},
},
},
tableName: newTableName(),
size: 0,
}
return l
resultQuad quad.Quad
}
func (l *SQLLinkIterator) sqlClone() sqlIterator {
return l.Clone().(*SQLLinkIterator)
}
func (l *SQLLinkIterator) Clone() graph.Iterator {
m := &SQLLinkIterator{
uid: iterator.NextUID(),
qs: l.qs,
tableName: l.tableName,
size: l.size,
constraints: make([]constraint, len(l.constraints)),
@ -139,34 +115,10 @@ func (l *SQLLinkIterator) Clone() graph.Iterator {
}
copy(m.constraints, l.constraints)
copy(m.tagdirs, l.tagdirs)
m.tagger.CopyFrom(l)
m.tagger.CopyFromTagger(l.Tagger())
return m
}
func (l *SQLLinkIterator) UID() uint64 {
return l.uid
}
func (l *SQLLinkIterator) Reset() {
l.err = nil
l.Close()
}
func (l *SQLLinkIterator) Err() error {
return l.err
}
func (l *SQLLinkIterator) Close() error {
if l.cursor != nil {
err := l.cursor.Close()
if err != nil {
return err
}
l.cursor = nil
}
return nil
}
func (l *SQLLinkIterator) Tagger() *graph.Tagger {
return &l.tagger
}
@ -175,70 +127,30 @@ func (l *SQLLinkIterator) Result() graph.Value {
return l.resultQuad
}
func (l *SQLLinkIterator) TagResults(dst map[string]graph.Value) {
for tag, value := range l.result {
if tag == "__execd" {
for _, tag := range l.tagger.Tags() {
dst[tag] = value
}
continue
}
dst[tag] = value
}
for tag, value := range l.tagger.Fixed() {
dst[tag] = value
}
}
func (l *SQLLinkIterator) SubIterators() []graph.Iterator {
// TODO(barakmich): SQL Subiterators shouldn't count? If it makes sense,
// there's no reason not to expose them though.
return nil
}
func (l *SQLLinkIterator) Sorted() bool { return false }
func (l *SQLLinkIterator) Optimize() (graph.Iterator, bool) { return l, false }
func (l *SQLLinkIterator) Size() (int64, bool) {
func (l *SQLLinkIterator) Size(qs *QuadStore) (int64, bool) {
if l.size != 0 {
return l.size, true
}
if len(l.constraints) > 0 {
l.size = l.qs.sizeForIterator(false, l.constraints[0].dir, l.constraints[0].vals[0])
l.size = qs.sizeForIterator(false, l.constraints[0].dir, l.constraints[0].vals[0])
} else if len(l.nodeIts) > 1 {
subsize, _ := l.nodeIts[0].it.(*SQLNodeIterator).Size()
subsize, _ := l.nodeIts[0].it.(*SQLNodeIterator).Size(qs)
return subsize * 20, false
} else {
return l.qs.Size(), false
return qs.Size(), false
}
return l.size, true
}
func (l *SQLLinkIterator) Describe() graph.Description {
size, _ := l.Size()
return graph.Description{
UID: l.UID(),
Name: fmt.Sprintf("SQL_LINK_QUERY: %#v", l),
Type: l.Type(),
Size: size,
}
func (l *SQLLinkIterator) Describe() string {
return fmt.Sprintf("SQL_LINK_QUERY: %#v", l)
}
func (l *SQLLinkIterator) Stats() graph.IteratorStats {
size, _ := l.Size()
return graph.IteratorStats{
ContainsCost: 1,
NextCost: 5,
Size: size,
}
func (l *SQLLinkIterator) Type() sqlQueryType {
return link
}
func (l *SQLLinkIterator) Type() graph.Type {
return sqlLinkType
}
func (l *SQLLinkIterator) preFilter(v graph.Value) bool {
func (l *SQLLinkIterator) quickContains(v graph.Value) (bool, bool) {
for _, c := range l.constraints {
none := true
desired := v.(quad.Quad).Get(c.dir)
@ -249,85 +161,27 @@ func (l *SQLLinkIterator) preFilter(v graph.Value) bool {
}
}
if none {
return true
return true, false
}
}
return false
}
func (l *SQLLinkIterator) Contains(v graph.Value) bool {
var err error
if l.preFilter(v) {
return false
}
if len(l.nodeIts) == 0 {
return true
return true, true
}
err = l.makeCursor(false, v)
if err != nil {
glog.Errorf("Couldn't make query: %v", err)
l.err = err
l.cursor.Close()
return false
}
l.cols, err = l.cursor.Columns()
if err != nil {
glog.Errorf("Couldn't get columns")
l.err = err
l.cursor.Close()
return false
}
l.resultList = nil
for {
if !l.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := l.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
l.err = err
}
l.cursor.Close()
break
}
s, err := scan(l.cursor, len(l.cols))
if err != nil {
l.err = err
l.cursor.Close()
return false
}
l.resultList = append(l.resultList, s)
}
l.cursor.Close()
l.cursor = nil
if len(l.resultList) != 0 {
l.resultIndex = 0
l.buildResult(0)
return true
}
return false
return false, false
}
func (l *SQLLinkIterator) NextPath() bool {
l.resultIndex += 1
if l.resultIndex >= len(l.resultList) {
return false
}
l.buildResult(l.resultIndex)
return true
}
func (l *SQLLinkIterator) buildResult(i int) {
container := l.resultList[i]
func (l *SQLLinkIterator) buildResult(result []string, cols []string) map[string]string {
var q quad.Quad
q.Subject = container[0]
q.Predicate = container[1]
q.Object = container[2]
q.Label = container[3]
q.Subject = result[0]
q.Predicate = result[1]
q.Object = result[2]
q.Label = result[3]
l.resultQuad = q
l.result = make(map[string]string)
for i, c := range l.cols[4:] {
l.result[c] = container[i+4]
m := make(map[string]string)
for i, c := range cols[4:] {
m[c] = result[i+4]
}
return m
}
func (l *SQLLinkIterator) getTables() []tableDef {
@ -448,119 +302,6 @@ func (l *SQLLinkIterator) buildSQL(next bool, val graph.Value) (string, []string
return query, values
}
func convertToPostgres(query string, values []string) string {
for i := 1; i <= len(values); i++ {
query = strings.Replace(query, "?", fmt.Sprintf("$%d", i), 1)
}
return query
}
func (l *SQLLinkIterator) makeCursor(next bool, value graph.Value) error {
if l.cursor != nil {
l.cursor.Close()
}
var q string
var values []string
q, values = l.buildSQL(next, value)
q = convertToPostgres(q, values)
ivalues := make([]interface{}, 0, len(values))
for _, v := range values {
ivalues = append(ivalues, v)
}
cursor, err := l.qs.db.Query(q, ivalues...)
if err != nil {
glog.Errorf("Couldn't get cursor from SQL database: %v", err)
cursor = nil
return err
}
l.cursor = cursor
return nil
}
func scan(cursor *sql.Rows, nCols int) ([]string, error) {
pointers := make([]interface{}, nCols)
container := make([]string, nCols)
for i, _ := range pointers {
pointers[i] = &container[i]
}
err := cursor.Scan(pointers...)
if err != nil {
glog.Errorf("Error scanning iterator: %v", err)
return nil, err
}
return container, nil
}
func (l *SQLLinkIterator) Next() bool {
var err error
graph.NextLogIn(l)
if l.cursor == nil {
err = l.makeCursor(true, nil)
l.cols, err = l.cursor.Columns()
if err != nil {
glog.Errorf("Couldn't get columns")
l.err = err
l.cursor.Close()
return false
}
// iterate the first one
if !l.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := l.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
l.err = err
}
l.cursor.Close()
return false
}
s, err := scan(l.cursor, len(l.cols))
if err != nil {
l.err = err
l.cursor.Close()
return false
}
l.resultNext = append(l.resultNext, s)
}
if l.resultList != nil && l.resultNext == nil {
// We're on something and there's no next
return false
}
l.resultList = l.resultNext
l.resultNext = nil
l.resultIndex = 0
for {
if !l.cursor.Next() {
glog.V(4).Infoln("sql: No next")
err := l.cursor.Err()
if err != nil {
glog.Errorf("Cursor error in SQL: %v", err)
l.err = err
}
l.cursor.Close()
break
}
s, err := scan(l.cursor, len(l.cols))
if err != nil {
l.err = err
l.cursor.Close()
return false
}
if l.resultList[0][0] == s[0] && l.resultList[0][1] == s[1] && l.resultList[0][2] == s[2] && l.resultList[0][3] == s[3] {
l.resultList = append(l.resultList, s)
} else {
l.resultNext = append(l.resultNext, s)
break
}
}
if len(l.resultList) == 0 {
return graph.NextLogOut(l, nil, false)
}
l.buildResult(0)
return graph.NextLogOut(l, l.Result(), true)
}
type SQLAllIterator struct {
// TBD
func (l *SQLLinkIterator) sameTopResult(target []string, test []string) bool {
return target[0] == test[0] && target[1] == test[1] && target[2] == test[2] && target[3] == test[3]
}