Merge pull request #96 from kortschak/gremlin

Gremlin related changes
This commit is contained in:
Barak Michener 2014-08-04 00:36:10 -04:00
commit 9944f0b574
10 changed files with 219 additions and 123 deletions

View file

@ -3,5 +3,5 @@
"db_path": "30kmoviedata.nq.gz", "db_path": "30kmoviedata.nq.gz",
"read_only": true, "read_only": true,
"load_size": 10000, "load_size": 10000,
"gremlin_timeout": 10 "timeout": 10
} }

View file

@ -17,6 +17,7 @@ package main
import ( import (
"sync" "sync"
"testing" "testing"
"time"
"github.com/google/cayley/config" "github.com/google/cayley/config"
"github.com/google/cayley/db" "github.com/google/cayley/db"
@ -292,9 +293,9 @@ var m2_actors = movie2.Save("name","movie2").Follow(filmToActor)
var ( var (
once sync.Once once sync.Once
cfg = &config.Config{ cfg = &config.Config{
DatabasePath: "30kmoviedata.nq.gz", DatabasePath: "30kmoviedata.nq.gz",
DatabaseType: "memstore", DatabaseType: "memstore",
GremlinTimeout: 300, Timeout: 300 * time.Second,
} }
ts graph.TripleStore ts graph.TripleStore
@ -316,7 +317,7 @@ func TestQueries(t *testing.T) {
if testing.Short() && test.long { if testing.Short() && test.long {
continue continue
} }
ses := gremlin.NewSession(ts, cfg.GremlinTimeout, true) ses := gremlin.NewSession(ts, cfg.Timeout, true)
_, err := ses.InputParses(test.query) _, err := ses.InputParses(test.query)
if err != nil { if err != nil {
t.Fatalf("Failed to parse benchmark gremlin %s: %v", test.message, err) t.Fatalf("Failed to parse benchmark gremlin %s: %v", test.message, err)
@ -333,7 +334,7 @@ func TestQueries(t *testing.T) {
if j == nil && err == nil { if j == nil && err == nil {
continue continue
} }
if err != nil && err.Error() == "Query Timeout" { if err == gremlin.ErrKillTimeout {
timedOut = true timedOut = true
continue continue
} }
@ -357,7 +358,7 @@ func runBench(n int, b *testing.B) {
b.Skip() b.Skip()
} }
prepare(b) prepare(b)
ses := gremlin.NewSession(ts, cfg.GremlinTimeout, true) ses := gremlin.NewSession(ts, cfg.Timeout, true)
_, err := ses.InputParses(benchmarkQueries[n].query) _, err := ses.InputParses(benchmarkQueries[n].query)
if err != nil { if err != nil {
b.Fatalf("Failed to parse benchmark gremlin %s: %v", benchmarkQueries[n].message, err) b.Fatalf("Failed to parse benchmark gremlin %s: %v", benchmarkQueries[n].message, err)

View file

@ -17,29 +17,112 @@ package config
import ( import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt"
"os" "os"
"strconv"
"time"
"github.com/barakmich/glog" "github.com/barakmich/glog"
) )
type Config struct { type Config struct {
DatabaseType string
DatabasePath string
DatabaseOptions map[string]interface{}
ListenHost string
ListenPort string
ReadOnly bool
Timeout time.Duration
LoadSize int
}
type config struct {
DatabaseType string `json:"database"` DatabaseType string `json:"database"`
DatabasePath string `json:"db_path"` DatabasePath string `json:"db_path"`
DatabaseOptions map[string]interface{} `json:"db_options"` DatabaseOptions map[string]interface{} `json:"db_options"`
ListenHost string `json:"listen_host"` ListenHost string `json:"listen_host"`
ListenPort string `json:"listen_port"` ListenPort string `json:"listen_port"`
ReadOnly bool `json:"read_only"` ReadOnly bool `json:"read_only"`
GremlinTimeout int `json:"gremlin_timeout"` Timeout duration `json:"timeout"`
LoadSize int `json:"load_size"` LoadSize int `json:"load_size"`
} }
var databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.") func (c *Config) UnmarshalJSON(data []byte) error {
var databaseBackend = flag.String("db", "memstore", "Database Backend.") var t config
var host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).") err := json.Unmarshal(data, &t)
var loadSize = flag.Int("load_size", 10000, "Size of triplesets to load") if err != nil {
var port = flag.String("port", "64210", "Port to listen on.") return err
var readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.") }
var gremlinTimeout = flag.Int("gremlin_timeout", 30, "Number of seconds until an individual query times out.") *c = Config{
DatabaseType: t.DatabaseType,
DatabasePath: t.DatabasePath,
DatabaseOptions: t.DatabaseOptions,
ListenHost: t.ListenHost,
ListenPort: t.ListenPort,
ReadOnly: t.ReadOnly,
Timeout: time.Duration(t.Timeout),
LoadSize: t.LoadSize,
}
return nil
}
// MarshalJSON encodes c as JSON. The fields are copied one by one into the
// JSON-tagged config mirror type, and that value is what gets marshalled.
func (c *Config) MarshalJSON() ([]byte, error) {
	var out config
	out.DatabaseType = c.DatabaseType
	out.DatabasePath = c.DatabasePath
	out.DatabaseOptions = c.DatabaseOptions
	out.ListenHost = c.ListenHost
	out.ListenPort = c.ListenPort
	out.ReadOnly = c.ReadOnly
	// Timeout is stored as the package-local duration type in the mirror.
	out.Timeout = duration(c.Timeout)
	out.LoadSize = c.LoadSize
	return json.Marshal(out)
}
// duration is a time.Duration that satisfies the
// json.Unmarshaler and json.Marshaler interfaces.
type duration time.Duration

// UnmarshalJSON unmarshals a duration according to the following scheme:
//  * If the element is absent (empty input) the duration is zero.
//  * If the element is the JSON literal null the duration is left unchanged.
//  * If the element is a JSON string, it is unquoted and its contents are
//    interpreted by the rules below.
//  * If the element is parsable as a time.Duration, the parsed value is kept.
//  * If the element is parsable as a number, that number of seconds is kept,
//    including any fractional part.
// Input that cannot be parsed, or that does not fit in a time.Duration, is
// reported as an error and leaves the receiver untouched.
func (d *duration) UnmarshalJSON(data []byte) error {
	text := string(data)
	switch {
	case len(text) == 0:
		*d = 0
		return nil
	case text == "null":
		// By encoding/json convention null is a no-op for Unmarshalers.
		return nil
	case text[0] == '"':
		// JSON strings arrive with their surrounding quotes; strip them so
		// that values such as "30s" can reach time.ParseDuration.
		s, err := strconv.Unquote(text)
		if err != nil {
			return fmt.Errorf("config: invalid duration %s: %v", text, err)
		}
		text = s
	}

	if t, err := time.ParseDuration(text); err == nil {
		*d = duration(t)
		return nil
	}

	// Largest whole number of seconds representable as a time.Duration.
	const maxSeconds = int64(1<<63-1) / int64(time.Second)
	if i, err := strconv.ParseInt(text, 10, 64); err == nil {
		if i > maxSeconds || i < -maxSeconds {
			return fmt.Errorf("config: duration %q out of range", text)
		}
		*d = duration(time.Duration(i) * time.Second)
		return nil
	}

	// strconv.ParseInt does not handle e-notation (for example 1e1), so fall
	// back to ParseFloat. Scaling before the conversion keeps fractions of a
	// second instead of truncating them.
	f, err := strconv.ParseFloat(text, 64)
	if err != nil {
		return fmt.Errorf("config: invalid duration %q: %v", text, err)
	}
	ns := f * float64(time.Second)
	// ns != ns is true only for NaN; the bounds reject infinities and any
	// value that would overflow the int64 nanosecond count.
	if ns != ns || ns >= 1<<63 || ns < -(1<<63) {
		return fmt.Errorf("config: duration %q out of range", text)
	}
	*d = duration(ns)
	return nil
}

// MarshalJSON encodes the duration as a JSON string in time.Duration
// notation (for example "30s"), which UnmarshalJSON reads back unchanged.
// The receiver is a value so that the method is also used when a duration
// is marshalled as a field of a non-addressable struct.
func (d duration) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf("%q", time.Duration(d).String())), nil
}
// Command line flags for the configuration settings. Each variable is the
// pointer returned by the flag package for the named flag.
var (
// databasePath: -dbpath, defaults to "/tmp/testdb".
databasePath = flag.String("dbpath", "/tmp/testdb", "Path to the database.")
// databaseBackend: -db, defaults to "memstore".
databaseBackend = flag.String("db", "memstore", "Database Backend.")
// host: -host, defaults to "0.0.0.0".
host = flag.String("host", "0.0.0.0", "Host to listen on (defaults to all).")
// loadSize: -load_size, defaults to 10000.
loadSize = flag.Int("load_size", 10000, "Size of triplesets to load")
// port: -port, kept as a string, defaults to "64210".
port = flag.String("port", "64210", "Port to listen on.")
// readOnly: -read_only, defaults to false.
readOnly = flag.Bool("read_only", false, "Disable writing via HTTP.")
// timeout: -timeout, a flag.Duration (e.g. "45s"), defaults to 30 seconds.
timeout = flag.Duration("timeout", 30*time.Second, "Elapsed time until an individual query times out.")
)
func ParseConfigFromFile(filename string) *Config { func ParseConfigFromFile(filename string) *Config {
config := &Config{} config := &Config{}
@ -100,8 +183,8 @@ func ParseConfigFromFlagsAndFile(fileFlag string) *Config {
config.ListenPort = *port config.ListenPort = *port
} }
if config.GremlinTimeout == 0 { if config.Timeout == 0 {
config.GremlinTimeout = *gremlinTimeout config.Timeout = *timeout
} }
if config.LoadSize == 0 { if config.LoadSize == 0 {

View file

@ -72,7 +72,7 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error
case "gremlin": case "gremlin":
fallthrough fallthrough
default: default:
ses = gremlin.NewSession(ts, cfg.GremlinTimeout, true) ses = gremlin.NewSession(ts, cfg.Timeout, true)
} }
buf := bufio.NewReader(os.Stdin) buf := bufio.NewReader(os.Stdin)
var line []byte var line []byte

View file

@ -72,12 +72,12 @@ All command line flags take precedence over the configuration file.
## Language Options ## Language Options
#### **`gremlin_timeout`** #### **`timeout`**
* Type: Integer * Type: Integer or String
* Default: 30 * Default: 30
The value in seconds of the maximum length of time the Javascript runtime should run until cancelling the query and returning a 408 Timeout. A negative value means no limit. The maximum length of time the Javascript runtime should run until cancelling the query and returning a 408 Timeout. When timeout is an integer it is interpreted as seconds; when it is a string it is [parsed](http://golang.org/pkg/time/#ParseDuration) as a Go time.Duration. A negative duration means no limit.
## Per-Database Options ## Per-Database Options

View file

@ -71,7 +71,7 @@ func (api *Api) ServeV1Query(w http.ResponseWriter, r *http.Request, params http
var ses query.HttpSession var ses query.HttpSession
switch params.ByName("query_lang") { switch params.ByName("query_lang") {
case "gremlin": case "gremlin":
ses = gremlin.NewSession(api.ts, api.config.GremlinTimeout, false) ses = gremlin.NewSession(api.ts, api.config.Timeout, false)
case "mql": case "mql":
ses = mql.NewSession(api.ts) ses = mql.NewSession(api.ts)
default: default:
@ -119,7 +119,7 @@ func (api *Api) ServeV1Shape(w http.ResponseWriter, r *http.Request, params http
var ses query.HttpSession var ses query.HttpSession
switch params.ByName("query_lang") { switch params.ByName("query_lang") {
case "gremlin": case "gremlin":
ses = gremlin.NewSession(api.ts, api.config.GremlinTimeout, false) ses = gremlin.NewSession(api.ts, api.config.Timeout, false)
case "mql": case "mql":
ses = mql.NewSession(api.ts) ses = mql.NewSession(api.ts)
default: default:

View file

@ -84,7 +84,7 @@ func setupGremlin(env *otto.Otto, ses *Session) {
graph.Set("Emit", func(call otto.FunctionCall) otto.Value { graph.Set("Emit", func(call otto.FunctionCall) otto.Value {
value := call.Argument(0) value := call.Argument(0)
if value.IsDefined() { if value.IsDefined() {
ses.SendResult(&GremlinResult{metaresult: false, err: "", val: &value, actualResults: nil}) ses.SendResult(&Result{val: &value})
} }
return otto.NullValue() return otto.NullValue()
}) })

View file

@ -148,8 +148,10 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
count := 0 count := 0
it, _ = it.Optimize() it, _ = it.Optimize()
for { for {
if ses.doHalt { select {
case <-ses.kill:
return nil return nil
default:
} }
_, ok := graph.Next(it) _, ok := graph.Next(it)
if !ok { if !ok {
@ -163,8 +165,10 @@ func runIteratorToArray(it graph.Iterator, ses *Session, limit int) []map[string
break break
} }
for it.NextResult() == true { for it.NextResult() == true {
if ses.doHalt { select {
case <-ses.kill:
return nil return nil
default:
} }
tags := make(map[string]graph.Value) tags := make(map[string]graph.Value)
it.TagResults(tags) it.TagResults(tags)
@ -184,8 +188,10 @@ func runIteratorToArrayNoTags(it graph.Iterator, ses *Session, limit int) []stri
count := 0 count := 0
it, _ = it.Optimize() it, _ = it.Optimize()
for { for {
if ses.doHalt { select {
case <-ses.kill:
return nil return nil
default:
} }
val, ok := graph.Next(it) val, ok := graph.Next(it)
if !ok { if !ok {
@ -205,8 +211,10 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
count := 0 count := 0
it, _ = it.Optimize() it, _ = it.Optimize()
for { for {
if ses.doHalt { select {
case <-ses.kill:
return return
default:
} }
_, ok := graph.Next(it) _, ok := graph.Next(it)
if !ok { if !ok {
@ -221,8 +229,10 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
break break
} }
for it.NextResult() == true { for it.NextResult() == true {
if ses.doHalt { select {
case <-ses.kill:
return return
default:
} }
tags := make(map[string]graph.Value) tags := make(map[string]graph.Value)
it.TagResults(tags) it.TagResults(tags)
@ -238,16 +248,17 @@ func runIteratorWithCallback(it graph.Iterator, ses *Session, callback otto.Valu
} }
func runIteratorOnSession(it graph.Iterator, ses *Session) { func runIteratorOnSession(it graph.Iterator, ses *Session) {
if ses.lookingForQueryShape { if ses.wantShape {
iterator.OutputQueryShapeForIterator(it, ses.ts, ses.queryShape) iterator.OutputQueryShapeForIterator(it, ses.ts, ses.shape)
return return
} }
it, _ = it.Optimize() it, _ = it.Optimize()
glog.V(2).Infoln(it.DebugString(0)) glog.V(2).Infoln(it.DebugString(0))
for { for {
// TODO(barakmich): Better halting. select {
if ses.doHalt { case <-ses.kill:
return return
default:
} }
_, ok := graph.Next(it) _, ok := graph.Next(it)
if !ok { if !ok {
@ -255,18 +266,18 @@ func runIteratorOnSession(it graph.Iterator, ses *Session) {
} }
tags := make(map[string]graph.Value) tags := make(map[string]graph.Value)
it.TagResults(tags) it.TagResults(tags)
cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags}) if !ses.SendResult(&Result{actualResults: &tags}) {
if !cont {
break break
} }
for it.NextResult() == true { for it.NextResult() == true {
if ses.doHalt { select {
case <-ses.kill:
return return
default:
} }
tags := make(map[string]graph.Value) tags := make(map[string]graph.Value)
it.TagResults(tags) it.TagResults(tags)
cont := ses.SendResult(&GremlinResult{metaresult: false, err: "", val: nil, actualResults: &tags}) if !ses.SendResult(&Result{actualResults: &tags}) {
if !cont {
break break
} }
} }

View file

@ -20,8 +20,9 @@ import (
"testing" "testing"
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
_ "github.com/google/cayley/graph/memstore"
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
_ "github.com/google/cayley/graph/memstore"
) )
// This is a simple test graph. // This is a simple test graph.
@ -252,7 +253,7 @@ func runQueryGetTag(g []*quad.Quad, query string, tag string) []string {
var results []string var results []string
for res := range c { for res := range c {
data := res.(*GremlinResult) data := res.(*Result)
if data.val == nil { if data.val == nil {
val := (*data.actualResults)[tag] val := (*data.actualResults)[tag]
if val != nil { if val != nil {

View file

@ -18,6 +18,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"sort" "sort"
"sync"
"time" "time"
"github.com/robertkrimen/otto" "github.com/robertkrimen/otto"
@ -26,45 +27,42 @@ import (
"github.com/google/cayley/query" "github.com/google/cayley/query"
) )
var ErrKillTimeout = errors.New("query timed out")
type Session struct { type Session struct {
ts graph.TripleStore ts graph.TripleStore
currentChannel chan interface{} results chan interface{}
env *otto.Otto env *otto.Otto
debug bool envLock sync.Mutex
limit int debug bool
count int limit int
dataOutput []interface{} count int
lookingForQueryShape bool dataOutput []interface{}
queryShape map[string]interface{} wantShape bool
err error shape map[string]interface{}
script *otto.Script err error
doHalt bool script *otto.Script
timeoutSec time.Duration kill chan struct{}
emptyEnv *otto.Otto timeout time.Duration
emptyEnv *otto.Otto
} }
func NewSession(inputTripleStore graph.TripleStore, timeoutSec int, persist bool) *Session { func NewSession(ts graph.TripleStore, timeout time.Duration, persist bool) *Session {
var g Session g := Session{
g.ts = inputTripleStore ts: ts,
limit: -1,
timeout: timeout,
}
g.env = BuildEnviron(&g) g.env = BuildEnviron(&g)
g.limit = -1
g.count = 0
g.lookingForQueryShape = false
if persist { if persist {
g.emptyEnv = g.env g.emptyEnv = g.env
} }
if timeoutSec < 0 {
g.timeoutSec = time.Duration(-1)
} else {
g.timeoutSec = time.Duration(timeoutSec)
}
g.ClearJson()
return &g return &g
} }
type GremlinResult struct { type Result struct {
metaresult bool metaresult bool
err string err error
val *otto.Value val *otto.Value
actualResults *map[string]graph.Value actualResults *map[string]graph.Value
} }
@ -73,13 +71,13 @@ func (s *Session) ToggleDebug() {
s.debug = !s.debug s.debug = !s.debug
} }
func (s *Session) GetQuery(input string, output_struct chan map[string]interface{}) { func (s *Session) GetQuery(input string, out chan map[string]interface{}) {
defer close(output_struct) defer close(out)
s.queryShape = make(map[string]interface{}) s.shape = make(map[string]interface{})
s.lookingForQueryShape = true s.wantShape = true
s.env.Run(input) s.env.Run(input)
output_struct <- s.queryShape out <- s.shape
s.queryShape = nil s.shape = nil
} }
func (s *Session) InputParses(input string) (query.ParseResult, error) { func (s *Session) InputParses(input string) (query.ParseResult, error) {
@ -91,15 +89,17 @@ func (s *Session) InputParses(input string) (query.ParseResult, error) {
return query.Parsed, nil return query.Parsed, nil
} }
func (s *Session) SendResult(result *GremlinResult) bool { func (s *Session) SendResult(r *Result) bool {
if s.limit >= 0 && s.limit == s.count { if s.limit >= 0 && s.limit == s.count {
return false return false
} }
if s.doHalt { select {
case <-s.kill:
return false return false
default:
} }
if s.currentChannel != nil { if s.results != nil {
s.currentChannel <- result s.results <- r
s.count++ s.count++
if s.limit >= 0 && s.limit == s.count { if s.limit >= 0 && s.limit == s.count {
return false return false
@ -110,42 +110,46 @@ func (s *Session) SendResult(result *GremlinResult) bool {
return false return false
} }
var halt = errors.New("Query Timeout")
func (s *Session) runUnsafe(input interface{}) (otto.Value, error) { func (s *Session) runUnsafe(input interface{}) (otto.Value, error) {
s.doHalt = false s.kill = make(chan struct{})
defer func() { defer func() {
if caught := recover(); caught != nil { if r := recover(); r != nil {
if caught == halt { if r == ErrKillTimeout {
s.err = halt s.err = ErrKillTimeout
return return
} }
panic(caught) // Something else happened, repanic! panic(r)
} }
}() }()
s.env.Interrupt = make(chan func(), 1) // The buffer prevents blocking // Use buffered chan to prevent blocking.
s.env.Interrupt = make(chan func(), 1)
if s.timeoutSec != -1 { if s.timeout >= 0 {
go func() { go func() {
time.Sleep(s.timeoutSec * time.Second) // Stop after two seconds time.Sleep(s.timeout)
s.doHalt = true close(s.kill)
s.envLock.Lock()
defer s.envLock.Unlock()
if s.env != nil { if s.env != nil {
s.env.Interrupt <- func() { s.env.Interrupt <- func() {
panic(halt) panic(ErrKillTimeout)
} }
s.env = s.emptyEnv s.env = s.emptyEnv
} }
}() }()
} }
return s.env.Run(input) // Here be dragons (risky code) s.envLock.Lock()
env := s.env
s.envLock.Unlock()
return env.Run(input)
} }
func (s *Session) ExecInput(input string, out chan interface{}, limit int) { func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
defer close(out) defer close(out)
s.err = nil s.err = nil
s.currentChannel = out s.results = out
var err error var err error
var value otto.Value var value otto.Value
if s.script == nil { if s.script == nil {
@ -153,28 +157,23 @@ func (s *Session) ExecInput(input string, out chan interface{}, limit int) {
} else { } else {
value, err = s.runUnsafe(s.script) value, err = s.runUnsafe(s.script)
} }
if err != nil { out <- &Result{
out <- &GremlinResult{metaresult: true, metaresult: true,
err: err.Error(), err: err,
val: &value, val: &value,
actualResults: nil}
} else {
out <- &GremlinResult{metaresult: true,
err: "",
val: &value,
actualResults: nil}
} }
s.currentChannel = nil s.results = nil
s.script = nil s.script = nil
s.envLock.Lock()
s.env = s.emptyEnv s.env = s.emptyEnv
return s.envLock.Unlock()
} }
func (s *Session) ToText(result interface{}) string { func (s *Session) ToText(result interface{}) string {
data := result.(*GremlinResult) data := result.(*Result)
if data.metaresult { if data.metaresult {
if data.err != "" { if data.err != nil {
return fmt.Sprintln("Error: ", data.err) return fmt.Sprintf("Error: %v\n", data.err)
} }
if data.val != nil { if data.val != nil {
s, _ := data.val.Export() s, _ := data.val.Export()
@ -221,8 +220,8 @@ func (s *Session) ToText(result interface{}) string {
} }
// Web stuff // Web stuff
func (ses *Session) BuildJson(result interface{}) { func (s *Session) BuildJson(result interface{}) {
data := result.(*GremlinResult) data := result.(*Result)
if !data.metaresult { if !data.metaresult {
if data.val == nil { if data.val == nil {
obj := make(map[string]string) obj := make(map[string]string)
@ -235,33 +234,34 @@ func (ses *Session) BuildJson(result interface{}) {
} }
sort.Strings(tagKeys) sort.Strings(tagKeys)
for _, k := range tagKeys { for _, k := range tagKeys {
obj[k] = ses.ts.NameOf((*tags)[k]) obj[k] = s.ts.NameOf((*tags)[k])
} }
ses.dataOutput = append(ses.dataOutput, obj) s.dataOutput = append(s.dataOutput, obj)
} else { } else {
if data.val.IsObject() { if data.val.IsObject() {
export, _ := data.val.Export() export, _ := data.val.Export()
ses.dataOutput = append(ses.dataOutput, export) s.dataOutput = append(s.dataOutput, export)
} else { } else {
strVersion, _ := data.val.ToString() strVersion, _ := data.val.ToString()
ses.dataOutput = append(ses.dataOutput, strVersion) s.dataOutput = append(s.dataOutput, strVersion)
} }
} }
} }
} }
func (ses *Session) GetJson() ([]interface{}, error) { func (s *Session) GetJson() ([]interface{}, error) {
defer ses.ClearJson() defer s.ClearJson()
if ses.err != nil { if s.err != nil {
return nil, ses.err return nil, s.err
} }
if ses.doHalt { select {
return nil, halt case <-s.kill:
return nil, ErrKillTimeout
default:
return s.dataOutput, nil
} }
return ses.dataOutput, nil
} }
func (ses *Session) ClearJson() { func (s *Session) ClearJson() {
ses.dataOutput = nil s.dataOutput = nil
} }