From 95170eb8edbdda2fede95317e12e4f74395a39da Mon Sep 17 00:00:00 2001 From: kortschak Date: Mon, 25 Aug 2014 15:57:40 +0930 Subject: [PATCH 1/7] Prepare gremlin for introduction of a worker type --- query/gremlin/build_iterator.go | 54 ++++++++------------- query/gremlin/environ.go | 43 ++++++++--------- query/gremlin/finals.go | 104 ++++++++++++++++++++-------------------- query/gremlin/session.go | 2 +- query/gremlin/traversals.go | 98 +++++++++++++++++++------------------ 5 files changed, 141 insertions(+), 160 deletions(-) diff --git a/query/gremlin/build_iterator.go b/query/gremlin/build_iterator.go index 2a0b800..c2a9108 100644 --- a/query/gremlin/build_iterator.go +++ b/query/gremlin/build_iterator.go @@ -25,21 +25,15 @@ import ( "github.com/google/cayley/quad" ) -func getStrings(obj *otto.Object, field string) []string { - strings := make([]string, 0) - val, _ := obj.Get(field) - if !val.IsUndefined() { - export, _ := val.Export() - array := export.([]interface{}) - for _, arg := range array { - strings = append(strings, arg.(string)) - } +func propertiesOf(obj *otto.Object, name string) []string { + val, _ := obj.Get(name) + if val.IsUndefined() { + return nil } - return strings + export, _ := val.Export() + return export.([]string) } -func getStringArgs(obj *otto.Object) []string { return getStrings(obj, "string_args") } - func buildIteratorTree(obj *otto.Object, ts graph.TripleStore) graph.Iterator { if !isVertexChain(obj) { return iterator.NewNull() @@ -47,7 +41,7 @@ func buildIteratorTree(obj *otto.Object, ts graph.TripleStore) graph.Iterator { return buildIteratorTreeHelper(obj, ts, iterator.NewNull()) } -func makeListOfStringsFromArrayValue(obj *otto.Object) []string { +func stringsFrom(obj *otto.Object) []string { var output []string lengthValue, _ := obj.Get("length") length, _ := lengthValue.ToInteger() @@ -55,14 +49,10 @@ func makeListOfStringsFromArrayValue(obj *otto.Object) []string { for index := uint32(0); index < ulength; index += 1 { name := strconv.FormatInt(int64(index), 10) value, err := obj.Get(name) - if err != nil { + if err != nil || !value.IsString() { continue } - if !value.IsString() { - continue - } - s, _ := value.ToString() - output = append(output, s) + output = append(output, value.String()) } return output } @@ -87,7 +77,7 @@ func buildIteratorFromValue(val otto.Value, ts graph.TripleStore) graph.Iterator return buildIteratorTree(val.Object(), ts) case "Array": // Had better be an array of strings - strings := makeListOfStringsFromArrayValue(val.Object()) + strings := stringsFrom(val.Object()) it := ts.FixedIterator() for _, x := range strings { it.Add(ts.ValueOf(x)) @@ -101,8 +91,7 @@ func buildIteratorFromValue(val otto.Value, ts graph.TripleStore) graph.Iterator fallthrough case "String": it := ts.FixedIterator() - str, _ := val.ToString() - it.Add(ts.ValueOf(str)) + it.Add(ts.ValueOf(val.String())) return it default: glog.Errorln("Trying to handle unsupported Javascript value.") @@ -130,10 +119,9 @@ func buildInOutIterator(obj *otto.Object, ts graph.TripleStore, base graph.Itera var tags []string one, _ := argArray.Get("1") if one.IsString() { - s, _ := one.ToString() - tags = append(tags, s) + tags = append(tags, one.String()) } else if one.Class() == "Array" { - tags = makeListOfStringsFromArrayValue(one.Object()) + tags = stringsFrom(one.Object()) } for _, tag := range tags { predicateNodeIterator.Tagger().Add(tag) @@ -152,21 +140,19 @@ func buildInOutIterator(obj *otto.Object, ts graph.TripleStore, base graph.Itera } func buildIteratorTreeHelper(obj *otto.Object, ts graph.TripleStore, base graph.Iterator) graph.Iterator { - var it graph.Iterator - it = base + var it graph.Iterator = base + // TODO: Better error handling - kindVal, _ := obj.Get("_gremlin_type") - stringArgs := getStringArgs(obj) var subIt graph.Iterator - prevVal, _ := obj.Get("_gremlin_prev") - if !prevVal.IsObject() { + if prev, _ := obj.Get("_gremlin_prev"); !prev.IsObject() { subIt = base } else { - subIt = buildIteratorTreeHelper(prevVal.Object(), ts, base) + subIt = buildIteratorTreeHelper(prev.Object(), ts, base) } - kind, _ := kindVal.ToString() - switch kind { + stringArgs := propertiesOf(obj, "string_args") + val, _ := obj.Get("_gremlin_type") + switch val.String() { case "vertex": if len(stringArgs) == 0 { it = ts.NodesAllIterator() diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index 7918a21..cdbbdb4 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -21,31 +21,25 @@ import ( "github.com/robertkrimen/otto" ) -func BuildEnviron(ses *Session) *otto.Otto { - env := otto.New() - setupGremlin(env, ses) - return env -} - -func concatStringArgs(call otto.FunctionCall) *[]interface{} { - outStrings := make([]interface{}, 0) +func argsOf(call otto.FunctionCall) []string { + var out []string for _, arg := range call.ArgumentList { if arg.IsString() { - outStrings = append(outStrings, arg.String()) + out = append(out, arg.String()) } if arg.IsObject() && arg.Class() == "Array" { obj, _ := arg.Export() for _, x := range obj.([]interface{}) { - outStrings = append(outStrings, x.(string)) + out = append(out, x.(string)) } } } - return &outStrings + return out } func isVertexChain(obj *otto.Object) bool { val, _ := obj.Get("_gremlin_type") - if x, _ := val.ToString(); x == "vertex" { + if val.String() == "vertex" { return true } val, _ = obj.Get("_gremlin_prev") @@ -55,8 +49,10 @@ func isVertexChain(obj *otto.Object) bool { return false } -func setupGremlin(env *otto.Otto, ses *Session) { +func (s *Session) setup(env *otto.Otto) *otto.Otto { graph, _ := env.Object("graph = {}") + env.Run("g = graph") + graph.Set("Vertex", func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, err := call.Otto.Object("out") @@ -65,31 +61,32 @@ func setupGremlin(env *otto.Otto, ses *Session) { return otto.TrueValue() } out.Set("_gremlin_type", "vertex") - outStrings := concatStringArgs(call) - if len(*outStrings) > 0 { - out.Set("string_args", *outStrings) + args := argsOf(call) + if len(args) > 0 { + out.Set("string_args", args) } - embedTraversals(env, ses, out) - embedFinals(env, ses, out) + s.embedTraversals(env, out) + s.embedFinals(env, out) return out.Value() }) + env.Run("graph.V = graph.Vertex") graph.Set("Morphism", func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") out.Set("_gremlin_type", "morphism") - embedTraversals(env, ses, out) + s.embedTraversals(env, out) return out.Value() }) + env.Run("graph.M = graph.Morphism") + graph.Set("Emit", func(call otto.FunctionCall) otto.Value { value := call.Argument(0) if value.IsDefined() { - ses.SendResult(&Result{val: &value}) + s.SendResult(&Result{val: &value}) } return otto.NullValue() }) - env.Run("graph.V = graph.Vertex") - env.Run("graph.M = graph.Morphism") - env.Run("g = graph") + return env } diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 6979e22..1e9f6ed 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -26,45 +26,45 @@ import ( const TopResultTag = "id" -func embedFinals(env *otto.Otto, ses *Session, obj *otto.Object) { - obj.Set("All", allFunc(env, ses, obj)) - obj.Set("GetLimit", limitFunc(env, ses, obj)) - obj.Set("ToArray", toArrayFunc(env, ses, obj, false)) - obj.Set("ToValue", toValueFunc(env, ses, obj, false)) - obj.Set("TagArray", toArrayFunc(env, ses, obj, true)) - obj.Set("TagValue", toValueFunc(env, ses, obj, true)) - obj.Set("Map", mapFunc(env, ses, obj)) - obj.Set("ForEach", mapFunc(env, ses, obj)) +func (s *Session) embedFinals(env *otto.Otto, obj *otto.Object) { + obj.Set("All", s.allFunc(env, obj)) + obj.Set("GetLimit", s.limitFunc(env, obj)) + obj.Set("ToArray", s.toArrayFunc(env, obj, false)) + obj.Set("ToValue", s.toValueFunc(env, obj, false)) + obj.Set("TagArray", s.toArrayFunc(env, obj, true)) + obj.Set("TagValue", s.toValueFunc(env, obj, true)) + obj.Set("Map", s.mapFunc(env, obj)) + obj.Set("ForEach", s.mapFunc(env, obj)) } -func allFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (s *Session) allFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, ses.ts) + it := buildIteratorTree(obj, s.ts) it.Tagger().Add(TopResultTag) - ses.limit = -1 - ses.count = 0 - runIteratorOnSession(it, ses) + s.limit = -1 + s.count = 0 + s.runIterator(it) return otto.NullValue() } } -func limitFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (s *Session) limitFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { if len(call.ArgumentList) > 0 { limitVal, _ := call.Argument(0).ToInteger() - it := buildIteratorTree(obj, ses.ts) + it := buildIteratorTree(obj, s.ts) it.Tagger().Add(TopResultTag) - ses.limit = int(limitVal) - ses.count = 0 - runIteratorOnSession(it, ses) + s.limit = int(limitVal) + s.count = 0 + s.runIterator(it) } return otto.NullValue() } } -func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { +func (s *Session) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, ses.ts) + it := buildIteratorTree(obj, s.ts) it.Tagger().Add(TopResultTag) limit := -1 if len(call.ArgumentList) > 0 { @@ -74,10 +74,10 @@ func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) var val otto.Value var err error if !withTags { - array := runIteratorToArrayNoTags(it, ses, limit) + array := s.runIteratorToArrayNoTags(it, limit) val, err = call.Otto.ToValue(array) } else { - array := runIteratorToArray(it, ses, limit) + array := s.runIteratorToArray(it, limit) val, err = call.Otto.ToValue(array) } @@ -89,21 +89,21 @@ func toArrayFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) } } -func toValueFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { +func (s *Session) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, ses.ts) + it := buildIteratorTree(obj, s.ts) it.Tagger().Add(TopResultTag) limit := 1 var val otto.Value var err error if !withTags { - array := runIteratorToArrayNoTags(it, ses, limit) + array := s.runIteratorToArrayNoTags(it, limit) if len(array) < 1 { return otto.NullValue() } val, err = call.Otto.ToValue(array[0]) } else { - array := runIteratorToArray(it, ses, limit) + array := s.runIteratorToArray(it, limit) if len(array) < 1 { return otto.NullValue() } @@ -119,9 +119,9 @@ func toValueFunc(env *otto.Otto, ses *Session, obj *otto.Object, withTags bool) } } -func mapFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (s *Session) mapFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, ses.ts) + it := buildIteratorTree(obj, s.ts) it.Tagger().Add(TopResultTag) limit := -1 if len(call.ArgumentList) == 0 { @@ -132,26 +132,26 @@ func mapFunc(env *otto.Otto, ses *Session, obj *otto.Object) func(otto.FunctionC limitParsed, _ := call.Argument(0).ToInteger() limit = int(limitParsed) } - runIteratorWithCallback(it, ses, callback, call, limit) + s.runIteratorWithCallback(it, callback, call, limit) return otto.NullValue() } } -func tagsToValueMap(m map[string]graph.Value, ses *Session) map[string]string { +func (s *Session) tagsToValueMap(m map[string]graph.Value) map[string]string { outputMap := make(map[string]string) for k, v := range m { - outputMap[k] = ses.ts.NameOf(v) + outputMap[k] = s.ts.NameOf(v) } return outputMap } -func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string]string { +func (s *Session) runIteratorToArray(it graph.Iterator, limit int) []map[string]string { output := make([]map[string]string, 0) count := 0 it, _ = it.Optimize() for { select { - case <-ses.kill: + case <-s.kill: return nil default: } @@ -160,20 +160,20 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string } tags := make(map[string]graph.Value) it.TagResults(tags) - output = append(output, tagsToValueMap(tags, ses)) + output = append(output, s.tagsToValueMap(tags)) count++ if limit >= 0 && count >= limit { break } for it.NextPath() { select { - case <-ses.kill: + case <-s.kill: return nil default: } tags := make(map[string]graph.Value) it.TagResults(tags) - output = append(output, tagsToValueMap(tags, ses)) + output = append(output, s.tagsToValueMap(tags)) count++ if limit >= 0 && count >= limit { break @@ -184,20 +184,20 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string return output } -func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []string { +func (s *Session) runIteratorToArrayNoTags(it graph.Iterator, limit int) []string { output := make([]string, 0) count := 0 it, _ = it.Optimize() for { select { - case <-ses.kill: + case <-s.kill: return nil default: } if !graph.Next(it) { break } - output = append(output, ses.ts.NameOf(it.Result())) + output = append(output, s.ts.NameOf(it.Result())) count++ if limit >= 0 && count >= limit { break @@ -207,13 +207,13 @@ func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []stri return output } -func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Value, this otto.FunctionCall, limit int) { +func (s *Session) runIteratorWithCallback(it graph.Iterator, callback otto.Value, this otto.FunctionCall, limit int) { count := 0 it, _ = it.Optimize() glog.V(2).Infoln(it.DebugString(0)) for { select { - case <-ses.kill: + case <-s.kill: return default: } @@ -222,7 +222,7 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu } tags := make(map[string]graph.Value) it.TagResults(tags) - val, _ := this.Otto.ToValue(tagsToValueMap(tags, ses)) + val, _ := this.Otto.ToValue(s.tagsToValueMap(tags)) val, _ = callback.Call(this.This, val) count++ if limit >= 0 && count >= limit { @@ -230,13 +230,13 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu } for it.NextPath() { select { - case <-ses.kill: + case <-s.kill: return default: } tags := make(map[string]graph.Value) it.TagResults(tags) - val, _ := this.Otto.ToValue(tagsToValueMap(tags, ses)) + val, _ := this.Otto.ToValue(s.tagsToValueMap(tags)) val, _ = callback.Call(this.This, val) count++ if limit >= 0 && count >= limit { @@ -247,16 +247,16 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu it.Close() } -func runIteratorOnSession(it graph.Iterator, ses *Session) { - if ses.wantShape { - iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape) +func (s *Session) runIterator(it graph.Iterator) { + if s.wantShape { + iterator.OutputQueryShapeForIterator(it, s.ts, s.shape) return } it, _ = it.Optimize() glog.V(2).Infoln(it.DebugString(0)) for { select { - case <-ses.kill: + case <-s.kill: return default: } @@ -265,18 +265,18 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) { } tags := make(map[string]graph.Value) it.TagResults(tags) - if !ses.SendResult(&Result{actualResults: &tags}) { + if !s.SendResult(&Result{actualResults: &tags}) { break } for it.NextPath() { select { - case <-ses.kill: + case <-s.kill: return default: } tags := make(map[string]graph.Value) it.TagResults(tags) - if !ses.SendResult(&Result{actualResults: &tags}) { + if !s.SendResult(&Result{actualResults: &tags}) { break } } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index d1c84b5..30cd913 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -54,7 +54,7 @@ func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Sess limit: -1, timeout: timeout, } - g.env = BuildEnviron(&g) + g.env = g.setup(otto.New()) if persist { g.emptyEnv = g.env } diff --git a/query/gremlin/traversals.go b/query/gremlin/traversals.go index 32d4490..ff258dc 100644 --- a/query/gremlin/traversals.go +++ b/query/gremlin/traversals.go @@ -21,123 +21,122 @@ import ( "github.com/robertkrimen/otto" ) -func embedTraversals(env *otto.Otto, ses *Session, obj *otto.Object) { - obj.Set("In", gremlinFunc("in", obj, env, ses)) - obj.Set("Out", gremlinFunc("out", obj, env, ses)) - obj.Set("Is", gremlinFunc("is", obj, env, ses)) - obj.Set("Both", gremlinFunc("both", obj, env, ses)) - obj.Set("Follow", gremlinFunc("follow", obj, env, ses)) - obj.Set("FollowR", gremlinFollowR("followr", obj, env, ses)) - obj.Set("And", gremlinFunc("and", obj, env, ses)) - obj.Set("Intersect", gremlinFunc("and", obj, env, ses)) - obj.Set("Union", gremlinFunc("or", obj, env, ses)) - obj.Set("Or", gremlinFunc("or", obj, env, ses)) - obj.Set("Back", gremlinBack("back", obj, env, ses)) - obj.Set("Tag", gremlinFunc("tag", obj, env, ses)) - obj.Set("As", gremlinFunc("tag", obj, env, ses)) - obj.Set("Has", gremlinFunc("has", obj, env, ses)) - obj.Set("Save", gremlinFunc("save", obj, env, ses)) - obj.Set("SaveR", gremlinFunc("saver", obj, env, ses)) +func (s *Session) embedTraversals(env *otto.Otto, obj *otto.Object) { + obj.Set("In", gremlinFunc("in", obj, env, s)) + obj.Set("Out", gremlinFunc("out", obj, env, s)) + obj.Set("Is", gremlinFunc("is", obj, env, s)) + obj.Set("Both", gremlinFunc("both", obj, env, s)) + obj.Set("Follow", gremlinFunc("follow", obj, env, s)) + obj.Set("FollowR", gremlinFollowR("followr", obj, env, s)) + obj.Set("And", gremlinFunc("and", obj, env, s)) + obj.Set("Intersect", gremlinFunc("and", obj, env, s)) + obj.Set("Union", gremlinFunc("or", obj, env, s)) + obj.Set("Or", gremlinFunc("or", obj, env, s)) + obj.Set("Back", gremlinBack("back", obj, env, s)) + obj.Set("Tag", gremlinFunc("tag", obj, env, s)) + obj.Set("As", gremlinFunc("tag", obj, env, s)) + obj.Set("Has", gremlinFunc("has", obj, env, s)) + obj.Set("Save", gremlinFunc("save", obj, env, s)) + obj.Set("SaveR", gremlinFunc("saver", obj, env, s)) } -func gremlinFunc(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func gremlinFunc(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") out.Set("_gremlin_type", kind) out.Set("_gremlin_values", call.ArgumentList) - out.Set("_gremlin_prev", prevObj) - outStrings := concatStringArgs(call) - if len(*outStrings) > 0 { - out.Set("string_args", *outStrings) + out.Set("_gremlin_prev", prev) + args := argsOf(call) + if len(args) > 0 { + out.Set("string_args", args) } - embedTraversals(env, ses, out) + ses.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - embedFinals(env, ses, out) + ses.embedFinals(env, out) } return out.Value() } } -func gremlinBack(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func gremlinBack(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") out.Set("_gremlin_type", kind) out.Set("_gremlin_values", call.ArgumentList) - outStrings := concatStringArgs(call) - if len(*outStrings) > 0 { - out.Set("string_args", *outStrings) + args := argsOf(call) + if len(args) > 0 { + out.Set("string_args", args) } var otherChain *otto.Object var thisObj *otto.Object - if len(*outStrings) != 0 { - otherChain, thisObj = reverseGremlinChainTo(call.Otto, prevObj, (*outStrings)[0].(string)) + if len(args) != 0 { + otherChain, thisObj = reverseGremlinChainTo(call.Otto, prev, args[0]) } else { - otherChain, thisObj = reverseGremlinChainTo(call.Otto, prevObj, "") + otherChain, thisObj = reverseGremlinChainTo(call.Otto, prev, "") } out.Set("_gremlin_prev", thisObj) out.Set("_gremlin_back_chain", otherChain) - embedTraversals(env, ses, out) + ses.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - embedFinals(env, ses, out) + ses.embedFinals(env, out) } return out.Value() } } -func gremlinFollowR(kind string, prevObj *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func gremlinFollowR(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") out.Set("_gremlin_type", kind) out.Set("_gremlin_values", call.ArgumentList) - outStrings := concatStringArgs(call) - if len(*outStrings) > 0 { - out.Set("string_args", *outStrings) + args := argsOf(call) + if len(args) > 0 { + out.Set("string_args", args) } if len(call.ArgumentList) == 0 { - return prevObj.Value() + return prev.Value() } arg := call.Argument(0) if isVertexChain(arg.Object()) { - return prevObj.Value() + return prev.Value() } newChain, _ := reverseGremlinChainTo(call.Otto, arg.Object(), "") - out.Set("_gremlin_prev", prevObj) + out.Set("_gremlin_prev", prev) out.Set("_gremlin_followr", newChain) - embedTraversals(env, ses, out) + ses.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - embedFinals(env, ses, out) + ses.embedFinals(env, out) } return out.Value() - } } -func reverseGremlinChainTo(env *otto.Otto, prevObj *otto.Object, tag string) (*otto.Object, *otto.Object) { +func reverseGremlinChainTo(env *otto.Otto, prev *otto.Object, tag string) (*otto.Object, *otto.Object) { env.Run("var _base_object = {}") base, err := env.Object("_base_object") if err != nil { glog.Error(err) return otto.NullValue().Object(), otto.NullValue().Object() } - if isVertexChain(prevObj) { + if isVertexChain(prev) { base.Set("_gremlin_type", "vertex") } else { base.Set("_gremlin_type", "morphism") } - return reverseGremlinChainHelper(env, prevObj, base, tag) + return reverseGremlinChainHelper(env, prev, base, tag) } func reverseGremlinChainHelper(env *otto.Otto, chain *otto.Object, newBase *otto.Object, tag string) (*otto.Object, *otto.Object) { kindVal, _ := chain.Get("_gremlin_type") - kind, _ := kindVal.ToString() + kind := kindVal.String() if tag != "" { if kind == "tag" { - tags := getStringArgs(chain) + tags := propertiesOf(chain, "string_args") for _, t := range tags { if t == tag { return newBase, chain @@ -174,8 +173,7 @@ func reverseGremlinChainHelper(env *otto.Otto, chain *otto.Object, newBase *otto func debugChain(obj *otto.Object) bool { val, _ := obj.Get("_gremlin_type") - x, _ := val.ToString() - glog.V(2).Infoln(x) + glog.V(2).Infoln(val) val, _ = obj.Get("_gremlin_prev") if val.IsObject() { return debugChain(val.Object()) From 8df21cd8d93282083bcf54311d5dec2c55fb1a8b Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 10:12:44 +0930 Subject: [PATCH 2/7] Refactor work out into worker type --- query/gremlin/build_iterator.go | 4 +- query/gremlin/environ.go | 112 ++++++++++++++++++----------- query/gremlin/finals.go | 154 +++++++++++++++++++++++----------------- query/gremlin/session.go | 105 ++++++++++----------------- query/gremlin/traversals.go | 52 +++++++------- 5 files changed, 224 insertions(+), 203 deletions(-) diff --git a/query/gremlin/build_iterator.go b/query/gremlin/build_iterator.go index c2a9108..02196f4 100644 --- a/query/gremlin/build_iterator.go +++ b/query/gremlin/build_iterator.go @@ -46,8 +46,8 @@ func stringsFrom(obj *otto.Object) []string { lengthValue, _ := obj.Get("length") length, _ := lengthValue.ToInteger() ulength := uint32(length) - for index := uint32(0); index < ulength; index += 1 { - name := strconv.FormatInt(int64(index), 10) + for i := uint32(0); i < ulength; i++ { + name := strconv.FormatInt(int64(i), 10) value, err := obj.Get(name) if err != nil || !value.IsString() { continue diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index cdbbdb4..6402bab 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -17,10 +17,80 @@ package gremlin // Builds a new Gremlin environment pointing at a session. import ( + "sync" + "github.com/barakmich/glog" "github.com/robertkrimen/otto" + + "github.com/google/cayley/graph" ) +type worker struct { + ts graph.TripleStore + env *otto.Otto + envLock sync.Mutex + + results chan interface{} + shape map[string]interface{} + + count int + limit int + + kill chan struct{} +} + +func newWorker(ts graph.TripleStore) *worker { + env := otto.New() + wk := &worker{ + ts: ts, + env: env, + limit: -1, + } + graph, _ := env.Object("graph = {}") + env.Run("g = graph") + + graph.Set("Vertex", func(call otto.FunctionCall) otto.Value { + call.Otto.Run("var out = {}") + out, err := call.Otto.Object("out") + if err != nil { + glog.Error(err.Error()) + return otto.TrueValue() + } + out.Set("_gremlin_type", "vertex") + args := argsOf(call) + if len(args) > 0 { + out.Set("string_args", args) + } + wk.embedTraversals(env, out) + wk.embedFinals(env, out) + return out.Value() + }) + env.Run("graph.V = graph.Vertex") + + graph.Set("Morphism", func(call otto.FunctionCall) otto.Value { + call.Otto.Run("var out = {}") + out, _ := call.Otto.Object("out") + out.Set("_gremlin_type", "morphism") + wk.embedTraversals(env, out) + return out.Value() + }) + env.Run("graph.M = graph.Morphism") + + graph.Set("Emit", func(call otto.FunctionCall) otto.Value { + value := call.Argument(0) + if value.IsDefined() { + wk.send(&Result{val: &value}) + } + return otto.NullValue() + }) + + return wk +} + +func (wk *worker) wantShape() bool { + return wk.shape != nil +} + func argsOf(call otto.FunctionCall) []string { var out []string for _, arg := range call.ArgumentList { @@ -48,45 +118,3 @@ func isVertexChain(obj *otto.Object) bool { } return false } - -func (s *Session) setup(env *otto.Otto) *otto.Otto { - graph, _ := env.Object("graph = {}") - env.Run("g = graph") - - graph.Set("Vertex", func(call otto.FunctionCall) otto.Value { - call.Otto.Run("var out = {}") - out, err := call.Otto.Object("out") - if err != nil { - glog.Error(err.Error()) - return otto.TrueValue() - } - out.Set("_gremlin_type", "vertex") - args := argsOf(call) - if len(args) > 0 { - out.Set("string_args", args) - } - s.embedTraversals(env, out) - s.embedFinals(env, out) - return out.Value() - }) - env.Run("graph.V = graph.Vertex") - - graph.Set("Morphism", func(call otto.FunctionCall) otto.Value { - call.Otto.Run("var out = {}") - out, _ := call.Otto.Object("out") - out.Set("_gremlin_type", "morphism") - s.embedTraversals(env, out) - return out.Value() - }) - env.Run("graph.M = graph.Morphism") - - graph.Set("Emit", func(call otto.FunctionCall) otto.Value { - value := call.Argument(0) - if value.IsDefined() { - s.SendResult(&Result{val: &value}) - } - return otto.NullValue() - }) - - return env -} diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 1e9f6ed..c1d7ca7 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -26,45 +26,45 @@ import ( const TopResultTag = "id" -func (s *Session) embedFinals(env *otto.Otto, obj *otto.Object) { - obj.Set("All", s.allFunc(env, obj)) - obj.Set("GetLimit", s.limitFunc(env, obj)) - obj.Set("ToArray", s.toArrayFunc(env, obj, false)) - obj.Set("ToValue", s.toValueFunc(env, obj, false)) - obj.Set("TagArray", s.toArrayFunc(env, obj, true)) - obj.Set("TagValue", s.toValueFunc(env, obj, true)) - obj.Set("Map", s.mapFunc(env, obj)) - obj.Set("ForEach", s.mapFunc(env, obj)) +func (wk *worker) embedFinals(env *otto.Otto, obj *otto.Object) { + obj.Set("All", wk.allFunc(env, obj)) + obj.Set("GetLimit", wk.limitFunc(env, obj)) + obj.Set("ToArray", wk.toArrayFunc(env, obj, false)) + obj.Set("ToValue", wk.toValueFunc(env, obj, false)) + obj.Set("TagArray", wk.toArrayFunc(env, obj, true)) + obj.Set("TagValue", wk.toValueFunc(env, obj, true)) + obj.Set("Map", wk.mapFunc(env, obj)) + obj.Set("ForEach", wk.mapFunc(env, obj)) } -func (s *Session) allFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (wk *worker) allFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, s.ts) + it := buildIteratorTree(obj, wk.ts) it.Tagger().Add(TopResultTag) - s.limit = -1 - s.count = 0 - s.runIterator(it) + wk.limit = -1 + wk.count = 0 + wk.runIterator(it) return otto.NullValue() } } -func (s *Session) limitFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (wk *worker) limitFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { if len(call.ArgumentList) > 0 { limitVal, _ := call.Argument(0).ToInteger() - it := buildIteratorTree(obj, s.ts) + it := buildIteratorTree(obj, wk.ts) it.Tagger().Add(TopResultTag) - s.limit = int(limitVal) - s.count = 0 - s.runIterator(it) + wk.limit = int(limitVal) + wk.count = 0 + wk.runIterator(it) } return otto.NullValue() } } -func (s *Session) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { +func (wk *worker) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, s.ts) + it := buildIteratorTree(obj, wk.ts) it.Tagger().Add(TopResultTag) limit := -1 if len(call.ArgumentList) > 0 { @@ -74,10 +74,10 @@ func (s *Session) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) f var val otto.Value var err error if !withTags { - array := s.runIteratorToArrayNoTags(it, limit) + array := wk.runIteratorToArrayNoTags(it, limit) val, err = call.Otto.ToValue(array) } else { - array := s.runIteratorToArray(it, limit) + array := wk.runIteratorToArray(it, limit) val, err = call.Otto.ToValue(array) } @@ -89,21 +89,21 @@ func (s *Session) toArrayFunc(env *otto.Otto, obj *otto.Object, withTags bool) f } } -func (s *Session) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { +func (wk *worker) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, s.ts) + it := buildIteratorTree(obj, wk.ts) it.Tagger().Add(TopResultTag) limit := 1 var val otto.Value var err error if !withTags { - array := s.runIteratorToArrayNoTags(it, limit) + array := wk.runIteratorToArrayNoTags(it, limit) if len(array) < 1 { return otto.NullValue() } val, err = call.Otto.ToValue(array[0]) } else { - array := s.runIteratorToArray(it, limit) + array := wk.runIteratorToArray(it, limit) if len(array) < 1 { return otto.NullValue() } @@ -119,9 +119,9 @@ func (s *Session) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) f } } -func (s *Session) mapFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { +func (wk *worker) mapFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { - it := buildIteratorTree(obj, s.ts) + it := buildIteratorTree(obj, wk.ts) it.Tagger().Add(TopResultTag) limit := -1 if len(call.ArgumentList) == 0 { @@ -132,26 +132,26 @@ func (s *Session) mapFunc(env *otto.Otto, obj *otto.Object) func(otto.FunctionCa limitParsed, _ := call.Argument(0).ToInteger() limit = int(limitParsed) } - s.runIteratorWithCallback(it, callback, call, limit) + wk.runIteratorWithCallback(it, callback, call, limit) return otto.NullValue() } } -func (s *Session) tagsToValueMap(m map[string]graph.Value) map[string]string { +func (wk *worker) tagsToValueMap(m map[string]graph.Value) map[string]string { outputMap := make(map[string]string) for k, v := range m { - outputMap[k] = s.ts.NameOf(v) + outputMap[k] = wk.ts.NameOf(v) } return outputMap } -func (s *Session) runIteratorToArray(it graph.Iterator, limit int) []map[string]string { +func (wk *worker) runIteratorToArray(it graph.Iterator, limit int) []map[string]string { output := make([]map[string]string, 0) - count := 0 + n := 0 it, _ = it.Optimize() for { select { - case <-s.kill: + case <-wk.kill: return nil default: } @@ -160,22 +160,22 @@ func (s *Session) runIteratorToArray(it graph.Iterator, limit int) []map[string] } tags := make(map[string]graph.Value) it.TagResults(tags) - output = append(output, s.tagsToValueMap(tags)) - count++ - if limit >= 0 && count >= limit { + output = append(output, wk.tagsToValueMap(tags)) + n++ + if limit >= 0 && n >= limit { break } for it.NextPath() { select { - case <-s.kill: + case <-wk.kill: return nil default: } tags := make(map[string]graph.Value) it.TagResults(tags) - output = append(output, s.tagsToValueMap(tags)) - count++ - if limit >= 0 && count >= limit { + output = append(output, wk.tagsToValueMap(tags)) + n++ + if limit >= 0 && n >= limit { break } } @@ -184,22 +184,22 @@ func (s *Session) runIteratorToArray(it graph.Iterator, limit int) []map[string] return output } -func (s *Session) runIteratorToArrayNoTags(it graph.Iterator, limit int) []string { +func (wk *worker) runIteratorToArrayNoTags(it graph.Iterator, limit int) []string { output := make([]string, 0) - count := 0 + n := 0 it, _ = it.Optimize() for { select { - case <-s.kill: + case <-wk.kill: return nil default: } if !graph.Next(it) { break } - output = append(output, s.ts.NameOf(it.Result())) - count++ - if limit >= 0 && count >= limit { + output = append(output, wk.ts.NameOf(it.Result())) + n++ + if limit >= 0 && n >= limit { break } } @@ -207,13 +207,13 @@ func (s *Session) runIteratorToArrayNoTags(it graph.Iterator, limit int) []strin return output } -func (s *Session) runIteratorWithCallback(it graph.Iterator, callback otto.Value, this otto.FunctionCall, limit int) { - count := 0 +func (wk *worker) runIteratorWithCallback(it graph.Iterator, callback otto.Value, this otto.FunctionCall, limit int) { + n := 0 it, _ = it.Optimize() glog.V(2).Infoln(it.DebugString(0)) for { select { - case <-s.kill: + case <-wk.kill: return default: } @@ -222,24 +222,24 @@ func (s *Session) runIteratorWithCallback(it graph.Iterator, callback otto.Value } tags := make(map[string]graph.Value) it.TagResults(tags) - val, _ := this.Otto.ToValue(s.tagsToValueMap(tags)) + val, _ := this.Otto.ToValue(wk.tagsToValueMap(tags)) val, _ = callback.Call(this.This, val) - count++ - if limit >= 0 && count >= limit { + n++ + if limit >= 0 && n >= limit { break } for it.NextPath() { select { - case <-s.kill: + case <-wk.kill: return default: } tags := make(map[string]graph.Value) it.TagResults(tags) - val, _ := this.Otto.ToValue(s.tagsToValueMap(tags)) + val, _ := this.Otto.ToValue(wk.tagsToValueMap(tags)) val, _ = callback.Call(this.This, val) - count++ - if limit >= 0 && count >= limit { + n++ + if limit >= 0 && n >= limit { break } } @@ -247,16 +247,40 @@ func (s *Session) runIteratorWithCallback(it graph.Iterator, callback otto.Value it.Close() } -func (s *Session) runIterator(it graph.Iterator) { - if s.wantShape { - iterator.OutputQueryShapeForIterator(it, s.ts, s.shape) +func (wk *worker) send(r *Result) bool { + if wk.limit >= 0 && wk.limit == wk.count { + return false + } + wk.envLock.Lock() + kill := wk.kill + wk.envLock.Unlock() + select { + case <-kill: + return false + default: + } + if wk.results != nil { + wk.results <- r + wk.count++ + if wk.limit >= 0 && wk.limit == wk.count { + return false + } else { + return true + } + } + return false +} + +func (wk *worker) runIterator(it graph.Iterator) { + if wk.wantShape() { + iterator.OutputQueryShapeForIterator(it, wk.ts, wk.shape) return } it, _ = it.Optimize() glog.V(2).Infoln(it.DebugString(0)) for { select { - case <-s.kill: + case <-wk.kill: return default: } @@ -265,18 +289,18 @@ func (s *Session) runIterator(it graph.Iterator) { } tags := make(map[string]graph.Value) it.TagResults(tags) - if !s.SendResult(&Result{actualResults: &tags}) { + if !wk.send(&Result{actualResults: &tags}) { break } for it.NextPath() { select { - case <-s.kill: + case <-wk.kill: return default: } tags := make(map[string]graph.Value) it.TagResults(tags) - if !s.SendResult(&Result{actualResults: &tags}) { + if !wk.send(&Result{actualResults: &tags}) { break } } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 30cd913..beac46f 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -18,7 +18,6 @@ import ( "errors" "fmt" "sort" - "sync" "time" "github.com/robertkrimen/otto" @@ -31,32 +30,27 @@ import ( var ErrKillTimeout = errors.New("query timed out") type Session struct { - ts graph.TripleStore - results chan interface{} - env *otto.Otto - envLock sync.Mutex + ts graph.TripleStore + + wk *worker + timeout time.Duration + script *otto.Script + persist *otto.Otto + debug bool - limit int - count int dataOutput []interface{} - wantShape bool - shape map[string]interface{} - err error - script *otto.Script - kill chan struct{} - timeout time.Duration - emptyEnv *otto.Otto + + err error } func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session { g := Session{ ts: ts, - limit: -1, + wk: newWorker(ts), timeout: timeout, } - g.env = g.setup(otto.New()) if persist { - g.emptyEnv = g.env + g.persist = g.wk.env } return &g } @@ -74,15 +68,14 @@ func (s *Session) ToggleDebug() { func (s *Session) GetQuery(input string, out chan map[string]interface{}) { defer close(out) - s.shape = make(map[string]interface{}) - s.wantShape = true - s.env.Run(input) - out <- s.shape - s.shape = nil + s.wk.shape = make(map[string]interface{}) + s.wk.env.Run(input) + out <- s.wk.shape + s.wk.shape = nil } func (s *Session) InputParses(input string) (query.ParseResult, error) { - script, err := s.env.Compile("", input) + script, err := s.wk.env.Compile("", input) if err != nil { return query.ParseFail, err } @@ -90,30 +83,6 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) { return query.Parsed, nil } -func (s *Session) SendResult(r *Result) bool { - if s.limit >= 0 && s.limit == s.count { - return false - } - s.envLock.Lock() - kill := s.kill - s.envLock.Unlock() - select { - case <-kill: - return false - default: - } - if s.results != nil { - s.results <- r - s.count++ - if s.limit >= 0 && s.limit == s.count { - return false - } else { - return true - } - } - return false -} - func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { defer func() { if r := recover(); r != nil { @@ -126,7 +95,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { }() // Use buffered chan to prevent blocking. - s.env.Interrupt = make(chan func(), 1) + s.wk.env.Interrupt = make(chan func(), 1) ready := make(chan struct{}) done := make(chan struct{}) @@ -138,27 +107,27 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { case <-done: return default: - close(s.kill) - s.envLock.Lock() - defer s.envLock.Unlock() - s.kill = nil - if s.env != nil { - s.env.Interrupt <- func() { + close(s.wk.kill) + s.wk.envLock.Lock() + defer s.wk.envLock.Unlock() + s.wk.kill = nil + if s.wk.env != nil { + s.wk.env.Interrupt <- func() { panic(ErrKillTimeout) } - s.env = s.emptyEnv + s.wk.env = s.persist } return } }() } - s.envLock.Lock() - env := s.env - if s.kill == nil { - s.kill = make(chan struct{}) + s.wk.envLock.Lock() + env := s.wk.env + if s.wk.kill == nil { + s.wk.kill = make(chan struct{}) } - s.envLock.Unlock() + s.wk.envLock.Unlock() close(ready) out, err := env.Run(input) close(done) @@ -168,7 +137,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { func (s *Session) ExecInput(input string, out chan interface{}, limit int) { defer close(out) s.err = nil - s.results = out + s.wk.results = out var err error var value otto.Value if s.script == nil { @@ -181,11 +150,11 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) { err: err, val: &value, } - s.results = nil + s.wk.results = nil s.script = nil - s.envLock.Lock() - s.env = s.emptyEnv - s.envLock.Unlock() + s.wk.envLock.Lock() + s.wk.env = s.persist + s.wk.envLock.Unlock() } func (s *Session) ToText(result interface{}) string { @@ -273,9 +242,9 @@ func (s *Session) GetJson() ([]interface{}, error) { if s.err != nil { return nil, s.err } - s.envLock.Lock() - kill := s.kill - s.envLock.Unlock() + s.wk.envLock.Lock() + kill := s.wk.kill + s.wk.envLock.Unlock() select { case <-kill: return nil, ErrKillTimeout diff --git a/query/gremlin/traversals.go b/query/gremlin/traversals.go index ff258dc..a013b01 100644 --- a/query/gremlin/traversals.go +++ b/query/gremlin/traversals.go @@ -21,26 +21,26 @@ import ( "github.com/robertkrimen/otto" ) -func (s *Session) embedTraversals(env *otto.Otto, obj *otto.Object) { - obj.Set("In", gremlinFunc("in", obj, env, s)) - obj.Set("Out", gremlinFunc("out", obj, env, s)) - obj.Set("Is", gremlinFunc("is", obj, env, s)) - obj.Set("Both", gremlinFunc("both", obj, env, s)) - obj.Set("Follow", gremlinFunc("follow", obj, env, s)) - obj.Set("FollowR", gremlinFollowR("followr", obj, env, s)) - obj.Set("And", gremlinFunc("and", obj, env, s)) - obj.Set("Intersect", gremlinFunc("and", obj, env, s)) - obj.Set("Union", gremlinFunc("or", obj, env, s)) - obj.Set("Or", gremlinFunc("or", obj, env, s)) - obj.Set("Back", gremlinBack("back", obj, env, s)) - obj.Set("Tag", gremlinFunc("tag", obj, env, s)) - obj.Set("As", gremlinFunc("tag", obj, env, s)) - obj.Set("Has", gremlinFunc("has", obj, env, s)) - obj.Set("Save", gremlinFunc("save", obj, env, s)) - obj.Set("SaveR", gremlinFunc("saver", obj, env, s)) +func (wk *worker) embedTraversals(env *otto.Otto, obj *otto.Object) { + obj.Set("In", wk.gremlinFunc("in", obj, env)) + obj.Set("Out", wk.gremlinFunc("out", obj, env)) + obj.Set("Is", wk.gremlinFunc("is", obj, env)) + obj.Set("Both", wk.gremlinFunc("both", obj, env)) + obj.Set("Follow", wk.gremlinFunc("follow", obj, env)) + obj.Set("FollowR", wk.gremlinFollowR("followr", obj, env)) + obj.Set("And", wk.gremlinFunc("and", obj, env)) + obj.Set("Intersect", wk.gremlinFunc("and", obj, env)) + obj.Set("Union", wk.gremlinFunc("or", obj, env)) + obj.Set("Or", wk.gremlinFunc("or", obj, env)) + obj.Set("Back", wk.gremlinBack("back", obj, env)) + obj.Set("Tag", wk.gremlinFunc("tag", obj, env)) + obj.Set("As", wk.gremlinFunc("tag", obj, env)) + obj.Set("Has", wk.gremlinFunc("has", obj, env)) + obj.Set("Save", wk.gremlinFunc("save", obj, env)) + obj.Set("SaveR", wk.gremlinFunc("saver", obj, env)) } -func gremlinFunc(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func (wk *worker) gremlinFunc(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") @@ -51,15 +51,15 @@ func gremlinFunc(kind string, prev *otto.Object, env *otto.Otto, ses *Session) f if len(args) > 0 { out.Set("string_args", args) } - ses.embedTraversals(env, out) + wk.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - ses.embedFinals(env, out) + wk.embedFinals(env, out) } return out.Value() } } -func gremlinBack(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func (wk *worker) gremlinBack(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") @@ -78,16 +78,16 @@ func gremlinBack(kind string, prev *otto.Object, env *otto.Otto, ses *Session) f } out.Set("_gremlin_prev", thisObj) out.Set("_gremlin_back_chain", otherChain) - ses.embedTraversals(env, out) + wk.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - ses.embedFinals(env, out) + wk.embedFinals(env, out) } return out.Value() } } -func gremlinFollowR(kind string, prev *otto.Object, env *otto.Otto, ses *Session) func(otto.FunctionCall) otto.Value { +func (wk *worker) gremlinFollowR(kind string, prev *otto.Object, env *otto.Otto) func(otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value { call.Otto.Run("var out = {}") out, _ := call.Otto.Object("out") @@ -107,9 +107,9 @@ func gremlinFollowR(kind string, prev *otto.Object, env *otto.Otto, ses *Session newChain, _ := reverseGremlinChainTo(call.Otto, arg.Object(), "") out.Set("_gremlin_prev", prev) out.Set("_gremlin_followr", newChain) - ses.embedTraversals(env, out) + wk.embedTraversals(env, out) if isVertexChain(call.This.Object()) { - ses.embedFinals(env, out) + wk.embedFinals(env, out) } return out.Value() } From ab685cfe04993ba35e719228f8eba0667046a1f7 Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 10:17:44 +0930 Subject: [PATCH 3/7] Mark limit as unused in gremlin and SEXP Used in SEXP. Currently there appears to be no way to limit the number of query returns from MQL. --- query/gremlin/session.go | 2 +- query/mql/session.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/query/gremlin/session.go b/query/gremlin/session.go index beac46f..2d1c359 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -134,7 +134,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { return out, err } -func (s *Session) ExecInput(input string, out chan interface{}, limit int) { +func (s *Session) ExecInput(input string, out chan interface{}, _ int) { defer close(out) s.err = nil s.wk.results = out diff --git a/query/mql/session.go b/query/mql/session.go index 63aa6ba..305caeb 100644 --- a/query/mql/session.go +++ b/query/mql/session.go @@ -72,7 +72,7 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) { return query.Parsed, nil } -func (s *Session) ExecInput(input string, c chan interface{}, limit int) { +func (s *Session) ExecInput(input string, c chan interface{}, _ int) { defer close(c) var mqlQuery interface{} err := json.Unmarshal([]byte(input), &mqlQuery) From 62e7037f2056c0e43d2a33fa4216a45510a15878 Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 11:19:22 +0930 Subject: [PATCH 4/7] Remove unnecessary indirection --- query/gremlin/finals.go | 4 ++-- query/gremlin/session.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index c1d7ca7..c4813c5 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -289,7 +289,7 @@ func (wk *worker) runIterator(it graph.Iterator) { } tags := make(map[string]graph.Value) it.TagResults(tags) - if !wk.send(&Result{actualResults: &tags}) { + if !wk.send(&Result{actualResults: tags}) { break } for it.NextPath() { @@ -300,7 +300,7 @@ func (wk *worker) runIterator(it graph.Iterator) { } tags := make(map[string]graph.Value) it.TagResults(tags) - if !wk.send(&Result{actualResults: &tags}) { + if !wk.send(&Result{actualResults: tags}) { break } } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 2d1c359..852dcce 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -59,7 +59,7 @@ type Result struct { metaresult bool err error val *otto.Value - actualResults *map[string]graph.Value + actualResults map[string]graph.Value } func (s *Session) ToggleDebug() { @@ -179,9 +179,9 @@ func (s *Session) ToText(result interface{}) string { out = fmt.Sprintln("****") if data.val == nil { tags := data.actualResults - tagKeys := make([]string, len(*tags)) + tagKeys := make([]string, len(tags)) i := 0 - for k, _ := range *tags { + for k, _ := range tags { tagKeys[i] = k i++ } @@ -190,7 +190,7 @@ func (s *Session) ToText(result interface{}) string { if k == "$_" { continue } - out += fmt.Sprintf("%s : %s\n", k, s.ts.NameOf((*tags)[k])) + out += fmt.Sprintf("%s : %s\n", k, s.ts.NameOf(tags[k])) } } else { if data.val.IsObject() { @@ -214,15 +214,15 @@ func (s *Session) BuildJson(result interface{}) { if data.val == nil { obj := make(map[string]string) tags := data.actualResults - tagKeys := make([]string, len(*tags)) + tagKeys := make([]string, len(tags)) i := 0 - for k, _ := range *tags { + for k, _ := range tags { tagKeys[i] = k i++ } sort.Strings(tagKeys) for _, k := range tagKeys { - obj[k] = s.ts.NameOf((*tags)[k]) + obj[k] = s.ts.NameOf(tags[k]) } s.dataOutput = append(s.dataOutput, obj) } else { From 73dbfc9461cc78fdb18c04e817513ed0bdc3c994 Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 14:20:37 +0930 Subject: [PATCH 5/7] Fix worker termination handling Fixes issue #102. --- query/gremlin/environ.go | 8 ++++---- query/gremlin/finals.go | 5 +---- query/gremlin/session.go | 50 ++++++++++++++++++++---------------------------- 3 files changed, 26 insertions(+), 37 deletions(-) diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index 6402bab..fb1d296 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -26,9 +26,9 @@ import ( ) type worker struct { - ts graph.TripleStore - env *otto.Otto - envLock sync.Mutex + ts graph.TripleStore + env *otto.Otto + sync.Mutex results chan interface{} shape map[string]interface{} @@ -36,7 +36,7 @@ type worker struct { count int limit int - kill chan struct{} + kill <-chan struct{} } func newWorker(ts graph.TripleStore) *worker { diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index c4813c5..8b91c45 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -251,11 +251,8 @@ func (wk *worker) send(r *Result) bool { if wk.limit >= 0 && wk.limit == wk.count { return false } - wk.envLock.Lock() - kill := wk.kill - wk.envLock.Unlock() select { - case <-kill: + case <-wk.kill: return false default: } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 852dcce..380d8d3 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -33,10 +33,12 @@ type Session struct { ts graph.TripleStore wk *worker - timeout time.Duration script *otto.Script persist *otto.Otto + timeout time.Duration + kill chan struct{} + debug bool dataOutput []interface{} @@ -84,10 +86,12 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) { } func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { + wk := s.wk defer func() { if r := recover(); r != nil { if r == ErrKillTimeout { s.err = ErrKillTimeout + wk.env = s.persist return } panic(r) @@ -95,43 +99,34 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { }() // Use buffered chan to prevent blocking. - s.wk.env.Interrupt = make(chan func(), 1) + wk.env.Interrupt = make(chan func(), 1) + s.kill = make(chan struct{}) + wk.kill = s.kill - ready := make(chan struct{}) done := make(chan struct{}) + defer close(done) if s.timeout >= 0 { go func() { time.Sleep(s.timeout) - <-ready select { case <-done: - return default: - close(s.wk.kill) - s.wk.envLock.Lock() - defer s.wk.envLock.Unlock() - s.wk.kill = nil - if s.wk.env != nil { - s.wk.env.Interrupt <- func() { + close(s.kill) + wk.Lock() + if wk.env != nil { + wk.env.Interrupt <- func() { panic(ErrKillTimeout) } - s.wk.env = s.persist } - return + wk.Unlock() } }() } - s.wk.envLock.Lock() - env := s.wk.env - if s.wk.kill == nil { - s.wk.kill = make(chan struct{}) - } - s.wk.envLock.Unlock() - close(ready) - out, err := env.Run(input) - close(done) - return out, err + wk.Lock() + env := wk.env + wk.Unlock() + return env.Run(input) } func (s *Session) ExecInput(input string, out chan interface{}, _ int) { @@ -152,9 +147,9 @@ func (s *Session) ExecInput(input string, out chan interface{}, _ int) { } s.wk.results = nil s.script = nil - s.wk.envLock.Lock() + s.wk.Lock() s.wk.env = s.persist - s.wk.envLock.Unlock() + s.wk.Unlock() } func (s *Session) ToText(result interface{}) string { @@ -242,11 +237,8 @@ func (s *Session) GetJson() ([]interface{}, error) { if s.err != nil { return nil, s.err } - s.wk.envLock.Lock() - kill := s.wk.kill - s.wk.envLock.Unlock() select { - case <-kill: + case <-s.kill: return nil, ErrKillTimeout default: return s.dataOutput, nil From 1faa8b172738b69ca3d5cec289105f5c27eb9f8d Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 14:30:10 +0930 Subject: [PATCH 6/7] Fix build --- query/gremlin/gremlin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/query/gremlin/gremlin_test.go b/query/gremlin/gremlin_test.go index 566305a..6d9f0cf 100644 --- a/query/gremlin/gremlin_test.go +++ b/query/gremlin/gremlin_test.go @@ -257,7 +257,7 @@ func runQueryGetTag(g []quad.Quad, query string, tag string) []string { for res := range c { data := res.(*Result) if data.val == nil { - val := (*data.actualResults)[tag] + val := data.actualResults[tag] if val != nil { results = append(results, js.ts.NameOf(val)) } From 4345604a39be759267ca3d6ac1d21f8b85eefd81 Mon Sep 17 00:00:00 2001 From: kortschak Date: Tue, 26 Aug 2014 14:36:42 +0930 Subject: [PATCH 7/7] Inconsequential whitespace changes --- query/gremlin/finals.go | 1 - query/gremlin/traversals.go | 1 - 2 files changed, 2 deletions(-) diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 8b91c45..64aa3fe 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -115,7 +115,6 @@ func (wk *worker) toValueFunc(env *otto.Otto, obj *otto.Object, withTags bool) f } else { return val } - } } diff --git a/query/gremlin/traversals.go b/query/gremlin/traversals.go index a013b01..2a2250b 100644 --- a/query/gremlin/traversals.go +++ b/query/gremlin/traversals.go @@ -83,7 +83,6 @@ func (wk *worker) gremlinBack(kind string, prev *otto.Object, env *otto.Otto) fu wk.embedFinals(env, out) } return out.Value() - } }