diff --git a/cayley_appengine.cfg b/cayley_appengine.cfg index 2ecd797..a762b5f 100644 --- a/cayley_appengine.cfg +++ b/cayley_appengine.cfg @@ -3,5 +3,5 @@ "db_path": "30kmoviedata.nq.gz", "read_only": true, "load_size": 10000, -"gremlin_timeout": 10 +"timeout": 10 } diff --git a/cayley_test.go b/cayley_test.go index af77610..9bda12a 100644 --- a/cayley_test.go +++ b/cayley_test.go @@ -17,6 +17,7 @@ package main import ( "sync" "testing" + "time" "github.com/google/cayley/config" "github.com/google/cayley/db" @@ -292,9 +293,9 @@ var m2_actors = movie2.Save("name","movie2").Follow(filmToActor) var ( once sync.Once cfg = &config.Config{ - DatabasePath: "30kmoviedata.nq.gz", - DatabaseType: "memstore", - GremlinTimeout: 300, + DatabasePath: "30kmoviedata.nq.gz", + DatabaseType: "memstore", + Timeout: 300 * time.Second, } ts graph.TripleStore @@ -316,7 +317,7 @@ func TestQueries(t *testing.T) { if testing.Short() && test.long { continue } - ses := gremlin.NewSession(ts, cfg.GremlinTimeout, true) + ses := gremlin.NewSession(ts, cfg.Timeout, true) _, err := ses.InputParses(test.query) if err != nil { t.Fatalf("Failed to parse benchmark gremlin %s: %v", test.message, err) @@ -333,7 +334,7 @@ func TestQueries(t *testing.T) { if j == nil && err == nil { continue } - if err != nil && err.Error() == "Query Timeout" { + if err == gremlin.ErrKillTimeout { timedOut = true continue } @@ -357,7 +358,7 @@ func runBench(n int, b *testing.B) { b.Skip() } prepare(b) - ses := gremlin.NewSession(ts, cfg.GremlinTimeout, true) + ses := gremlin.NewSession(ts, cfg.Timeout, true) _, err := ses.InputParses(benchmarkQueries[n].query) if err != nil { b.Fatalf("Failed to parse benchmark gremlin %s: %v", benchmarkQueries[n].message, err) diff --git a/config/config.go b/config/config.go index d521043..d8cb4d8 100644 --- a/config/config.go +++ b/config/config.go @@ -17,29 +17,127 @@ package config import ( "encoding/json" "flag" + "fmt" "os" + "strconv" + "time" "github.com/barakmich/glog" ) type 
Config struct { + DatabaseType string + DatabasePath string + DatabaseOptions map[string]interface{} + ListenHost string + ListenPort string + ReadOnly bool + Timeout time.Duration + LoadSize int +} + +type config struct { DatabaseType string `json:"database"` DatabasePath string `json:"db_path"` DatabaseOptions map[string]interface{} `json:"db_options"` ListenHost string `json:"listen_host"` ListenPort string `json:"listen_port"` ReadOnly bool `json:"read_only"` - GremlinTimeout int `json:"gremlin_timeout"` + Timeout duration `json:"timeout"` LoadSize int `json:"load_size"` } -var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") -var databaseBackend = flag.String("db", "memstore", "Database Backend.") -var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") -var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") -var port = flag.String("port", "64210", "Port to listen on.") -var readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.") -var gremlinTimeout = flag.Int("gremlin_timeout", 30, "Number of seconds until an individual query times out.") +// UnmarshalJSON decodes data through the tagged config struct, so the +// timeout may be given as a number of seconds or as a duration string. +func (c *Config) UnmarshalJSON(data []byte) error { + var t config + err := json.Unmarshal(data, &t) + if err != nil { + return err + } + *c = Config{ + DatabaseType: t.DatabaseType, + DatabasePath: t.DatabasePath, + DatabaseOptions: t.DatabaseOptions, + ListenHost: t.ListenHost, + ListenPort: t.ListenPort, + ReadOnly: t.ReadOnly, + Timeout: time.Duration(t.Timeout), + LoadSize: t.LoadSize, + } + return nil +} + +// MarshalJSON encodes c through the tagged config struct, writing the +// timeout as a duration string. +func (c *Config) MarshalJSON() ([]byte, error) { + return json.Marshal(config{ + DatabaseType: c.DatabaseType, + DatabasePath: c.DatabasePath, + DatabaseOptions: c.DatabaseOptions, + ListenHost: c.ListenHost, + ListenPort: c.ListenPort, + ReadOnly: c.ReadOnly, + Timeout: duration(c.Timeout), + LoadSize: c.LoadSize, + }) +} + +// duration is a time.Duration that satisfies the +// json.Unmarshaler and json.Marshaler interfaces. 
+type duration time.Duration + +// UnmarshalJSON unmarshals a duration according to the following scheme: +// * If the element is absent or null the duration is zero. +// * If the element is parsable as a time.Duration, the parsed value is kept. +// * If the element is parsable as a number, that number of seconds is kept. +func (d *duration) UnmarshalJSON(data []byte) error { + if len(data) == 0 || string(data) == "null" { + *d = 0 + return nil + } + text := string(data) + // A JSON string arrives with its quotes; decode it so that a value + // such as "10s" reaches time.ParseDuration intact. + if data[0] == '"' { + if err := json.Unmarshal(data, &text); err != nil { + return err + } + } + if t, err := time.ParseDuration(text); err == nil { + *d = duration(t) + return nil + } + if i, err := strconv.ParseInt(text, 10, 64); err == nil { + *d = duration(time.Duration(i) * time.Second) + return nil + } + // ParseInt rejects fractions and e-notation, so fall back to a float + // and scale before truncating to keep sub-second precision. + f, err := strconv.ParseFloat(text, 64) + if err != nil { + return err + } + *d = duration(f * float64(time.Second)) + return nil +} + +// MarshalJSON encodes the duration as a quoted time.Duration string such as +// "30s". It has a value receiver because Config.MarshalJSON marshals its +// config struct by value, where a pointer-receiver method is never called. +func (d duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%q", time.Duration(d))), nil +} + +var ( + databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") + databaseBackend = flag.String("db", "memstore", "Database Backend.") + host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") + loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") + port = flag.String("port", "64210", "Port to listen on.") + readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.") + timeout = flag.Duration("timeout", 30*time.Second, "Elapsed time until an individual query times out.") +) func ParseConfigFromFile(filename string) *Config { config := &Config{} @@ -100,8 +198,8 @@ func ParseConfigFromFlagsAndFile(fileFlag string) *Config { config.ListenPort = *port } - if config.GremlinTimeout == 0 { - config.GremlinTimeout = *gremlinTimeout + if config.Timeout == 0 { + config.Timeout = *timeout } if config.LoadSize == 0 { diff --git a/db/repl.go b/db/repl.go index 2ef9429..c1bb5ea 100644 --- a/db/repl.go +++ b/db/repl.go @@ -72,7 +72,7 @@ func Repl(ts 
graph.TripleStore, queryLanguage string, cfg *config.Config) error case "gremlin": fallthrough default: - ses = gremlin.NewSession(ts, cfg.GremlinTimeout, true) + ses = gremlin.NewSession(ts, cfg.Timeout, true) } buf := bufio.NewReader(os.Stdin) var line []byte diff --git a/docs/Configuration.md b/docs/Configuration.md index 792b476..b464adb 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -72,12 +72,12 @@ All command line flags take precedence over the configuration file. ## Language Options -#### **`gremlin_timeout`** +#### **`timeout`** - * Type: Integer + * Type: Integer or String * Default: 30 -The value in seconds of the maximum length of time the Javascript runtime should run until cancelling the query and returning a 408 Timeout. A negative value means no limit. +The maximum length of time the Javascript runtime should run until cancelling the query and returning a 408 Timeout. When timeout is an integer it is interpreted as seconds; when it is a string it is [parsed](http://golang.org/pkg/time/#ParseDuration) as a Go time.Duration. A negative duration means no limit. 
## Per-Database Options diff --git a/http/query.go b/http/query.go index e8b5d72..f4f5a34 100644 --- a/http/query.go +++ b/http/query.go @@ -71,7 +71,7 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.GremlinTimeout, false) + ses = gremlin.NewSession(api.ts, api.config.Timeout, false) case "mql": ses = mql.NewSession(api.ts) default: @@ -119,7 +119,7 @@ func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params http var ses query.HttpSession switch params.ByName("query_lang") { case "gremlin": - ses = gremlin.NewSession(api.ts, api.config.GremlinTimeout, false) + ses = gremlin.NewSession(api.ts, api.config.Timeout, false) case "mql": ses = mql.NewSession(api.ts) default: diff --git a/query/gremlin/environ.go b/query/gremlin/environ.go index 503ad4f..7918a21 100644 --- a/query/gremlin/environ.go +++ b/query/gremlin/environ.go @@ -84,7 +84,7 @@ func setupGremlin(env *otto.Otto, ses *Session) { graph.Set("Emit", func(call otto.FunctionCall) otto.Value { value := call.Argument(0) if value.IsDefined() { - ses.SendResult(&GremlinResult{metaresult: false, err: "", val: &value, actualResults: nil}) + ses.SendResult(&Result{val: &value}) } return otto.NullValue() }) diff --git a/query/gremlin/finals.go b/query/gremlin/finals.go index 022a394..916cb26 100644 --- a/query/gremlin/finals.go +++ b/query/gremlin/finals.go @@ -148,8 +148,10 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string count := 0 it, _ = it.Optimize() for { - if ses.doHalt { + select { + case <-ses.kill: return nil + default: } _, ok := graph.Next(it) if !ok { @@ -163,8 +165,10 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string break } for it.NextResult() == true { - if ses.doHalt { + select { + case <-ses.kill: return nil + default: } tags := 
make(map[string]graph.Value) it.TagResults(tags) @@ -184,8 +188,10 @@ func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []stri count := 0 it, _ = it.Optimize() for { - if ses.doHalt { + select { + case <-ses.kill: return nil + default: } val, ok := graph.Next(it) if !ok { @@ -205,8 +211,10 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu count := 0 it, _ = it.Optimize() for { - if ses.doHalt { + select { + case <-ses.kill: return + default: } _, ok := graph.Next(it) if !ok { @@ -221,8 +229,10 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu break } for it.NextResult() == true { - if ses.doHalt { + select { + case <-ses.kill: return + default: } tags := make(map[string]graph.Value) it.TagResults(tags) @@ -238,16 +248,17 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu } func runIteratorOnSession(it graph.Iterator, ses *Session) { - if ses.lookingForQueryShape { - iterator.OutputQueryShapeForIterator(it, ses.ts, ses.queryShape) + if ses.wantShape { + iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape) return } it, _ = it.Optimize() glog.V(2).Infoln(it.DebugString(0)) for { - // TODO(barakmich): Better halting. 
- if ses.doHalt { + select { + case <-ses.kill: return + default: } _, ok := graph.Next(it) if !ok { @@ -255,18 +266,18 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) { } tags := make(map[string]graph.Value) it.TagResults(tags) - cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags}) - if !cont { + if !ses.SendResult(&Result{actualResults: &tags}) { break } for it.NextResult() == true { - if ses.doHalt { + select { + case <-ses.kill: return + default: } tags := make(map[string]graph.Value) it.TagResults(tags) - cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags}) - if !cont { + if !ses.SendResult(&Result{actualResults: &tags}) { break } } diff --git a/query/gremlin/gremlin_test.go b/query/gremlin/gremlin_test.go index 453ff81..dbd115a 100644 --- a/query/gremlin/gremlin_test.go +++ b/query/gremlin/gremlin_test.go @@ -20,8 +20,9 @@ import ( "testing" "github.com/google/cayley/graph" - _ "github.com/google/cayley/graph/memstore" "github.com/google/cayley/quad" + + _ "github.com/google/cayley/graph/memstore" ) // This is a simple test graph. 
@@ -252,7 +253,7 @@ func runQueryGetTag(g []*quad.Quad, query string, tag string) []string { var results []string for res := range c { - data := res.(*GremlinResult) + data := res.(*Result) if data.val == nil { val := (*data.actualResults)[tag] if val != nil { diff --git a/query/gremlin/session.go b/query/gremlin/session.go index 00532b8..8000c17 100644 --- a/query/gremlin/session.go +++ b/query/gremlin/session.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" "github.com/robertkrimen/otto" @@ -26,45 +27,42 @@ import ( "github.com/google/cayley/query" ) +var ErrKillTimeout = errors.New("query timed out") + type Session struct { - ts graph.TripleStore - currentChannel chan interface{} - env *otto.Otto - debug bool - limit int - count int - dataOutput []interface{} - lookingForQueryShape bool - queryShape map[string]interface{} - err error - script *otto.Script - doHalt bool - timeoutSec time.Duration - emptyEnv *otto.Otto + ts graph.TripleStore + results chan interface{} + env *otto.Otto + envLock sync.Mutex + debug bool + limit int + count int + dataOutput []interface{} + wantShape bool + shape map[string]interface{} + err error + script *otto.Script + kill chan struct{} + timeout time.Duration + emptyEnv *otto.Otto } -func NewSession(inputTripleStore graph.TripleStore, timeoutSec int, persist bool) *Session { - var g Session - g.ts = inputTripleStore +func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session { + g := Session{ + ts: ts, + limit: -1, + timeout: timeout, + } g.env = BuildEnviron(&g) - g.limit = -1 - g.count = 0 - g.lookingForQueryShape = false if persist { g.emptyEnv = g.env } - if timeoutSec < 0 { - g.timeoutSec = time.Duration(-1) - } else { - g.timeoutSec = time.Duration(timeoutSec) - } - g.ClearJson() return &g } -type GremlinResult struct { +type Result struct { metaresult bool - err string + err error val *otto.Value actualResults *map[string]graph.Value } @@ -73,13 +71,13 @@ func (s *Session) 
ToggleDebug() { s.debug = !s.debug } -func (s *Session) GetQuery(input string, output_struct chan map[string]interface{}) { - defer close(output_struct) - s.queryShape = make(map[string]interface{}) - s.lookingForQueryShape = true +func (s *Session) GetQuery(input string, out chan map[string]interface{}) { + defer close(out) + s.shape = make(map[string]interface{}) + s.wantShape = true s.env.Run(input) - output_struct <- s.queryShape - s.queryShape = nil + out <- s.shape + s.shape = nil } func (s *Session) InputParses(input string) (query.ParseResult, error) { @@ -91,15 +89,17 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) { return query.Parsed, nil } -func (s *Session) SendResult(result *GremlinResult) bool { +func (s *Session) SendResult(r *Result) bool { if s.limit >= 0 && s.limit == s.count { return false } - if s.doHalt { + select { + case <-s.kill: return false + default: } - if s.currentChannel != nil { - s.currentChannel <- result + if s.results != nil { + s.results <- r s.count++ if s.limit >= 0 && s.limit == s.count { return false @@ -110,42 +110,60 @@ func (s *Session) SendResult(result *GremlinResult) bool { return false } -var halt = errors.New("Query Timeout") - func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { - s.doHalt = false + // Keep local handles: s.kill is replaced by the next run, and the + // watchdog below must only close this run's channel. + kill := make(chan struct{}) + s.kill = kill + // done is closed when this run returns, standing the watchdog down. + done := make(chan struct{}) + defer close(done) defer func() { - if caught := recover(); caught != nil { - if caught == halt { - s.err = halt + if r := recover(); r != nil { + if r == ErrKillTimeout { + s.err = ErrKillTimeout return } - panic(caught) // Something else happened, repanic! + panic(r) } }() - s.env.Interrupt = make(chan func(), 1) // The buffer prevents blocking + // Use buffered chan to prevent blocking. 
+ s.env.Interrupt = make(chan func(), 1) - if s.timeoutSec != -1 { + if s.timeout >= 0 { go func() { - time.Sleep(s.timeoutSec * time.Second) // Stop after two seconds - s.doHalt = true + // Wait for the timeout, but give up as soon as the run ends so a + // stale watchdog cannot kill or interrupt a later query. + timer := time.NewTimer(s.timeout) + defer timer.Stop() + select { + case <-done: + return + case <-timer.C: + } + close(kill) + s.envLock.Lock() + defer s.envLock.Unlock() if s.env != nil { s.env.Interrupt <- func() { - panic(halt) + panic(ErrKillTimeout) } s.env = s.emptyEnv } }() } - return s.env.Run(input) // Here be dragons (risky code) + s.envLock.Lock() + env := s.env + s.envLock.Unlock() + return env.Run(input) } func (s *Session) ExecInput(input string, out chan interface{}, limit int) { defer close(out) s.err = nil - s.currentChannel = out + s.results = out var err error var value otto.Value if s.script == nil { @@ -153,28 +171,23 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) { } else { value, err = s.runUnsafe(s.script) } - if err != nil { - out <- &GremlinResult{metaresult: true, - err: err.Error(), - val: &value, - actualResults: nil} - } else { - out <- &GremlinResult{metaresult: true, - err: "", - val: &value, - actualResults: nil} + out <- &Result{ + metaresult: true, + err: err, + val: &value, } - s.currentChannel = nil + s.results = nil s.script = nil + s.envLock.Lock() s.env = s.emptyEnv - return + s.envLock.Unlock() } func (s *Session) ToText(result interface{}) string { - data := result.(*GremlinResult) + data := result.(*Result) if data.metaresult { - if data.err != "" { - return fmt.Sprintln("Error: ", data.err) + if data.err != nil { + return fmt.Sprintf("Error: %v\n", data.err) } if data.val != nil { s, _ := data.val.Export() @@ -221,8 +234,8 @@ func (s *Session) ToText(result interface{}) string { } // Web stuff -func (ses *Session) BuildJson(result interface{}) { - data := result.(*GremlinResult) +func (s *Session) BuildJson(result interface{}) { + data := result.(*Result) if !data.metaresult { if data.val == nil { obj := make(map[string]string) @@ -235,33 +248,34 @@ func (ses *Session) BuildJson(result 
interface{}) { } sort.Strings(tagKeys) for _, k := range tagKeys { - obj[k] = ses.ts.NameOf((*tags)[k]) + obj[k] = s.ts.NameOf((*tags)[k]) } - ses.dataOutput = append(ses.dataOutput, obj) + s.dataOutput = append(s.dataOutput, obj) } else { if data.val.IsObject() { export, _ := data.val.Export() - ses.dataOutput = append(ses.dataOutput, export) + s.dataOutput = append(s.dataOutput, export) } else { strVersion, _ := data.val.ToString() - ses.dataOutput = append(ses.dataOutput, strVersion) + s.dataOutput = append(s.dataOutput, strVersion) } } } - } -func (ses *Session) GetJson() ([]interface{}, error) { - defer ses.ClearJson() - if ses.err != nil { - return nil, ses.err +func (s *Session) GetJson() ([]interface{}, error) { + defer s.ClearJson() + if s.err != nil { + return nil, s.err } - if ses.doHalt { - return nil, halt + select { + case <-s.kill: + return nil, ErrKillTimeout + default: + return s.dataOutput, nil } - return ses.dataOutput, nil } -func (ses *Session) ClearJson() { - ses.dataOutput = nil +func (s *Session) ClearJson() { + s.dataOutput = nil }