From f605e1138dd798625b78c66c37880b64fa89b2e3 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 16 Aug 2014 16:52:09 -0400 Subject: [PATCH 1/2] kortschak's fix --- query/gremlin/session.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 8000c17..c178785 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -111,7 +111,6 @@ func (s *Session) SendResult(r *Result) bool { } func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { - s.kill = make(chan struct{}) defer func() { if r := recover(); r != nil { if r == ErrKillTimeout { @@ -125,9 +124,12 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { // Use buffered chan to prevent blocking. s.env.Interrupt = make(chan func(), 1) + ready := make(chan struct{}) + s.kill = make(chan struct{}) if s.timeout >= 0 { go func() { time.Sleep(s.timeout) + <-ready close(s.kill) s.envLock.Lock() defer s.envLock.Unlock() @@ -143,6 +145,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { s.envLock.Lock() env := s.env s.envLock.Unlock() + close(ready) return env.Run(input) } From c1ff6ce1aa584a6354fe29793230d3b9556526c8 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Sat, 16 Aug 2014 18:03:05 -0400 Subject: [PATCH 2/2] lock s.kill, send done signal --- query/gremlin/session.go | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/query/gremlin/session.go b/query/gremlin/session.go index c178785..afd77b6 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -93,8 +93,11 @@ func (s *Session) SendResult(r *Result) bool { if s.limit >= 0 && s.limit == s.count { return false } + s.envLock.Lock() + kill := s.kill + s.envLock.Unlock() select { - case <-s.kill: + case <-kill: return false default: } @@ -125,28 +128,40 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { s.env.Interrupt = make(chan func(), 1) ready := make(chan struct{}) - s.kill = make(chan struct{}) + done := make(chan struct{}) if s.timeout >= 0 { go func() { time.Sleep(s.timeout) <-ready - close(s.kill) - s.envLock.Lock() - defer s.envLock.Unlock() - if s.env != nil { - s.env.Interrupt <- func() { - panic(ErrKillTimeout) + select { + case <-done: + return + default: + close(s.kill) + s.envLock.Lock() + defer s.envLock.Unlock() + s.kill = nil + if s.env != nil { + s.env.Interrupt <- func() { + panic(ErrKillTimeout) + } + s.env = s.emptyEnv } - s.env = s.emptyEnv + return } }() } s.envLock.Lock() env := s.env + if s.kill == nil { + s.kill = make(chan struct{}) + } s.envLock.Unlock() close(ready) - return env.Run(input) + out, err := env.Run(input) + close(done) + return out, err } func (s *Session) ExecInput(input string, out chan interface{}, limit int) { @@ -257,8 +272,11 @@ func (s *Session) GetJson() ([]interface{}, error) { if s.err != nil { return nil, s.err } + s.envLock.Lock() + kill := s.kill + s.envLock.Unlock() select { - case <-s.kill: + case <-kill: return nil, ErrKillTimeout default: return s.dataOutput, nil