Merge pull request #145 from kortschak/gremlin
Fix gremlin timeout handling
This commit is contained in:
commit
1e62aaf374
7 changed files with 272 additions and 283 deletions
|
|
@ -25,21 +25,15 @@ import (
|
|||
"github.com/google/cayley/quad"
|
||||
)
|
||||
|
||||
func getStrings(obj *otto.Object, field string) []string {
|
||||
strings := make([]string, 0)
|
||||
val, _ := obj.Get(field)
|
||||
if !val.IsUndefined() {
|
||||
export, _ := val.Export()
|
||||
array := export.([]interface{})
|
||||
for _, arg := range array {
|
||||
strings = append(strings, arg.(string))
|
||||
}
|
||||
func propertiesOf(obj *otto.Object, name string) []string {
|
||||
val, _ := obj.Get(name)
|
||||
if val.IsUndefined() {
|
||||
return nil
|
||||
}
|
||||
return strings
|
||||
export, _ := val.Export()
|
||||
return export.([]string)
|
||||
}
|
||||
|
||||
func getStringArgs(obj *otto.Object) []string { return getStrings(obj, "string_args") }
|
||||
|
||||
func buildIteratorTree(obj *otto.Object, ts graph.TripleStore) graph.Iterator {
|
||||
if !isVertexChain(obj) {
|
||||
return iterator.NewNull()
|
||||
|
|
@ -47,22 +41,18 @@ func buildIteratorTree(obj *otto.Object, ts graph.TripleStore) graph.Iterator {
|
|||
return buildIteratorTreeHelper(obj, ts, iterator.NewNull())
|
||||
}
|
||||
|
||||
func makeListOfStringsFromArrayValue(obj *otto.Object) []string {
|
||||
func stringsFrom(obj *otto.Object) []string {
|
||||
var output []string
|
||||
lengthValue, _ := obj.Get("length")
|
||||
length, _ := lengthValue.ToInteger()
|
||||
ulength := uint32(length)
|
||||
for index := uint32(0); index < ulength; index += 1 {
|
||||
name := strconv.FormatInt(int64(index), 10)
|
||||
for i := uint32(0); i < ulength; i++ {
|
||||
name := strconv.FormatInt(int64(i), 10)
|
||||
value, err := obj.Get(name)
|
||||
if err != nil {
|
||||
if err != nil || !value.IsString() {
|
||||
continue
|
||||
}
|
||||
if !value.IsString() {
|
||||
continue
|
||||
}
|
||||
s, _ := value.ToString()
|
||||
output = append(output, s)
|
||||
output = append(output, value.String())
|
||||
}
|
||||
return output
|
||||
}
|
||||
|
|
@ -87,7 +77,7 @@ func buildIteratorFromValue(val otto.Value, ts graph.TripleStore) graph.Iterator
|
|||
return buildIteratorTree(val.Object(), ts)
|
||||
case "Array":
|
||||
// Had better be an array of strings
|
||||
strings := makeListOfStringsFromArrayValue(val.Object())
|
||||
strings := stringsFrom(val.Object())
|
||||
it := ts.FixedIterator()
|
||||
for _, x := range strings {
|
||||
it.Add(ts.ValueOf(x))
|
||||
|
|
@ -101,8 +91,7 @@ func buildIteratorFromValue(val otto.Value, ts graph.TripleStore) graph.Iterator
|
|||
fallthrough
|
||||
case "String":
|
||||
it := ts.FixedIterator()
|
||||
str, _ := val.ToString()
|
||||
it.Add(ts.ValueOf(str))
|
||||
it.Add(ts.ValueOf(val.String()))
|
||||
return it
|
||||
default:
|
||||
glog.Errorln("Trying to handle unsupported Javascript value.")
|
||||
|
|
@ -130,10 +119,9 @@ func buildInOutIterator(obj *otto.Object, ts graph.TripleStore, base graph.Itera
|
|||
var tags []string
|
||||
one, _ := argArray.Get("1")
|
||||
if one.IsString() {
|
||||
s, _ := one.ToString()
|
||||
tags = append(tags, s)
|
||||
tags = append(tags, one.String())
|
||||
} else if one.Class() == "Array" {
|
||||
tags = makeListOfStringsFromArrayValue(one.Object())
|
||||
tags = stringsFrom(one.Object())
|
||||
}
|
||||
for _, tag := range tags {
|
||||
predicateNodeIterator.Tagger().Add(tag)
|
||||
|
|
@ -152,21 +140,19 @@ func buildInOutIterator(obj *otto.Object, ts graph.TripleStore, base graph.Itera
|
|||
}
|
||||
|
||||
func buildIteratorTreeHelper(obj *otto.Object, ts graph.TripleStore, base graph.Iterator) graph.Iterator {
|
||||
var it graph.Iterator
|
||||
it = base
|
||||
var it graph.Iterator = base
|
||||
|
||||
// TODO: Better error handling
|
||||
kindVal, _ := obj.Get("_gremlin_type")
|
||||
stringArgs := getStringArgs(obj)
|
||||
var subIt graph.Iterator
|
||||
prevVal, _ := obj.Get("_gremlin_prev")
|
||||
if !prevVal.IsObject() {
|
||||
if prev, _ := obj.Get("_gremlin_prev"); !prev.IsObject() {
|
||||
subIt = base
|
||||
} else {
|
||||
subIt = buildIteratorTreeHelper(prevVal.Object(), ts, base)
|
||||
subIt = buildIteratorTreeHelper(prev.Object(), ts, base)
|
||||
}
|
||||
|
||||
kind, _ := kindVal.ToString()
|
||||
switch kind {
|
||||
stringArgs := propertiesOf(obj, "string_args")
|
||||
val, _ := obj.Get("_gremlin_type")
|
||||
switch val.String() {
|
||||
case "vertex":
|
||||
if len(stringArgs) == 0 {
|
||||
it = ts.NodesAllIterator()
|
||||
|
|
|
|||
|
|
@ -17,46 +17,38 @@ package gremlin
|
|||
// Builds a new Gremlin environment pointing at a session.
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/barakmich/glog"
|
||||
"github.com/robertkrimen/otto"
|
||||
|
||||
"github.com/google/cayley/graph"
|
||||
)
|
||||
|
||||
func BuildEnviron(ses *Session) *otto.Otto {
|
||||
type worker struct {
|
||||
ts graph.TripleStore
|
||||
env *otto.Otto
|
||||
sync.Mutex
|
||||
|
||||
results chan interface{}
|
||||
shape map[string]interface{}
|
||||
|
||||
count int
|
||||
limit int
|
||||
|
||||
kill <-chan struct{}
|
||||
}
|
||||
|
||||
func newWorker(ts graph.TripleStore) *worker {
|
||||
env := otto.New()
|
||||
setupGremlin(env, ses)
|
||||
return env
|
||||
}
|
||||
|
||||
func concatStringArgs(call otto.FunctionCall) *[]interface{} {
|
||||
outStrings := make([]interface{}, 0)
|
||||
for _, arg := range call.ArgumentList {
|
||||
if arg.IsString() {
|
||||
outStrings = append(outStrings, arg.String())
|
||||
}
|
||||
if arg.IsObject() && arg.Class() == "Array" {
|
||||
obj, _ := arg.Export()
|
||||
for _, x := range obj.([]interface{}) {
|
||||
outStrings = append(outStrings, x.(string))
|
||||
}
|
||||
}
|
||||
wk := &worker{
|
||||
ts: ts,
|
||||
env: env,
|
||||
limit: -1,
|
||||
}
|
||||
return &outStrings
|
||||
}
|
||||
|
||||
func isVertexChain(obj *otto.Object) bool {
|
||||
val, _ := obj.Get("_gremlin_type")
|
||||
if x, _ := val.ToString(); x == "vertex" {
|
||||
return true
|
||||
}
|
||||
val, _ = obj.Get("_gremlin_prev")
|
||||
if val.IsObject() {
|
||||
return isVertexChain(val.Object())
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func setupGremlin(env *otto.Otto, ses *Session) {
|
||||
graph, _ := env.Object("graph = {}")
|
||||
env.Run("g = graph")
|
||||
|
||||
graph.Set("Vertex", func(call otto.FunctionCall) otto.Value {
|
||||
call.Otto.Run("var out = {}")
|
||||
out, err := call.Otto.Object("out")
|
||||
|
|
@ -65,31 +57,64 @@ func setupGremlin(env *otto.Otto, ses *Session) {
|
|||
return otto.TrueValue()
|
||||
}
|
||||
out.Set("_gremlin_type", "vertex")
|
||||
outStrings := concatStringArgs(call)
|
||||
if len(*outStrings) > 0 {
|
||||
out.Set("string_args", *outStrings)
|
||||
args := argsOf(call)
|
||||
if len(args) > 0 {
|
||||
out.Set("string_args", args)
|
||||
}
|
||||
embedTraversals(env, ses, out)
|
||||
embedFinals(env, ses, out)
|
||||
wk.embedTraversals(env, out)
|
||||
wk.embedFinals(env, out)
|
||||
return out.Value()
|
||||
})
|
||||
env.Run("graph.V = graph.Vertex")
|
||||
|
||||
graph.Set("Morphism", func(call otto.FunctionCall) otto.Value {
|
||||
call.Otto.Run("var out = {}")
|
||||
out, _ := call.Otto.Object("out")
|
||||
out.Set("_gremlin_type", "morphism")
|
||||
embedTraversals(env, ses, out)
|
||||
wk.embedTraversals(env, out)
|
||||
return out.Value()
|
||||
})
|
||||
env.Run("graph.M = graph.Morphism")
|
||||
|
||||
graph.Set("Emit", func(call otto.FunctionCall) otto.Value {
|
||||
value := call.Argument(0)
|
||||
if value.IsDefined() {
|
||||
ses.SendResult(&Result{val: &value})
|
||||
wk.send(&Result{val: &value})
|
||||
}
|
||||
return otto.NullValue()
|
||||
})
|
||||
env.Run("graph.V = graph.Vertex")
|
||||
env.Run("graph.M = graph.Morphism")
|
||||
env.Run("g = graph")
|
||||
|
||||
return wk
|
||||
}
|
||||
|
||||
func (wk *worker) wantShape() bool {
|
||||
return wk.shape != nil
|
||||
}
|
||||
|
||||
func argsOf(call otto.FunctionCall) []string {
|
||||
var out []string
|
||||
for _, arg := range call.ArgumentList {
|
||||
if arg.IsString() {
|
||||
out = append(out, arg.String())
|
||||
}
|
||||
if arg.IsObject() && arg.Class() == "Array" {
|
||||
obj, _ := arg.Export()
|
||||
for _, x := range obj.([]interface{}) {
|
||||
out = append(out, x.(string))
|
||||
}
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func isVertexChain(obj *otto.Object) bool {
|
||||
val, _ := obj.Get("_gremlin_type")
|
||||
if val.String() == "vertex" {
|
||||
return true
|
||||
}
|
||||
val, _ = obj.Get("_gremlin_prev")
|
||||
if val.IsObject() {
|
||||
return isVertexChain(val.Object())
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,45 +26,45 @@ import (
|
|||
|
||||
const TopResultTag = "id"
|
||||
|
||||
func embedFinals(env *otto.Otto, ses *Session, obj *otto.Object) {
|
||||
obj.Set("All", allFunc(env, ses, obj))
|
||||
obj.Set("GetLimit", limitFunc(env, ses, obj))
|
||||
obj.Set("ToArray", toArrayFunc(env, ses, obj, false))
|
||||
obj.Set("ToValue", toValueFunc(env, ses, obj, false))
|
||||
obj.Set("TagArray", toArrayFunc(env, ses, obj, true))
|
||||
obj.Set("TagValue", toValueFunc(env, ses, obj, true))
|
||||
obj.Set("Map", mapFunc(env, ses, obj))
|
||||
obj.Set("ForEach", mapFunc(env, ses, obj))
|
||||
func (wk *worker) embedFinals(env *otto.Otto, obj *otto.Object) {
|
||||
obj.Set("All", wk.allFunc(env, obj))
|
||||
obj.Set("GetLimit", wk.limitFunc(env, obj))
|
||||
obj.Set("ToArray", wk.toArrayFunc(env, obj, false))
|
||||
obj.Set("ToValue", wk.toValueFunc(env, obj, false))
|
||||
obj.Set("TagArray", wk.toArrayFunc(env, obj, true))
|
||||
obj.Set("TagValue", wk.toValueFunc(env, obj, true))
|
||||
obj.Set("Map", wk.mapFunc(env, obj))
|
||||
obj.Set("ForEach", wk.mapFunc(env, obj))
|
||||
}
|
||||
|
||||
func allFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) allFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
it := buildIteratorTree(obj, ses.ts)
|
||||
it := buildIteratorTree(obj, wk.ts)
|
||||
it.Tagger().Add(TopResultTag)
|
||||
ses.limit = -1
|
||||
ses.count = 0
|
||||
runIteratorOnSession(it, ses)
|
||||
wk.limit = -1
|
||||
wk.count = 0
|
||||
wk.runIterator(it)
|
||||
return otto.NullValue()
|
||||
}
|
||||
}
|
||||
|
||||
func limitFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) limitFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
if len(call.ArgumentList) > 0 {
|
||||
limitVal, _ := call.Argument(0).ToInteger()
|
||||
it := buildIteratorTree(obj, ses.ts)
|
||||
it := buildIteratorTree(obj, wk.ts)
|
||||
it.Tagger().Add(TopResultTag)
|
||||
ses.limit = int(limitVal)
|
||||
ses.count = 0
|
||||
runIteratorOnSession(it, ses)
|
||||
wk.limit = int(limitVal)
|
||||
wk.count = 0
|
||||
wk.runIterator(it)
|
||||
}
|
||||
return otto.NullValue()
|
||||
}
|
||||
}
|
||||
|
||||
func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
it := buildIteratorTree(obj, ses.ts)
|
||||
it := buildIteratorTree(obj, wk.ts)
|
||||
it.Tagger().Add(TopResultTag)
|
||||
limit := -1
|
||||
if len(call.ArgumentList) > 0 {
|
||||
|
|
@ -74,10 +74,10 @@ func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool)
|
|||
var val otto.Value
|
||||
var err error
|
||||
if !withTags {
|
||||
array := runIteratorToArrayNoTags(it, ses, limit)
|
||||
array := wk.runIteratorToArrayNoTags(it, limit)
|
||||
val, err = call.Otto.ToValue(array)
|
||||
} else {
|
||||
array := runIteratorToArray(it, ses, limit)
|
||||
array := wk.runIteratorToArray(it, limit)
|
||||
val, err = call.Otto.ToValue(array)
|
||||
}
|
||||
|
||||
|
|
@ -89,21 +89,21 @@ func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool)
|
|||
}
|
||||
}
|
||||
|
||||
func toValueFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
it := buildIteratorTree(obj, ses.ts)
|
||||
it := buildIteratorTree(obj, wk.ts)
|
||||
it.Tagger().Add(TopResultTag)
|
||||
limit := 1
|
||||
var val otto.Value
|
||||
var err error
|
||||
if !withTags {
|
||||
array := runIteratorToArrayNoTags(it, ses, limit)
|
||||
array := wk.runIteratorToArrayNoTags(it, limit)
|
||||
if len(array) < 1 {
|
||||
return otto.NullValue()
|
||||
}
|
||||
val, err = call.Otto.ToValue(array[0])
|
||||
} else {
|
||||
array := runIteratorToArray(it, ses, limit)
|
||||
array := wk.runIteratorToArray(it, limit)
|
||||
if len(array) < 1 {
|
||||
return otto.NullValue()
|
||||
}
|
||||
|
|
@ -115,13 +115,12 @@ func toValueFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool)
|
|||
} else {
|
||||
return val
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func mapFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) mapFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
it := buildIteratorTree(obj, ses.ts)
|
||||
it := buildIteratorTree(obj, wk.ts)
|
||||
it.Tagger().Add(TopResultTag)
|
||||
limit := -1
|
||||
if len(call.ArgumentList) == 0 {
|
||||
|
|
@ -132,26 +131,26 @@ func mapFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionC
|
|||
limitParsed, _ := call.Argument(0).ToInteger()
|
||||
limit = int(limitParsed)
|
||||
}
|
||||
runIteratorWithCallback(it, ses, callback, call, limit)
|
||||
wk.runIteratorWithCallback(it, callback, call, limit)
|
||||
return otto.NullValue()
|
||||
}
|
||||
}
|
||||
|
||||
func tagsToValueMap(m map[string]graph.Value, ses *Session) map[string]string {
|
||||
func (wk *worker) tagsToValueMap(m map[string]graph.Value) map[string]string {
|
||||
outputMap := make(map[string]string)
|
||||
for k, v := range m {
|
||||
outputMap[k] = ses.ts.NameOf(v)
|
||||
outputMap[k] = wk.ts.NameOf(v)
|
||||
}
|
||||
return outputMap
|
||||
}
|
||||
|
||||
func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string]string {
|
||||
func (wk *worker) runIteratorToArray(it graph.Iterator, limit int) []map[string]string {
|
||||
output := make([]map[string]string, 0)
|
||||
count := 0
|
||||
n := 0
|
||||
it, _ = it.Optimize()
|
||||
for {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
|
@ -160,22 +159,22 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
|
|||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
output = append(output, tagsToValueMap(tags, ses))
|
||||
count++
|
||||
if limit >= 0 && count >= limit {
|
||||
output = append(output, wk.tagsToValueMap(tags))
|
||||
n++
|
||||
if limit >= 0 && n >= limit {
|
||||
break
|
||||
}
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
output = append(output, tagsToValueMap(tags, ses))
|
||||
count++
|
||||
if limit >= 0 && count >= limit {
|
||||
output = append(output, wk.tagsToValueMap(tags))
|
||||
n++
|
||||
if limit >= 0 && n >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -184,22 +183,22 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
|
|||
return output
|
||||
}
|
||||
|
||||
func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []string {
|
||||
func (wk *worker) runIteratorToArrayNoTags(it graph.Iterator, limit int) []string {
|
||||
output := make([]string, 0)
|
||||
count := 0
|
||||
n := 0
|
||||
it, _ = it.Optimize()
|
||||
for {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
if !graph.Next(it) {
|
||||
break
|
||||
}
|
||||
output = append(output, ses.ts.NameOf(it.Result()))
|
||||
count++
|
||||
if limit >= 0 && count >= limit {
|
||||
output = append(output, wk.ts.NameOf(it.Result()))
|
||||
n++
|
||||
if limit >= 0 && n >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -207,13 +206,13 @@ func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []stri
|
|||
return output
|
||||
}
|
||||
|
||||
func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Value, this otto.FunctionCall, limit int) {
|
||||
count := 0
|
||||
func (wk *worker) runIteratorWithCallback(it graph.Iterator, callback otto.Value, this otto.FunctionCall, limit int) {
|
||||
n := 0
|
||||
it, _ = it.Optimize()
|
||||
glog.V(2).Infoln(it.DebugString(0))
|
||||
for {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
|
@ -222,24 +221,24 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
|
|||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
val, _ := this.Otto.ToValue(tagsToValueMap(tags, ses))
|
||||
val, _ := this.Otto.ToValue(wk.tagsToValueMap(tags))
|
||||
val, _ = callback.Call(this.This, val)
|
||||
count++
|
||||
if limit >= 0 && count >= limit {
|
||||
n++
|
||||
if limit >= 0 && n >= limit {
|
||||
break
|
||||
}
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
val, _ := this.Otto.ToValue(tagsToValueMap(tags, ses))
|
||||
val, _ := this.Otto.ToValue(wk.tagsToValueMap(tags))
|
||||
val, _ = callback.Call(this.This, val)
|
||||
count++
|
||||
if limit >= 0 && count >= limit {
|
||||
n++
|
||||
if limit >= 0 && n >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -247,16 +246,37 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
|
|||
it.Close()
|
||||
}
|
||||
|
||||
func runIteratorOnSession(it graph.Iterator, ses *Session) {
|
||||
if ses.wantShape {
|
||||
iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape)
|
||||
func (wk *worker) send(r *Result) bool {
|
||||
if wk.limit >= 0 && wk.limit == wk.count {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case <-wk.kill:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if wk.results != nil {
|
||||
wk.results <- r
|
||||
wk.count++
|
||||
if wk.limit >= 0 && wk.limit == wk.count {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (wk *worker) runIterator(it graph.Iterator) {
|
||||
if wk.wantShape() {
|
||||
iterator.OutputQueryShapeForIterator(it, wk.ts, wk.shape)
|
||||
return
|
||||
}
|
||||
it, _ = it.Optimize()
|
||||
glog.V(2).Infoln(it.DebugString(0))
|
||||
for {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
|
@ -265,18 +285,18 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) {
|
|||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
if !ses.SendResult(&Result{actualResults: &tags}) {
|
||||
if !wk.send(&Result{actualResults: tags}) {
|
||||
break
|
||||
}
|
||||
for it.NextPath() {
|
||||
select {
|
||||
case <-ses.kill:
|
||||
case <-wk.kill:
|
||||
return
|
||||
default:
|
||||
}
|
||||
tags := make(map[string]graph.Value)
|
||||
it.TagResults(tags)
|
||||
if !ses.SendResult(&Result{actualResults: &tags}) {
|
||||
if !wk.send(&Result{actualResults: tags}) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ func runQueryGetTag(g []quad.Quad, query string, tag string) []string {
|
|||
for res := range c {
|
||||
data := res.(*Result)
|
||||
if data.val == nil {
|
||||
val := (*data.actualResults)[tag]
|
||||
val := data.actualResults[tag]
|
||||
if val != nil {
|
||||
results = append(results, js.ts.NameOf(val))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/robertkrimen/otto"
|
||||
|
|
@ -31,32 +30,29 @@ import (
|
|||
var ErrKillTimeout = errors.New("query timed out")
|
||||
|
||||
type Session struct {
|
||||
ts graph.TripleStore
|
||||
results chan interface{}
|
||||
env *otto.Otto
|
||||
envLock sync.Mutex
|
||||
ts graph.TripleStore
|
||||
|
||||
wk *worker
|
||||
script *otto.Script
|
||||
persist *otto.Otto
|
||||
|
||||
timeout time.Duration
|
||||
kill chan struct{}
|
||||
|
||||
debug bool
|
||||
limit int
|
||||
count int
|
||||
dataOutput []interface{}
|
||||
wantShape bool
|
||||
shape map[string]interface{}
|
||||
err error
|
||||
script *otto.Script
|
||||
kill chan struct{}
|
||||
timeout time.Duration
|
||||
emptyEnv *otto.Otto
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session {
|
||||
g := Session{
|
||||
ts: ts,
|
||||
limit: -1,
|
||||
wk: newWorker(ts),
|
||||
timeout: timeout,
|
||||
}
|
||||
g.env = BuildEnviron(&g)
|
||||
if persist {
|
||||
g.emptyEnv = g.env
|
||||
g.persist = g.wk.env
|
||||
}
|
||||
return &g
|
||||
}
|
||||
|
|
@ -65,7 +61,7 @@ type Result struct {
|
|||
metaresult bool
|
||||
err error
|
||||
val *otto.Value
|
||||
actualResults *map[string]graph.Value
|
||||
actualResults map[string]graph.Value
|
||||
}
|
||||
|
||||
func (s *Session) ToggleDebug() {
|
||||
|
|
@ -74,15 +70,14 @@ func (s *Session) ToggleDebug() {
|
|||
|
||||
func (s *Session) GetQuery(input string, out chan map[string]interface{}) {
|
||||
defer close(out)
|
||||
s.shape = make(map[string]interface{})
|
||||
s.wantShape = true
|
||||
s.env.Run(input)
|
||||
out <- s.shape
|
||||
s.shape = nil
|
||||
s.wk.shape = make(map[string]interface{})
|
||||
s.wk.env.Run(input)
|
||||
out <- s.wk.shape
|
||||
s.wk.shape = nil
|
||||
}
|
||||
|
||||
func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
||||
script, err := s.env.Compile("", input)
|
||||
script, err := s.wk.env.Compile("", input)
|
||||
if err != nil {
|
||||
return query.ParseFail, err
|
||||
}
|
||||
|
|
@ -90,35 +85,13 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
|||
return query.Parsed, nil
|
||||
}
|
||||
|
||||
func (s *Session) SendResult(r *Result) bool {
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
}
|
||||
s.envLock.Lock()
|
||||
kill := s.kill
|
||||
s.envLock.Unlock()
|
||||
select {
|
||||
case <-kill:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if s.results != nil {
|
||||
s.results <- r
|
||||
s.count++
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
||||
wk := s.wk
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if r == ErrKillTimeout {
|
||||
s.err = ErrKillTimeout
|
||||
wk.env = s.persist
|
||||
return
|
||||
}
|
||||
panic(r)
|
||||
|
|
@ -126,49 +99,40 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
|||
}()
|
||||
|
||||
// Use buffered chan to prevent blocking.
|
||||
s.env.Interrupt = make(chan func(), 1)
|
||||
wk.env.Interrupt = make(chan func(), 1)
|
||||
s.kill = make(chan struct{})
|
||||
wk.kill = s.kill
|
||||
|
||||
ready := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
if s.timeout >= 0 {
|
||||
go func() {
|
||||
time.Sleep(s.timeout)
|
||||
<-ready
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
close(s.kill)
|
||||
s.envLock.Lock()
|
||||
defer s.envLock.Unlock()
|
||||
s.kill = nil
|
||||
if s.env != nil {
|
||||
s.env.Interrupt <- func() {
|
||||
wk.Lock()
|
||||
if wk.env != nil {
|
||||
wk.env.Interrupt <- func() {
|
||||
panic(ErrKillTimeout)
|
||||
}
|
||||
s.env = s.emptyEnv
|
||||
}
|
||||
return
|
||||
wk.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.envLock.Lock()
|
||||
env := s.env
|
||||
if s.kill == nil {
|
||||
s.kill = make(chan struct{})
|
||||
}
|
||||
s.envLock.Unlock()
|
||||
close(ready)
|
||||
out, err := env.Run(input)
|
||||
close(done)
|
||||
return out, err
|
||||
wk.Lock()
|
||||
env := wk.env
|
||||
wk.Unlock()
|
||||
return env.Run(input)
|
||||
}
|
||||
|
||||
func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
||||
func (s *Session) ExecInput(input string, out chan interface{}, _ int) {
|
||||
defer close(out)
|
||||
s.err = nil
|
||||
s.results = out
|
||||
s.wk.results = out
|
||||
var err error
|
||||
var value otto.Value
|
||||
if s.script == nil {
|
||||
|
|
@ -181,11 +145,11 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
|||
err: err,
|
||||
val: &value,
|
||||
}
|
||||
s.results = nil
|
||||
s.wk.results = nil
|
||||
s.script = nil
|
||||
s.envLock.Lock()
|
||||
s.env = s.emptyEnv
|
||||
s.envLock.Unlock()
|
||||
s.wk.Lock()
|
||||
s.wk.env = s.persist
|
||||
s.wk.Unlock()
|
||||
}
|
||||
|
||||
func (s *Session) ToText(result interface{}) string {
|
||||
|
|
@ -210,9 +174,9 @@ func (s *Session) ToText(result interface{}) string {
|
|||
out = fmt.Sprintln("****")
|
||||
if data.val == nil {
|
||||
tags := data.actualResults
|
||||
tagKeys := make([]string, len(*tags))
|
||||
tagKeys := make([]string, len(tags))
|
||||
i := 0
|
||||
for k, _ := range *tags {
|
||||
for k, _ := range tags {
|
||||
tagKeys[i] = k
|
||||
i++
|
||||
}
|
||||
|
|
@ -221,7 +185,7 @@ func (s *Session) ToText(result interface{}) string {
|
|||
if k == "$_" {
|
||||
continue
|
||||
}
|
||||
out += fmt.Sprintf("%s : %s\n", k, s.ts.NameOf((*tags)[k]))
|
||||
out += fmt.Sprintf("%s : %s\n", k, s.ts.NameOf(tags[k]))
|
||||
}
|
||||
} else {
|
||||
if data.val.IsObject() {
|
||||
|
|
@ -245,15 +209,15 @@ func (s *Session) BuildJson(result interface{}) {
|
|||
if data.val == nil {
|
||||
obj := make(map[string]string)
|
||||
tags := data.actualResults
|
||||
tagKeys := make([]string, len(*tags))
|
||||
tagKeys := make([]string, len(tags))
|
||||
i := 0
|
||||
for k, _ := range *tags {
|
||||
for k, _ := range tags {
|
||||
tagKeys[i] = k
|
||||
i++
|
||||
}
|
||||
sort.Strings(tagKeys)
|
||||
for _, k := range tagKeys {
|
||||
obj[k] = s.ts.NameOf((*tags)[k])
|
||||
obj[k] = s.ts.NameOf(tags[k])
|
||||
}
|
||||
s.dataOutput = append(s.dataOutput, obj)
|
||||
} else {
|
||||
|
|
@ -273,11 +237,8 @@ func (s *Session) GetJson() ([]interface{}, error) {
|
|||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
s.envLock.Lock()
|
||||
kill := s.kill
|
||||
s.envLock.Unlock()
|
||||
select {
|
||||
case <-kill:
|
||||
case <-s.kill:
|
||||
return nil, ErrKillTimeout
|
||||
default:
|
||||
return s.dataOutput, nil
|
||||
|
|
|
|||
|
|
@ -21,123 +21,121 @@ import (
|
|||
"github.com/robertkrimen/otto"
|
||||
)
|
||||
|
||||
func embedTraversals(env *otto.Otto, ses *Session, obj *otto.Object) {
|
||||
obj.Set("In", gremlinFunc("in", obj, env, ses))
|
||||
obj.Set("Out", gremlinFunc("out", obj, env, ses))
|
||||
obj.Set("Is", gremlinFunc("is", obj, env, ses))
|
||||
obj.Set("Both", gremlinFunc("both", obj, env, ses))
|
||||
obj.Set("Follow", gremlinFunc("follow", obj, env, ses))
|
||||
obj.Set("FollowR", gremlinFollowR("followr", obj, env, ses))
|
||||
obj.Set("And", gremlinFunc("and", obj, env, ses))
|
||||
obj.Set("Intersect", gremlinFunc("and", obj, env, ses))
|
||||
obj.Set("Union", gremlinFunc("or", obj, env, ses))
|
||||
obj.Set("Or", gremlinFunc("or", obj, env, ses))
|
||||
obj.Set("Back", gremlinBack("back", obj, env, ses))
|
||||
obj.Set("Tag", gremlinFunc("tag", obj, env, ses))
|
||||
obj.Set("As", gremlinFunc("tag", obj, env, ses))
|
||||
obj.Set("Has", gremlinFunc("has", obj, env, ses))
|
||||
obj.Set("Save", gremlinFunc("save", obj, env, ses))
|
||||
obj.Set("SaveR", gremlinFunc("saver", obj, env, ses))
|
||||
func (wk *worker) embedTraversals(env *otto.Otto, obj *otto.Object) {
|
||||
obj.Set("In", wk.gremlinFunc("in", obj, env))
|
||||
obj.Set("Out", wk.gremlinFunc("out", obj, env))
|
||||
obj.Set("Is", wk.gremlinFunc("is", obj, env))
|
||||
obj.Set("Both", wk.gremlinFunc("both", obj, env))
|
||||
obj.Set("Follow", wk.gremlinFunc("follow", obj, env))
|
||||
obj.Set("FollowR", wk.gremlinFollowR("followr", obj, env))
|
||||
obj.Set("And", wk.gremlinFunc("and", obj, env))
|
||||
obj.Set("Intersect", wk.gremlinFunc("and", obj, env))
|
||||
obj.Set("Union", wk.gremlinFunc("or", obj, env))
|
||||
obj.Set("Or", wk.gremlinFunc("or", obj, env))
|
||||
obj.Set("Back", wk.gremlinBack("back", obj, env))
|
||||
obj.Set("Tag", wk.gremlinFunc("tag", obj, env))
|
||||
obj.Set("As", wk.gremlinFunc("tag", obj, env))
|
||||
obj.Set("Has", wk.gremlinFunc("has", obj, env))
|
||||
obj.Set("Save", wk.gremlinFunc("save", obj, env))
|
||||
obj.Set("SaveR", wk.gremlinFunc("saver", obj, env))
|
||||
}
|
||||
|
||||
func gremlinFunc(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) gremlinFunc(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
call.Otto.Run("var out = {}")
|
||||
out, _ := call.Otto.Object("out")
|
||||
out.Set("_gremlin_type", kind)
|
||||
out.Set("_gremlin_values", call.ArgumentList)
|
||||
out.Set("_gremlin_prev", prevObj)
|
||||
outStrings := concatStringArgs(call)
|
||||
if len(*outStrings) > 0 {
|
||||
out.Set("string_args", *outStrings)
|
||||
out.Set("_gremlin_prev", prev)
|
||||
args := argsOf(call)
|
||||
if len(args) > 0 {
|
||||
out.Set("string_args", args)
|
||||
}
|
||||
embedTraversals(env, ses, out)
|
||||
wk.embedTraversals(env, out)
|
||||
if isVertexChain(call.This.Object()) {
|
||||
embedFinals(env, ses, out)
|
||||
wk.embedFinals(env, out)
|
||||
}
|
||||
return out.Value()
|
||||
}
|
||||
}
|
||||
|
||||
func gremlinBack(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) gremlinBack(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
call.Otto.Run("var out = {}")
|
||||
out, _ := call.Otto.Object("out")
|
||||
out.Set("_gremlin_type", kind)
|
||||
out.Set("_gremlin_values", call.ArgumentList)
|
||||
outStrings := concatStringArgs(call)
|
||||
if len(*outStrings) > 0 {
|
||||
out.Set("string_args", *outStrings)
|
||||
args := argsOf(call)
|
||||
if len(args) > 0 {
|
||||
out.Set("string_args", args)
|
||||
}
|
||||
var otherChain *otto.Object
|
||||
var thisObj *otto.Object
|
||||
if len(*outStrings) != 0 {
|
||||
otherChain, thisObj = reverseGremlinChainTo(call.Otto, prevObj, (*outStrings)[0].(string))
|
||||
if len(args) != 0 {
|
||||
otherChain, thisObj = reverseGremlinChainTo(call.Otto, prev, args[0])
|
||||
} else {
|
||||
otherChain, thisObj = reverseGremlinChainTo(call.Otto, prevObj, "")
|
||||
otherChain, thisObj = reverseGremlinChainTo(call.Otto, prev, "")
|
||||
}
|
||||
out.Set("_gremlin_prev", thisObj)
|
||||
out.Set("_gremlin_back_chain", otherChain)
|
||||
embedTraversals(env, ses, out)
|
||||
wk.embedTraversals(env, out)
|
||||
if isVertexChain(call.This.Object()) {
|
||||
embedFinals(env, ses, out)
|
||||
wk.embedFinals(env, out)
|
||||
}
|
||||
return out.Value()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func gremlinFollowR(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value {
|
||||
func (wk *worker) gremlinFollowR(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value {
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
call.Otto.Run("var out = {}")
|
||||
out, _ := call.Otto.Object("out")
|
||||
out.Set("_gremlin_type", kind)
|
||||
out.Set("_gremlin_values", call.ArgumentList)
|
||||
outStrings := concatStringArgs(call)
|
||||
if len(*outStrings) > 0 {
|
||||
out.Set("string_args", *outStrings)
|
||||
args := argsOf(call)
|
||||
if len(args) > 0 {
|
||||
out.Set("string_args", args)
|
||||
}
|
||||
if len(call.ArgumentList) == 0 {
|
||||
return prevObj.Value()
|
||||
return prev.Value()
|
||||
}
|
||||
arg := call.Argument(0)
|
||||
if isVertexChain(arg.Object()) {
|
||||
return prevObj.Value()
|
||||
return prev.Value()
|
||||
}
|
||||
newChain, _ := reverseGremlinChainTo(call.Otto, arg.Object(), "")
|
||||
out.Set("_gremlin_prev", prevObj)
|
||||
out.Set("_gremlin_prev", prev)
|
||||
out.Set("_gremlin_followr", newChain)
|
||||
embedTraversals(env, ses, out)
|
||||
wk.embedTraversals(env, out)
|
||||
if isVertexChain(call.This.Object()) {
|
||||
embedFinals(env, ses, out)
|
||||
wk.embedFinals(env, out)
|
||||
}
|
||||
return out.Value()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func reverseGremlinChainTo(env *otto.Otto, prevObj *otto.Object, tag string) (*otto.Object, *otto.Object) {
|
||||
func reverseGremlinChainTo(env *otto.Otto, prev *otto.Object, tag string) (*otto.Object, *otto.Object) {
|
||||
env.Run("var _base_object = {}")
|
||||
base, err := env.Object("_base_object")
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return otto.NullValue().Object(), otto.NullValue().Object()
|
||||
}
|
||||
if isVertexChain(prevObj) {
|
||||
if isVertexChain(prev) {
|
||||
base.Set("_gremlin_type", "vertex")
|
||||
} else {
|
||||
base.Set("_gremlin_type", "morphism")
|
||||
}
|
||||
return reverseGremlinChainHelper(env, prevObj, base, tag)
|
||||
return reverseGremlinChainHelper(env, prev, base, tag)
|
||||
}
|
||||
|
||||
func reverseGremlinChainHelper(env *otto.Otto, chain *otto.Object, newBase *otto.Object, tag string) (*otto.Object, *otto.Object) {
|
||||
kindVal, _ := chain.Get("_gremlin_type")
|
||||
kind, _ := kindVal.ToString()
|
||||
kind := kindVal.String()
|
||||
|
||||
if tag != "" {
|
||||
if kind == "tag" {
|
||||
tags := getStringArgs(chain)
|
||||
tags := propertiesOf(chain, "string_args")
|
||||
for _, t := range tags {
|
||||
if t == tag {
|
||||
return newBase, chain
|
||||
|
|
@ -174,8 +172,7 @@ func reverseGremlinChainHelper(env *otto.Otto, chain *otto.Object, newBase *otto
|
|||
|
||||
func debugChain(obj *otto.Object) bool {
|
||||
val, _ := obj.Get("_gremlin_type")
|
||||
x, _ := val.ToString()
|
||||
glog.V(2).Infoln(x)
|
||||
glog.V(2).Infoln(val)
|
||||
val, _ = obj.Get("_gremlin_prev")
|
||||
if val.IsObject() {
|
||||
return debugChain(val.Object())
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
|||
return query.Parsed, nil
|
||||
}
|
||||
|
||||
func (s *Session) ExecInput(input string, c chan interface{}, limit int) {
|
||||
func (s *Session) ExecInput(input string, c chan interface{}, _ int) {
|
||||
defer close(c)
|
||||
var mqlQuery interface{}
|
||||
err := json.Unmarshal([]byte(input), &mqlQuery)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue