diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index 6402bab..fb1d296 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -26,9 +26,9 @@ import ( ) type worker struct { - ts graph.TripleStore - env *otto.Otto - envLock sync.Mutex + ts graph.TripleStore + env *otto.Otto + sync.Mutex results chan interface{} shape map[string]interface{} @@ -36,7 +36,7 @@ type worker struct { count int limit int - kill chan struct{} + kill <-chan struct{} } func newWorker(ts graph.TripleStore) *worker { diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index c4813c5..8b91c45 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -251,11 +251,8 @@ func (wk *worker) send(r *Result) bool { if wk.limit >= 0 && wk.limit == wk.count { return false } - wk.envLock.Lock() - kill := wk.kill - wk.envLock.Unlock() select { - case <-kill: + case <-wk.kill: return false default: } diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 852dcce..380d8d3 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -33,10 +33,12 @@ type Session struct { ts graph.TripleStore wk *worker - timeout time.Duration script *otto.Script persist *otto.Otto + timeout time.Duration + kill chan struct{} + debug bool dataOutput []interface{} @@ -84,10 +86,12 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) { } func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { + wk := s.wk defer func() { if r := recover(); r != nil { if r == ErrKillTimeout { s.err = ErrKillTimeout + wk.env = s.persist return } panic(r) @@ -95,43 +99,34 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { }() // Use buffered chan to prevent blocking. - s.wk.env.Interrupt = make(chan func(), 1) + wk.env.Interrupt = make(chan func(), 1) + s.kill = make(chan struct{}) + wk.kill = s.kill - ready := make(chan struct{}) done := make(chan struct{}) + defer close(done) if s.timeout >= 0 { go func() { time.Sleep(s.timeout) - <-ready select { case <-done: - return default: - close(s.wk.kill) - s.wk.envLock.Lock() - defer s.wk.envLock.Unlock() - s.wk.kill = nil - if s.wk.env != nil { - s.wk.env.Interrupt <- func() { + close(s.kill) + wk.Lock() + if wk.env != nil { + wk.env.Interrupt <- func() { panic(ErrKillTimeout) } - s.wk.env = s.persist } - return + wk.Unlock() } }() } - s.wk.envLock.Lock() - env := s.wk.env - if s.wk.kill == nil { - s.wk.kill = make(chan struct{}) - } - s.wk.envLock.Unlock() - close(ready) - out, err := env.Run(input) - close(done) - return out, err + wk.Lock() + env := wk.env + wk.Unlock() + return env.Run(input) } func (s *Session) ExecInput(input string, out chan interface{}, _ int) { @@ -152,9 +147,9 @@ func (s *Session) ExecInput(input string, out chan interface{}, _ int) { } s.wk.results = nil s.script = nil - s.wk.envLock.Lock() + s.wk.Lock() s.wk.env = s.persist - s.wk.envLock.Unlock() + s.wk.Unlock() } func (s *Session) ToText(result interface{}) string { @@ -242,11 +237,8 @@ func (s *Session) GetJson() ([]interface{}, error) { if s.err != nil { return nil, s.err } - s.wk.envLock.Lock() - kill := s.wk.kill - s.wk.envLock.Unlock() select { - case <-kill: + case <-s.kill: return nil, ErrKillTimeout default: return s.dataOutput, nil