Fix worker termination handling

Fixes issue #102.
This commit is contained in:
kortschak 2014-08-26 14:20:37 +09:30
parent 62e7037f20
commit 73dbfc9461
3 changed files with 26 additions and 37 deletions

View file

@ -26,9 +26,9 @@ import (
)
type worker struct {
ts graph.TripleStore
env *otto.Otto
envLock sync.Mutex
ts graph.TripleStore
env *otto.Otto
sync.Mutex
results chan interface{}
shape map[string]interface{}
@ -36,7 +36,7 @@ type worker struct {
count int
limit int
kill chan struct{}
kill <-chan struct{}
}
func newWorker(ts graph.TripleStore) *worker {

View file

@ -251,11 +251,8 @@ func (wk *worker) send(r *Result) bool {
if wk.limit >= 0 && wk.limit == wk.count {
return false
}
wk.envLock.Lock()
kill := wk.kill
wk.envLock.Unlock()
select {
case <-kill:
case <-wk.kill:
return false
default:
}

View file

@ -33,10 +33,12 @@ type Session struct {
ts graph.TripleStore
wk *worker
timeout time.Duration
script *otto.Script
persist *otto.Otto
timeout time.Duration
kill chan struct{}
debug bool
dataOutput []interface{}
@ -84,10 +86,12 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
}
func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
wk := s.wk
defer func() {
if r := recover(); r != nil {
if r == ErrKillTimeout {
s.err = ErrKillTimeout
wk.env = s.persist
return
}
panic(r)
@ -95,43 +99,34 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
}()
// Use buffered chan to prevent blocking.
s.wk.env.Interrupt = make(chan func(), 1)
wk.env.Interrupt = make(chan func(), 1)
s.kill = make(chan struct{})
wk.kill = s.kill
ready := make(chan struct{})
done := make(chan struct{})
defer close(done)
if s.timeout >= 0 {
go func() {
time.Sleep(s.timeout)
<-ready
select {
case <-done:
return
default:
close(s.wk.kill)
s.wk.envLock.Lock()
defer s.wk.envLock.Unlock()
s.wk.kill = nil
if s.wk.env != nil {
s.wk.env.Interrupt <- func() {
close(s.kill)
wk.Lock()
if wk.env != nil {
wk.env.Interrupt <- func() {
panic(ErrKillTimeout)
}
s.wk.env = s.persist
}
return
wk.Unlock()
}
}()
}
s.wk.envLock.Lock()
env := s.wk.env
if s.wk.kill == nil {
s.wk.kill = make(chan struct{})
}
s.wk.envLock.Unlock()
close(ready)
out, err := env.Run(input)
close(done)
return out, err
wk.Lock()
env := wk.env
wk.Unlock()
return env.Run(input)
}
func (s *Session) ExecInput(input string, out chan interface{}, _ int) {
@ -152,9 +147,9 @@ func (s *Session) ExecInput(input string, out chan interface{}, _ int) {
}
s.wk.results = nil
s.script = nil
s.wk.envLock.Lock()
s.wk.Lock()
s.wk.env = s.persist
s.wk.envLock.Unlock()
s.wk.Unlock()
}
func (s *Session) ToText(result interface{}) string {
@ -242,11 +237,8 @@ func (s *Session) GetJson() ([]interface{}, error) {
if s.err != nil {
return nil, s.err
}
s.wk.envLock.Lock()
kill := s.wk.kill
s.wk.envLock.Unlock()
select {
case <-kill:
case <-s.kill:
return nil, ErrKillTimeout
default:
return s.dataOutput, nil