diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index 9e00594..670b6b6 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -84,7 +84,7 @@ func setupGremlin(env *otto.Otto, ses *Session) { graph.Set("Emit", func(call otto.FunctionCall) otto.Value { value := call.Argument(0) if value.IsDefined() { - ses.SendResult(&GremlinResult{metaresult: false, err: nil, val: &value, actualResults: nil}) + ses.SendResult(&GremlinResult{val: &value}) } return otto.NullValue() }) diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index a58f5ac..6658502 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -248,8 +248,8 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu } func runIteratorOnSession(it graph.Iterator, ses *Session) { - if ses.lookingForQueryShape { - iterator.OutputQueryShapeForIterator(it, ses.ts, ses.queryShape) + if ses.wantShape { + iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape) return } it, _ = it.Optimize() @@ -267,8 +267,7 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) { } tags := make(map[string]graph.Value) it.TagResults(tags) - cont := ses.SendResult(&GremlinResult{metaresult: false, err: nil, val: nil, actualResults: &tags}) - if !cont { + if !ses.SendResult(&GremlinResult{actualResults: &tags}) { break } for it.NextResult() == true { @@ -279,8 +278,7 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) { } tags := make(map[string]graph.Value) it.TagResults(tags) - cont := ses.SendResult(&GremlinResult{metaresult: false, err: nil, val: nil, actualResults: &tags}) - if !cont { + if !ses.SendResult(&GremlinResult{actualResults: &tags}) { break } } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index c0684dc..8223c37 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -27,40 +27,36 @@ import ( "github.com/google/cayley/query" ) +var ErrKillTimeout = errors.New("query timed out") + type Session struct { - ts graph.TripleStore - currentChannel chan interface{} - env *otto.Otto - envLock sync.Mutex - debug bool - limit int - count int - dataOutput []interface{} - lookingForQueryShape bool - queryShape map[string]interface{} - err error - script *otto.Script - kill chan struct{} - timeoutSec time.Duration - emptyEnv *otto.Otto + ts graph.TripleStore + results chan interface{} + env *otto.Otto + envLock sync.Mutex + debug bool + limit int + count int + dataOutput []interface{} + wantShape bool + shape map[string]interface{} + err error + script *otto.Script + kill chan struct{} + timeoutSec time.Duration + emptyEnv *otto.Otto } func NewSession(inputTripleStore graph.TripleStore, timeoutSec int, persist bool) *Session { - var g Session - g.ts = inputTripleStore + g := Session{ + ts: inputTripleStore, + limit: -1, + timeoutSec: time.Duration(timeoutSec), + } g.env = BuildEnviron(&g) - g.limit = -1 - g.count = 0 - g.lookingForQueryShape = false if persist { g.emptyEnv = g.env } - if timeoutSec < 0 { - g.timeoutSec = time.Duration(-1) - } else { - g.timeoutSec = time.Duration(timeoutSec) - } - g.ClearJson() return &g } @@ -75,13 +71,13 @@ func (s *Session) ToggleDebug() { s.debug = !s.debug } -func (s *Session) GetQuery(input string, output_struct chan map[string]interface{}) { - defer close(output_struct) - s.queryShape = make(map[string]interface{}) - s.lookingForQueryShape = true +func (s *Session) GetQuery(input string, out chan map[string]interface{}) { + defer close(out) + s.shape = make(map[string]interface{}) + s.wantShape = true s.env.Run(input) - output_struct <- s.queryShape - s.queryShape = nil + out <- s.shape + s.shape = nil } func (s *Session) InputParses(input string) (query.ParseResult, error) { @@ -102,8 +98,8 @@ func (s *Session) SendResult(result *GremlinResult) bool { return false default: } - if s.currentChannel != nil { - s.currentChannel <- result + if s.results != nil { + s.results <- result s.count++ if s.limit >= 0 && s.limit == s.count { return false @@ -114,8 +110,6 @@ func (s *Session) SendResult(result *GremlinResult) bool { return false } -var ErrKillTimeout = errors.New("query timed out") - func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { s.kill = make(chan struct{}) defer func() { @@ -128,11 +122,12 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { } }() - s.env.Interrupt = make(chan func(), 1) // The buffer prevents blocking + // Use buffered chan to prevent blocking. + s.env.Interrupt = make(chan func(), 1) - if s.timeoutSec != -1 { + if s.timeoutSec >= 0 { go func() { - time.Sleep(s.timeoutSec * time.Second) // Stop after two seconds + time.Sleep(s.timeoutSec * time.Second) close(s.kill) s.envLock.Lock() defer s.envLock.Unlock() @@ -147,13 +142,13 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { s.envLock.Lock() defer s.envLock.Unlock() - return s.env.Run(input) // Here be dragons (risky code) + return s.env.Run(input) } func (s *Session) ExecInput(input string, out chan interface{}, limit int) { defer close(out) s.err = nil - s.currentChannel = out + s.results = out var err error var value otto.Value if s.script == nil { @@ -162,24 +157,22 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) { value, err = s.runUnsafe(s.script) } out <- &GremlinResult{ - metaresult: true, - err: err, - val: &value, - actualResults: nil, + metaresult: true, + err: err, + val: &value, } - s.currentChannel = nil + s.results = nil s.script = nil s.envLock.Lock() s.env = s.emptyEnv s.envLock.Unlock() - return } func (s *Session) ToText(result interface{}) string { data := result.(*GremlinResult) if data.metaresult { if data.err != nil { - return fmt.Sprintln("Error: ", data.err) + return fmt.Sprintf("Error: %v\n", data.err) } if data.val != nil { s, _ := data.val.Export() @@ -226,7 +219,7 @@ func (s *Session) ToText(result interface{}) string { } // Web stuff -func (ses *Session) BuildJson(result interface{}) { +func (s *Session) BuildJson(result interface{}) { data := result.(*GremlinResult) if !data.metaresult { if data.val == nil { @@ -240,35 +233,34 @@ func (ses *Session) BuildJson(result interface{}) { } sort.Strings(tagKeys) for _, k := range tagKeys { - obj[k] = ses.ts.NameOf((*tags)[k]) + obj[k] = s.ts.NameOf((*tags)[k]) } - ses.dataOutput = append(ses.dataOutput, obj) + s.dataOutput = append(s.dataOutput, obj) } else { if data.val.IsObject() { export, _ := data.val.Export() - ses.dataOutput = append(ses.dataOutput, export) + s.dataOutput = append(s.dataOutput, export) } else { strVersion, _ := data.val.ToString() - ses.dataOutput = append(ses.dataOutput, strVersion) + s.dataOutput = append(s.dataOutput, strVersion) } } } - } -func (ses *Session) GetJson() ([]interface{}, error) { - defer ses.ClearJson() - if ses.err != nil { - return nil, ses.err +func (s *Session) GetJson() ([]interface{}, error) { + defer s.ClearJson() + if s.err != nil { + return nil, s.err } select { - case <-ses.kill: + case <-s.kill: return nil, ErrKillTimeout default: - return ses.dataOutput, nil + return s.dataOutput, nil } } -func (ses *Session) ClearJson() { - ses.dataOutput = nil +func (s *Session) ClearJson() { + s.dataOutput = nil }