From 3f391a782c7cda1b4360c1577133d7bde9481349 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 9 Apr 2015 17:28:41 -0400 Subject: [PATCH] first working-ish Postgres backend Subcommits: implement iterator and remove ResultTree add Err() to sql remove redundant and less helpful indices, change fillfactor, and use COPY FROM --- cmd/cayley/cayley.go | 11 ++ graph/sql/iterator.go | 270 ++++++++++++++++++++++++++++++++++++++++++++ graph/sql/lru.go | 63 +++++++++++ graph/sql/quadstore.go | 295 ++++++++++++++++++++++++++++++++++++++++++++++++ quad/quad.go | 2 + query/gremlin/finals.go | 4 +- 6 files changed, 643 insertions(+), 2 deletions(-) create mode 100644 graph/sql/iterator.go create mode 100644 graph/sql/lru.go create mode 100644 graph/sql/quadstore.go diff --git a/cmd/cayley/cayley.go b/cmd/cayley/cayley.go index 302d9e7..7f76559 100644 --- a/cmd/cayley/cayley.go +++ b/cmd/cayley/cayley.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "runtime" + "runtime/pprof" "time" "github.com/barakmich/glog" @@ -36,6 +37,7 @@ import ( _ "github.com/google/cayley/graph/leveldb" _ "github.com/google/cayley/graph/memstore" _ "github.com/google/cayley/graph/mongo" + _ "github.com/google/cayley/graph/sql" // Load writer registry _ "github.com/google/cayley/writer" @@ -147,6 +149,15 @@ func main() { os.Args = append(os.Args[:1], os.Args[2:]...) flag.Parse() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + var buildString string if Version != "" { buildString = fmt.Sprint("Cayley ", Version, " built ", BuildDate) diff --git a/graph/sql/iterator.go b/graph/sql/iterator.go new file mode 100644 index 0000000..af8a8a7 --- /dev/null +++ b/graph/sql/iterator.go @@ -0,0 +1,270 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "database/sql" + "fmt" + + "github.com/barakmich/glog" + + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +type Iterator struct { + uid uint64 + tags graph.Tagger + qs *QuadStore + dir quad.Direction + val graph.Value + size int64 + isAll bool + table string + cursor *sql.Rows + result graph.Value + err error +} + +func (it *Iterator) makeCursor() { + var cursor *sql.Rows + var err error + if it.cursor != nil { + it.cursor.Close() + } + if it.isAll { + if it.table == "quads" { + cursor, err = it.qs.db.Query(`SELECT subject, predicate, object, label FROM quads;`) + if err != nil { + glog.Errorln("Couldn't get cursor from SQL database: %v", err) + cursor = nil + } + } else { + glog.V(4).Infoln("sql: getting node query") + cursor, err = it.qs.db.Query(`SELECT node FROM + ( + SELECT subject FROM quads + UNION + SELECT predicate FROM quads + UNION + SELECT object FROM quads + UNION + SELECT label FROM quads + ) AS DistinctNodes (node) WHERE node IS NOT NULL;`) + if err != nil { + glog.Errorln("Couldn't get cursor from SQL database: %v", err) + cursor = nil + } + glog.V(4).Infoln("sql: got node query") + } + } else { + cursor, err = it.qs.db.Query( + fmt.Sprintf("SELECT subject, predicate, object, label FROM quads WHERE %s = $1;", it.dir.String()), it.val.(string)) + if err != nil { + glog.Errorln("Couldn't get cursor from SQL database: %v", err) + cursor = nil + } + } + it.cursor = cursor +} + +func NewIterator(qs *QuadStore, d quad.Direction, val graph.Value) *Iterator { + it := &Iterator{ + uid: iterator.NextUID(), + qs: qs, + dir: d, + size: -1, + val: val, + table: "quads", + isAll: false, + } + return it +} + +func NewAllIterator(qs *QuadStore, table string) *Iterator { + var size int64 + it := &Iterator{ + uid: iterator.NextUID(), + qs: qs, + dir: quad.Any, + size: size, + table: table, + isAll: true, + } + return it +} + +func (it *Iterator) UID() uint64 { + return it.uid +} + +func (it *Iterator) Reset() { + it.err = nil + it.Close() +} + +func (it *Iterator) Err() error { + return it.err +} + +func (it *Iterator) Close() error { + if it.cursor != nil { + err := it.cursor.Close() + if err != nil { + return err + } + it.cursor = nil + } + return nil +} + +func (it *Iterator) Tagger() *graph.Tagger { + return &it.tags +} + +func (it *Iterator) TagResults(dst map[string]graph.Value) { + for _, tag := range it.tags.Tags() { + dst[tag] = it.Result() + } + + for tag, value := range it.tags.Fixed() { + dst[tag] = value + } +} + +func (it *Iterator) Clone() graph.Iterator { + var m *Iterator + if it.isAll { + m = NewAllIterator(it.qs, it.table) + } else { + m = NewIterator(it.qs, it.dir, it.val) + } + m.tags.CopyFrom(it) + return m +} + +func (it *Iterator) SubIterators() []graph.Iterator { + return nil +} + +func (it *Iterator) Next() bool { + graph.NextLogIn(it) + if it.cursor == nil { + it.makeCursor() + } + if !it.cursor.Next() { + glog.V(4).Infoln("sql: No next") + err := it.cursor.Err() + if err != nil { + glog.Errorf("Cursor error in SQL: %v", err) + it.err = err + } + it.cursor.Close() + return false + } + if it.table == "nodes" { + var node string + err := it.cursor.Scan(&node) + if err != nil { + glog.Errorf("Error nexting node iterator: %v", err) + it.err = err + return false + } + it.result = node + return true + } + var q quad.Quad + err := it.cursor.Scan(&q.Subject, &q.Predicate, &q.Object, &q.Label) + if err != nil { + glog.Errorf("Error scanning sql iterator: %v", err) + it.err = err + return false + } + it.result = q + return graph.NextLogOut(it, it.result, true) +} + +func (it *Iterator) Contains(v graph.Value) bool { + graph.ContainsLogIn(it, v) + if it.isAll { + return graph.ContainsLogOut(it, v, true) + } + q := v.(quad.Quad) + if q.Get(it.dir) == it.val.(string) { + return graph.ContainsLogOut(it, v, true) + } + return graph.ContainsLogOut(it, v, false) +} + +func (it *Iterator) Size() (int64, bool) { + if it.size != -1 { + return it.size, true + } + it.size = it.qs.sizeForIterator(it.isAll, it.dir, it.val.(string)) + return it.size, true +} + +func (it *Iterator) Result() graph.Value { + return it.result +} + +func (it *Iterator) NextPath() bool { + return false +} + +var sqlType graph.Type + +func init() { + sqlType = graph.RegisterIterator("sql") +} + +func Type() graph.Type { return sqlType } + +func (it *Iterator) Type() graph.Type { + if it.isAll { + return graph.All + } + return sqlType +} + +func (it *Iterator) Sorted() bool { return true } +func (it *Iterator) Optimize() (graph.Iterator, bool) { return it, false } + +func (it *Iterator) Describe() graph.Description { + size, _ := it.Size() + return graph.Description{ + UID: it.UID(), + Name: fmt.Sprintf("%s/%s", it.val, it.dir), + Type: it.Type(), + Size: size, + } +} + +func (it *Iterator) Stats() graph.IteratorStats { + size, _ := it.Size() + if it.table == "nodes" { + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 9999, + Size: size, + } + } + return graph.IteratorStats{ + ContainsCost: 1, + NextCost: 5, + Size: size, + } +} diff --git a/graph/sql/lru.go b/graph/sql/lru.go new file mode 100644 index 0000000..e3aca7f --- /dev/null +++ b/graph/sql/lru.go @@ -0,0 +1,63 @@ +// Copyright 2014 The Cayley Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "container/list" +) + +// cache implements an LRU cache. +type cache struct { + cache map[string]*list.Element + priority *list.List + maxSize int +} + +type kv struct { + key string + value int64 +} + +func newCache(size int) *cache { + var lru cache + lru.maxSize = size + lru.priority = list.New() + lru.cache = make(map[string]*list.Element) + return &lru +} + +func (lru *cache) Put(key string, value int64) { + if _, ok := lru.Get(key); ok { + return + } + if len(lru.cache) == lru.maxSize { + lru.removeOldest() + } + lru.priority.PushFront(kv{key: key, value: value}) + lru.cache[key] = lru.priority.Front() +} + +func (lru *cache) Get(key string) (int64, bool) { + if element, ok := lru.cache[key]; ok { + lru.priority.MoveToFront(element) + return element.Value.(kv).value, true + } + return 0, false +} + +func (lru *cache) removeOldest() { + last := lru.priority.Remove(lru.priority.Back()) + delete(lru.cache, last.(kv).key) +} diff --git a/graph/sql/quadstore.go b/graph/sql/quadstore.go new file mode 100644 index 0000000..e6b7162 --- /dev/null +++ b/graph/sql/quadstore.go @@ -0,0 +1,295 @@ +package sql + +import ( + "database/sql" + "fmt" + + "github.com/lib/pq" + + "github.com/barakmich/glog" + "github.com/google/cayley/graph" + "github.com/google/cayley/graph/iterator" + "github.com/google/cayley/quad" +) + +const QuadStoreType = "sql" + +func init() { + graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createSQLTables, nil) +} + +type QuadStore struct { + db *sql.DB + sqlFlavor string + size int64 + lru *cache +} + +func connectSQLTables(addr string, _ graph.Options) (*sql.DB, error) { + // TODO(barakmich): Parse options for more friendly addr, other SQLs. + conn, err := sql.Open("postgres", addr) + if err != nil { + glog.Errorf("Couldn't open database at %s: %#v", addr, err) + return nil, err + } + return conn, nil +} + +func createSQLTables(addr string, options graph.Options) error { + conn, err := connectSQLTables(addr, options) + if err != nil { + return err + } + tx, err := conn.Begin() + if err != nil { + glog.Errorf("Couldn't begin creation transaction: %s", err) + return err + } + + quadTable, err := tx.Exec(` + CREATE TABLE quads ( + subject TEXT NOT NULL, + predicate TEXT NOT NULL, + object TEXT NOT NULL, + label TEXT, + horizon BIGSERIAL PRIMARY KEY, + id BIGINT, + ts timestamp, + UNIQUE(subject, predicate, object, label) + );`) + if err != nil { + glog.Errorf("Cannot create quad table: %v", quadTable) + return err + } + index, err := tx.Exec(` + CREATE INDEX pos_index ON quads (predicate, object, subject) WITH (FILLFACTOR = 50); + CREATE INDEX osp_index ON quads (object, subject, predicate) WITH (FILLFACTOR = 50); + `) + if err != nil { + glog.Errorf("Cannot create indices: %v", index) + return err + } + tx.Commit() + return nil +} + +func newQuadStore(addr string, options graph.Options) (graph.QuadStore, error) { + var qs QuadStore + conn, err := connectSQLTables(addr, options) + if err != nil { + return nil, err + } + qs.db = conn + qs.sqlFlavor = "postgres" + qs.size = -1 + qs.lru = newCache(1024) + return &qs, nil +} + +func (qs *QuadStore) copyFrom(tx *sql.Tx, in []graph.Delta) error { + stmt, err := tx.Prepare(pq.CopyIn("quads", "subject", "predicate", "object", "label", "id", "ts")) + if err != nil { + return err + } + for _, d := range in { + _, err := stmt.Exec(d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label, d.ID.Int(), d.Timestamp) + if err != nil { + glog.Errorf("couldn't prepare COPY statement: %v", err) + return err + } + } + _, err = stmt.Exec() + if err != nil { + return err + } + return stmt.Close() +} + +func (qs *QuadStore) buildTxPostgres(tx *sql.Tx, in []graph.Delta) error { + allAdds := true + for _, d := range in { + if d.Action != graph.Add { + allAdds = false + } + } + if allAdds { + return qs.copyFrom(tx, in) + } + + insert, err := tx.Prepare(`INSERT INTO quads(subject, predicate, object, label, id, ts) VALUES ($1, $2, $3, $4, $5, $6)`) + if err != nil { + glog.Errorf("Cannot prepare insert statement: %v", err) + return err + } + for _, d := range in { + switch d.Action { + case graph.Add: + _, err := insert.Exec(d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label, d.ID.Int(), d.Timestamp) + if err != nil { + glog.Errorf("couldn't prepare INSERT statement: %v", err) + return err + } + //for _, dir := range quad.Directions { + //_, err := tx.Exec(` + //WITH upsert AS (UPDATE nodes SET size=size+1 WHERE node=$1 RETURNING *) + //INSERT INTO nodes (node, size) SELECT $1, 1 WHERE NOT EXISTS (SELECT * FROM UPSERT); + //`, d.Quad.Get(dir)) + //if err != nil { + //glog.Errorf("couldn't prepare upsert statement in direction %s: %v", dir, err) + //return err + //} + //} + case graph.Delete: + _, err := tx.Exec(`DELETE FROM quads WHERE subject=$1 and predicate=$2 and object=$3 and label=$4;`, + d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label) + if err != nil { + glog.Errorf("couldn't prepare DELETE statement: %v", err) + } + //for _, dir := range quad.Directions { + //tx.Exec(`UPDATE nodes SET size=size-1 WHERE node=$1;`, d.Quad.Get(dir)) + //} + default: + panic("unknown action") + } + } + return nil +} + +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, _ graph.IgnoreOpts) error { + // TODO(barakmich): Support ignoreOpts? "ON CONFLICT IGNORE" + tx, err := qs.db.Begin() + if err != nil { + glog.Errorf("couldn't begin write transaction: %v", err) + return err + } + switch qs.sqlFlavor { + case "postgres": + err = qs.buildTxPostgres(tx, in) + if err != nil { + return err + } + default: + panic("no support for flavor: " + qs.sqlFlavor) + } + return tx.Commit() +} + +func (qs *QuadStore) Quad(val graph.Value) quad.Quad { + return val.(quad.Quad) +} + +func (qs *QuadStore) QuadIterator(d quad.Direction, val graph.Value) graph.Iterator { + return NewIterator(qs, d, val) +} + +func (qs *QuadStore) NodesAllIterator() graph.Iterator { + return NewAllIterator(qs, "nodes") +} + +func (qs *QuadStore) QuadsAllIterator() graph.Iterator { + return NewAllIterator(qs, "quads") +} + +func (qs *QuadStore) ValueOf(s string) graph.Value { + return s +} + +func (qs *QuadStore) NameOf(v graph.Value) string { + return v.(string) +} + +func (qs *QuadStore) Size() int64 { + // TODO(barakmich): Sync size with writes. + if qs.size != -1 { + return qs.size + } + c := qs.db.QueryRow("SELECT COUNT(*) FROM quads;") + err := c.Scan(&qs.size) + if err != nil { + glog.Errorf("Couldn't execute COUNT: %v", err) + return 0 + } + return qs.size +} + +func (qs *QuadStore) Horizon() graph.PrimaryKey { + var horizon int64 + err := qs.db.QueryRow("SELECT horizon FROM quads ORDER BY horizon DESC LIMIT 1;").Scan(&horizon) + if err != nil { + glog.Errorf("Couldn't execute horizon: %v", err) + return graph.NewSequentialKey(0) + } + return graph.NewSequentialKey(horizon) +} + +func (qs *QuadStore) FixedIterator() graph.FixedIterator { + return iterator.NewFixed(iterator.Identity) +} + +func (qs *QuadStore) Close() { + qs.db.Close() +} + +func (qs *QuadStore) QuadDirection(in graph.Value, d quad.Direction) graph.Value { + q := in.(quad.Quad) + return q.Get(d) +} + +func (qs *QuadStore) Type() string { + return QuadStoreType +} + +func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { + switch it.Type() { + case graph.LinksTo: + return qs.optimizeLinksTo(it.(*iterator.LinksTo)) + + } + return it, false +} + +func (qs *QuadStore) optimizeLinksTo(it *iterator.LinksTo) (graph.Iterator, bool) { + subs := it.SubIterators() + if len(subs) != 1 { + return it, false + } + primary := subs[0] + if primary.Type() == graph.Fixed { + size, _ := primary.Size() + if size == 1 { + if !graph.Next(primary) { + panic("unexpected size during optimize") + } + val := primary.Result() + newIt := qs.QuadIterator(it.Direction(), val) + nt := newIt.Tagger() + nt.CopyFrom(it) + for _, tag := range primary.Tagger().Tags() { + nt.AddFixed(tag, val) + } + it.Close() + return newIt, true + } + } + return it, false +} + +func (qs *QuadStore) sizeForIterator(isAll bool, dir quad.Direction, val string) int64 { + var err error + if isAll { + return qs.Size() + } + if val, ok := qs.lru.Get(val + string(dir.Prefix())); ok { + return val + } + var size int64 + glog.V(4).Infoln("sql: getting size for select %s, %s", dir.String(), val) + err = qs.db.QueryRow( + fmt.Sprintf("SELECT count(*) FROM quads WHERE %s = $1;", dir.String()), val).Scan(&size) + if err != nil { + glog.Errorln("Error getting size from SQL database: %v", err) + return 0 + } + qs.lru.Put(val+string(dir.Prefix()), size) + return size +} diff --git a/quad/quad.go b/quad/quad.go index e448cf2..62e22ca 100644 --- a/quad/quad.go +++ b/quad/quad.go @@ -66,6 +66,8 @@ const ( Label ) +var Directions = []Direction{Subject, Predicate, Object, Label} + func (d Direction) Prefix() byte { switch d { case Any: diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 583b8d0..0631d41 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -281,9 +281,9 @@ func (wk *worker) runIterator(it graph.Iterator) { if glog.V(2) { b, err := json.MarshalIndent(it.Describe(), "", " ") if err != nil { - glog.Infof("failed to format description: %v", err) + glog.V(2).Infof("failed to format description: %v", err) } else { - glog.Infof("%s", b) + glog.V(2).Infof("%s", b) } } for {