Merge branch 'master' into nexter
Conflicts: graph/leveldb/all_iterator.go graph/leveldb/iterator.go graph/memstore/triplestore.go query/gremlin/finals.go
This commit is contained in:
commit
62785d25c2
37 changed files with 882 additions and 467 deletions
|
|
@ -84,7 +84,7 @@ func setupGremlin(env *otto.Otto, ses *Session) {
|
|||
graph.Set("Emit", func(call otto.FunctionCall) otto.Value {
|
||||
value := call.Argument(0)
|
||||
if value.IsDefined() {
|
||||
ses.SendResult(&GremlinResult{metaresult: false, err: "", val: &value, actualResults: nil})
|
||||
ses.SendResult(&Result{val: &value})
|
||||
}
|
||||
return otto.NullValue()
|
||||
})
|
||||
|
|
|
|||
|
|
@ -148,8 +148,10 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
|
|||
count := 0
|
||||
it, _ = it.Optimize()
|
||||
for {
|
||||
if ses.doHalt {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
if !graph.Next(it) {
|
||||
break
|
||||
|
|
@ -161,9 +163,11 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
|
|||
if limit >= 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
for it.NextPath() == true {
|
||||
if ses.doHalt {
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
|
|
@ -183,8 +187,10 @@ func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []stri
|
|||
count := 0
|
||||
it, _ = it.Optimize()
|
||||
for {
|
||||
if ses.doHalt {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
if !graph.Next(it) {
|
||||
break
|
||||
|
|
@ -203,8 +209,10 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
|
|||
count := 0
|
||||
it, _ = it.Optimize()
|
||||
for {
|
||||
if ses.doHalt {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if !graph.Next(it) {
|
||||
break
|
||||
|
|
@ -217,9 +225,11 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
|
|||
if limit >= 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
for it.NextPath() == true {
|
||||
if ses.doHalt {
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
|
|
@ -235,34 +245,35 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
|
|||
}
|
||||
|
||||
func runIteratorOnSession(it graph.Iterator, ses *Session) {
|
||||
if ses.lookingForQueryShape {
|
||||
iterator.OutputQueryShapeForIterator(it, ses.ts, ses.queryShape)
|
||||
if ses.wantShape {
|
||||
iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape)
|
||||
return
|
||||
}
|
||||
it, _ = it.Optimize()
|
||||
glog.V(2).Infoln(it.DebugString(0))
|
||||
for {
|
||||
// TODO(barakmich): Better halting.
|
||||
if ses.doHalt {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if !graph.Next(it) {
|
||||
break
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags})
|
||||
if !cont {
|
||||
if !ses.SendResult(&Result{actualResults: &tags}) {
|
||||
break
|
||||
}
|
||||
for it.NextPath() == true {
|
||||
if ses.doHalt {
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags})
|
||||
if !cont {
|
||||
if !ses.SendResult(&Result{actualResults: &tags}) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,9 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/google/cayley/graph"
|
||||
_ "github.com/google/cayley/graph/memstore"
|
||||
"github.com/google/cayley/quad"
|
||||
|
||||
_ "github.com/google/cayley/graph/memstore"
|
||||
)
|
||||
|
||||
// This is a simple test graph.
|
||||
|
|
@ -37,7 +38,7 @@ import (
|
|||
// \-->|#D#|------------->+---+
|
||||
// +---+
|
||||
//
|
||||
var simpleGraph = []*quad.Quad{
|
||||
var simpleGraph = []quad.Quad{
|
||||
{"A", "follows", "B", ""},
|
||||
{"C", "follows", "B", ""},
|
||||
{"C", "follows", "D", ""},
|
||||
|
|
@ -51,7 +52,7 @@ var simpleGraph = []*quad.Quad{
|
|||
{"G", "status", "cool", "status_graph"},
|
||||
}
|
||||
|
||||
func makeTestSession(data []*quad.Quad) *Session {
|
||||
func makeTestSession(data []quad.Quad) *Session {
|
||||
ts, _ := graph.NewTripleStore("memstore", "", nil)
|
||||
for _, t := range data {
|
||||
ts.AddTriple(t)
|
||||
|
|
@ -245,14 +246,14 @@ var testQueries = []struct {
|
|||
},
|
||||
}
|
||||
|
||||
func runQueryGetTag(g []*quad.Quad, query string, tag string) []string {
|
||||
func runQueryGetTag(g []quad.Quad, query string, tag string) []string {
|
||||
js := makeTestSession(g)
|
||||
c := make(chan interface{}, 5)
|
||||
js.ExecInput(query, c, -1)
|
||||
|
||||
var results []string
|
||||
for res := range c {
|
||||
data := res.(*GremlinResult)
|
||||
data := res.(*Result)
|
||||
if data.val == nil {
|
||||
val := (*data.actualResults)[tag]
|
||||
if val != nil {
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/robertkrimen/otto"
|
||||
|
|
@ -26,45 +27,42 @@ import (
|
|||
"github.com/google/cayley/query"
|
||||
)
|
||||
|
||||
var ErrKillTimeout = errors.New("query timed out")
|
||||
|
||||
type Session struct {
|
||||
ts graph.TripleStore
|
||||
currentChannel chan interface{}
|
||||
env *otto.Otto
|
||||
debug bool
|
||||
limit int
|
||||
count int
|
||||
dataOutput []interface{}
|
||||
lookingForQueryShape bool
|
||||
queryShape map[string]interface{}
|
||||
err error
|
||||
script *otto.Script
|
||||
doHalt bool
|
||||
timeoutSec time.Duration
|
||||
emptyEnv *otto.Otto
|
||||
ts graph.TripleStore
|
||||
results chan interface{}
|
||||
env *otto.Otto
|
||||
envLock sync.Mutex
|
||||
debug bool
|
||||
limit int
|
||||
count int
|
||||
dataOutput []interface{}
|
||||
wantShape bool
|
||||
shape map[string]interface{}
|
||||
err error
|
||||
script *otto.Script
|
||||
kill chan struct{}
|
||||
timeout time.Duration
|
||||
emptyEnv *otto.Otto
|
||||
}
|
||||
|
||||
func NewSession(inputTripleStore graph.TripleStore, timeoutSec int, persist bool) *Session {
|
||||
var g Session
|
||||
g.ts = inputTripleStore
|
||||
func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session {
|
||||
g := Session{
|
||||
ts: ts,
|
||||
limit: -1,
|
||||
timeout: timeout,
|
||||
}
|
||||
g.env = BuildEnviron(&g)
|
||||
g.limit = -1
|
||||
g.count = 0
|
||||
g.lookingForQueryShape = false
|
||||
if persist {
|
||||
g.emptyEnv = g.env
|
||||
}
|
||||
if timeoutSec < 0 {
|
||||
g.timeoutSec = time.Duration(-1)
|
||||
} else {
|
||||
g.timeoutSec = time.Duration(timeoutSec)
|
||||
}
|
||||
g.ClearJson()
|
||||
return &g
|
||||
}
|
||||
|
||||
type GremlinResult struct {
|
||||
type Result struct {
|
||||
metaresult bool
|
||||
err string
|
||||
err error
|
||||
val *otto.Value
|
||||
actualResults *map[string]graph.Value
|
||||
}
|
||||
|
|
@ -73,13 +71,13 @@ func (s *Session) ToggleDebug() {
|
|||
s.debug = !s.debug
|
||||
}
|
||||
|
||||
func (s *Session) GetQuery(input string, output_struct chan map[string]interface{}) {
|
||||
defer close(output_struct)
|
||||
s.queryShape = make(map[string]interface{})
|
||||
s.lookingForQueryShape = true
|
||||
func (s *Session) GetQuery(input string, out chan map[string]interface{}) {
|
||||
defer close(out)
|
||||
s.shape = make(map[string]interface{})
|
||||
s.wantShape = true
|
||||
s.env.Run(input)
|
||||
output_struct <- s.queryShape
|
||||
s.queryShape = nil
|
||||
out <- s.shape
|
||||
s.shape = nil
|
||||
}
|
||||
|
||||
func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
||||
|
|
@ -91,15 +89,17 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
|||
return query.Parsed, nil
|
||||
}
|
||||
|
||||
func (s *Session) SendResult(result *GremlinResult) bool {
|
||||
func (s *Session) SendResult(r *Result) bool {
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
}
|
||||
if s.doHalt {
|
||||
select {
|
||||
case <-s.kill:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if s.currentChannel != nil {
|
||||
s.currentChannel <- result
|
||||
if s.results != nil {
|
||||
s.results <- r
|
||||
s.count++
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
|
|
@ -110,42 +110,46 @@ func (s *Session) SendResult(result *GremlinResult) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
var halt = errors.New("Query Timeout")
|
||||
|
||||
func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
||||
s.doHalt = false
|
||||
s.kill = make(chan struct{})
|
||||
defer func() {
|
||||
if caught := recover(); caught != nil {
|
||||
if caught == halt {
|
||||
s.err = halt
|
||||
if r := recover(); r != nil {
|
||||
if r == ErrKillTimeout {
|
||||
s.err = ErrKillTimeout
|
||||
return
|
||||
}
|
||||
panic(caught) // Something else happened, repanic!
|
||||
panic(r)
|
||||
}
|
||||
}()
|
||||
|
||||
s.env.Interrupt = make(chan func(), 1) // The buffer prevents blocking
|
||||
// Use buffered chan to prevent blocking.
|
||||
s.env.Interrupt = make(chan func(), 1)
|
||||
|
||||
if s.timeoutSec != -1 {
|
||||
if s.timeout >= 0 {
|
||||
go func() {
|
||||
time.Sleep(s.timeoutSec * time.Second) // Stop after two seconds
|
||||
s.doHalt = true
|
||||
time.Sleep(s.timeout)
|
||||
close(s.kill)
|
||||
s.envLock.Lock()
|
||||
defer s.envLock.Unlock()
|
||||
if s.env != nil {
|
||||
s.env.Interrupt <- func() {
|
||||
panic(halt)
|
||||
panic(ErrKillTimeout)
|
||||
}
|
||||
s.env = s.emptyEnv
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return s.env.Run(input) // Here be dragons (risky code)
|
||||
s.envLock.Lock()
|
||||
env := s.env
|
||||
s.envLock.Unlock()
|
||||
return env.Run(input)
|
||||
}
|
||||
|
||||
func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
||||
defer close(out)
|
||||
s.err = nil
|
||||
s.currentChannel = out
|
||||
s.results = out
|
||||
var err error
|
||||
var value otto.Value
|
||||
if s.script == nil {
|
||||
|
|
@ -153,28 +157,23 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
|||
} else {
|
||||
value, err = s.runUnsafe(s.script)
|
||||
}
|
||||
if err != nil {
|
||||
out <- &GremlinResult{metaresult: true,
|
||||
err: err.Error(),
|
||||
val: &value,
|
||||
actualResults: nil}
|
||||
} else {
|
||||
out <- &GremlinResult{metaresult: true,
|
||||
err: "",
|
||||
val: &value,
|
||||
actualResults: nil}
|
||||
out <- &Result{
|
||||
metaresult: true,
|
||||
err: err,
|
||||
val: &value,
|
||||
}
|
||||
s.currentChannel = nil
|
||||
s.results = nil
|
||||
s.script = nil
|
||||
s.envLock.Lock()
|
||||
s.env = s.emptyEnv
|
||||
return
|
||||
s.envLock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Session) ToText(result interface{}) string {
|
||||
data := result.(*GremlinResult)
|
||||
data := result.(*Result)
|
||||
if data.metaresult {
|
||||
if data.err != "" {
|
||||
return fmt.Sprintln("Error: ", data.err)
|
||||
if data.err != nil {
|
||||
return fmt.Sprintf("Error: %v\n", data.err)
|
||||
}
|
||||
if data.val != nil {
|
||||
s, _ := data.val.Export()
|
||||
|
|
@ -221,8 +220,8 @@ func (s *Session) ToText(result interface{}) string {
|
|||
}
|
||||
|
||||
// Web stuff
|
||||
func (ses *Session) BuildJson(result interface{}) {
|
||||
data := result.(*GremlinResult)
|
||||
func (s *Session) BuildJson(result interface{}) {
|
||||
data := result.(*Result)
|
||||
if !data.metaresult {
|
||||
if data.val == nil {
|
||||
obj := make(map[string]string)
|
||||
|
|
@ -235,33 +234,34 @@ func (ses *Session) BuildJson(result interface{}) {
|
|||
}
|
||||
sort.Strings(tagKeys)
|
||||
for _, k := range tagKeys {
|
||||
obj[k] = ses.ts.NameOf((*tags)[k])
|
||||
obj[k] = s.ts.NameOf((*tags)[k])
|
||||
}
|
||||
ses.dataOutput = append(ses.dataOutput, obj)
|
||||
s.dataOutput = append(s.dataOutput, obj)
|
||||
} else {
|
||||
if data.val.IsObject() {
|
||||
export, _ := data.val.Export()
|
||||
ses.dataOutput = append(ses.dataOutput, export)
|
||||
s.dataOutput = append(s.dataOutput, export)
|
||||
} else {
|
||||
strVersion, _ := data.val.ToString()
|
||||
ses.dataOutput = append(ses.dataOutput, strVersion)
|
||||
s.dataOutput = append(s.dataOutput, strVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (ses *Session) GetJson() ([]interface{}, error) {
|
||||
defer ses.ClearJson()
|
||||
if ses.err != nil {
|
||||
return nil, ses.err
|
||||
func (s *Session) GetJson() ([]interface{}, error) {
|
||||
defer s.ClearJson()
|
||||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
if ses.doHalt {
|
||||
return nil, halt
|
||||
select {
|
||||
case <-s.kill:
|
||||
return nil, ErrKillTimeout
|
||||
default:
|
||||
return s.dataOutput, nil
|
||||
}
|
||||
return ses.dataOutput, nil
|
||||
}
|
||||
|
||||
func (ses *Session) ClearJson() {
|
||||
ses.dataOutput = nil
|
||||
func (s *Session) ClearJson() {
|
||||
s.dataOutput = nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ import (
|
|||
// \-->|#D#|------------->+---+
|
||||
// +---+
|
||||
//
|
||||
var simpleGraph = []*quad.Quad{
|
||||
var simpleGraph = []quad.Quad{
|
||||
{"A", "follows", "B", ""},
|
||||
{"C", "follows", "B", ""},
|
||||
{"C", "follows", "D", ""},
|
||||
|
|
@ -51,7 +51,7 @@ var simpleGraph = []*quad.Quad{
|
|||
{"G", "status", "cool", "status_graph"},
|
||||
}
|
||||
|
||||
func makeTestSession(data []*quad.Quad) *Session {
|
||||
func makeTestSession(data []quad.Quad) *Session {
|
||||
ts, _ := graph.NewTripleStore("memstore", "", nil)
|
||||
for _, t := range data {
|
||||
ts.AddTriple(t)
|
||||
|
|
@ -165,7 +165,7 @@ var testQueries = []struct {
|
|||
},
|
||||
}
|
||||
|
||||
func runQuery(g []*quad.Quad, query string) interface{} {
|
||||
func runQuery(g []quad.Quad, query string) interface{} {
|
||||
s := makeTestSession(g)
|
||||
c := make(chan interface{}, 5)
|
||||
go s.ExecInput(query, c, -1)
|
||||
|
|
|
|||
|
|
@ -32,21 +32,21 @@ func TestBadParse(t *testing.T) {
|
|||
|
||||
var testQueries = []struct {
|
||||
message string
|
||||
add *quad.Quad
|
||||
add quad.Quad
|
||||
query string
|
||||
typ graph.Type
|
||||
expect string
|
||||
}{
|
||||
{
|
||||
message: "get a single triple linkage",
|
||||
add: &quad.Quad{"i", "can", "win", ""},
|
||||
add: quad.Quad{"i", "can", "win", ""},
|
||||
query: "($a (:can \"win\"))",
|
||||
typ: graph.And,
|
||||
expect: "i",
|
||||
},
|
||||
{
|
||||
message: "get a single triple linkage",
|
||||
add: &quad.Quad{"i", "can", "win", ""},
|
||||
add: quad.Quad{"i", "can", "win", ""},
|
||||
query: "(\"i\" (:can $a))",
|
||||
typ: graph.And,
|
||||
expect: "i",
|
||||
|
|
@ -60,7 +60,7 @@ func TestMemstoreBackedSexp(t *testing.T) {
|
|||
t.Errorf(`Incorrect type for empty query, got:%q expect: "null"`, it.Type())
|
||||
}
|
||||
for _, test := range testQueries {
|
||||
if test.add != nil {
|
||||
if test.add.IsValid() {
|
||||
ts.AddTriple(test.add)
|
||||
}
|
||||
it := BuildIteratorTreeForQuery(ts, test.query)
|
||||
|
|
@ -79,8 +79,8 @@ func TestMemstoreBackedSexp(t *testing.T) {
|
|||
|
||||
func TestTreeConstraintParse(t *testing.T) {
|
||||
ts, _ := graph.NewTripleStore("memstore", "", nil)
|
||||
ts.AddTriple(&quad.Quad{"i", "like", "food", ""})
|
||||
ts.AddTriple(&quad.Quad{"food", "is", "good", ""})
|
||||
ts.AddTriple(quad.Quad{"i", "like", "food", ""})
|
||||
ts.AddTriple(quad.Quad{"food", "is", "good", ""})
|
||||
query := "(\"i\"\n" +
|
||||
"(:like\n" +
|
||||
"($a (:is :good))))"
|
||||
|
|
@ -99,8 +99,8 @@ func TestTreeConstraintParse(t *testing.T) {
|
|||
|
||||
func TestTreeConstraintTagParse(t *testing.T) {
|
||||
ts, _ := graph.NewTripleStore("memstore", "", nil)
|
||||
ts.AddTriple(&quad.Quad{"i", "like", "food", ""})
|
||||
ts.AddTriple(&quad.Quad{"food", "is", "good", ""})
|
||||
ts.AddTriple(quad.Quad{"i", "like", "food", ""})
|
||||
ts.AddTriple(quad.Quad{"food", "is", "good", ""})
|
||||
query := "(\"i\"\n" +
|
||||
"(:like\n" +
|
||||
"($a (:is :good))))"
|
||||
|
|
@ -118,7 +118,7 @@ func TestTreeConstraintTagParse(t *testing.T) {
|
|||
|
||||
func TestMultipleConstraintParse(t *testing.T) {
|
||||
ts, _ := graph.NewTripleStore("memstore", "", nil)
|
||||
for _, tv := range []*quad.Quad{
|
||||
for _, tv := range []quad.Quad{
|
||||
{"i", "like", "food", ""},
|
||||
{"i", "like", "beer", ""},
|
||||
{"you", "like", "beer", ""},
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue