Refactor work out into worker type
This commit is contained in:
parent
95170eb8ed
commit
8df21cd8d9
5 changed files with 224 additions and 203 deletions
|
|
@ -18,7 +18,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/robertkrimen/otto"
|
||||
|
|
@ -31,32 +30,27 @@ import (
|
|||
var ErrKillTimeout = errors.New("query timed out")
|
||||
|
||||
type Session struct {
|
||||
ts graph.TripleStore
|
||||
results chan interface{}
|
||||
env *otto.Otto
|
||||
envLock sync.Mutex
|
||||
ts graph.TripleStore
|
||||
|
||||
wk *worker
|
||||
timeout time.Duration
|
||||
script *otto.Script
|
||||
persist *otto.Otto
|
||||
|
||||
debug bool
|
||||
limit int
|
||||
count int
|
||||
dataOutput []interface{}
|
||||
wantShape bool
|
||||
shape map[string]interface{}
|
||||
err error
|
||||
script *otto.Script
|
||||
kill chan struct{}
|
||||
timeout time.Duration
|
||||
emptyEnv *otto.Otto
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session {
|
||||
g := Session{
|
||||
ts: ts,
|
||||
limit: -1,
|
||||
wk: newWorker(ts),
|
||||
timeout: timeout,
|
||||
}
|
||||
g.env = g.setup(otto.New())
|
||||
if persist {
|
||||
g.emptyEnv = g.env
|
||||
g.persist = g.wk.env
|
||||
}
|
||||
return &g
|
||||
}
|
||||
|
|
@ -74,15 +68,14 @@ func (s *Session) ToggleDebug() {
|
|||
|
||||
func (s *Session) GetQuery(input string, out chan map[string]interface{}) {
|
||||
defer close(out)
|
||||
s.shape = make(map[string]interface{})
|
||||
s.wantShape = true
|
||||
s.env.Run(input)
|
||||
out <- s.shape
|
||||
s.shape = nil
|
||||
s.wk.shape = make(map[string]interface{})
|
||||
s.wk.env.Run(input)
|
||||
out <- s.wk.shape
|
||||
s.wk.shape = nil
|
||||
}
|
||||
|
||||
func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
||||
script, err := s.env.Compile("", input)
|
||||
script, err := s.wk.env.Compile("", input)
|
||||
if err != nil {
|
||||
return query.ParseFail, err
|
||||
}
|
||||
|
|
@ -90,30 +83,6 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
|
|||
return query.Parsed, nil
|
||||
}
|
||||
|
||||
func (s *Session) SendResult(r *Result) bool {
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
}
|
||||
s.envLock.Lock()
|
||||
kill := s.kill
|
||||
s.envLock.Unlock()
|
||||
select {
|
||||
case <-kill:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if s.results != nil {
|
||||
s.results <- r
|
||||
s.count++
|
||||
if s.limit >= 0 && s.limit == s.count {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
|
@ -126,7 +95,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
|||
}()
|
||||
|
||||
// Use buffered chan to prevent blocking.
|
||||
s.env.Interrupt = make(chan func(), 1)
|
||||
s.wk.env.Interrupt = make(chan func(), 1)
|
||||
|
||||
ready := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
|
|
@ -138,27 +107,27 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
|||
case <-done:
|
||||
return
|
||||
default:
|
||||
close(s.kill)
|
||||
s.envLock.Lock()
|
||||
defer s.envLock.Unlock()
|
||||
s.kill = nil
|
||||
if s.env != nil {
|
||||
s.env.Interrupt <- func() {
|
||||
close(s.wk.kill)
|
||||
s.wk.envLock.Lock()
|
||||
defer s.wk.envLock.Unlock()
|
||||
s.wk.kill = nil
|
||||
if s.wk.env != nil {
|
||||
s.wk.env.Interrupt <- func() {
|
||||
panic(ErrKillTimeout)
|
||||
}
|
||||
s.env = s.emptyEnv
|
||||
s.wk.env = s.persist
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.envLock.Lock()
|
||||
env := s.env
|
||||
if s.kill == nil {
|
||||
s.kill = make(chan struct{})
|
||||
s.wk.envLock.Lock()
|
||||
env := s.wk.env
|
||||
if s.wk.kill == nil {
|
||||
s.wk.kill = make(chan struct{})
|
||||
}
|
||||
s.envLock.Unlock()
|
||||
s.wk.envLock.Unlock()
|
||||
close(ready)
|
||||
out, err := env.Run(input)
|
||||
close(done)
|
||||
|
|
@ -168,7 +137,7 @@ func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
|
|||
func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
||||
defer close(out)
|
||||
s.err = nil
|
||||
s.results = out
|
||||
s.wk.results = out
|
||||
var err error
|
||||
var value otto.Value
|
||||
if s.script == nil {
|
||||
|
|
@ -181,11 +150,11 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
|
|||
err: err,
|
||||
val: &value,
|
||||
}
|
||||
s.results = nil
|
||||
s.wk.results = nil
|
||||
s.script = nil
|
||||
s.envLock.Lock()
|
||||
s.env = s.emptyEnv
|
||||
s.envLock.Unlock()
|
||||
s.wk.envLock.Lock()
|
||||
s.wk.env = s.persist
|
||||
s.wk.envLock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Session) ToText(result interface{}) string {
|
||||
|
|
@ -273,9 +242,9 @@ func (s *Session) GetJson() ([]interface{}, error) {
|
|||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
s.envLock.Lock()
|
||||
kill := s.kill
|
||||
s.envLock.Unlock()
|
||||
s.wk.envLock.Lock()
|
||||
kill := s.wk.kill
|
||||
s.wk.envLock.Unlock()
|
||||
select {
|
||||
case <-kill:
|
||||
return nil, ErrKillTimeout
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue