Added functionality so quadstore is generated per request (if needed) for the new appengine backend \n CR : nobody \n Tests run: unit tests

This commit is contained in:
= 2014-11-27 15:32:46 +01:00 committed by panamafrancis
parent 2c74cb1657
commit 35ccfe7677
8 changed files with 130 additions and 51 deletions

View file

@ -24,29 +24,31 @@ import (
// Config defines the behavior of cayley database instances. // Config defines the behavior of cayley database instances.
type Config struct { type Config struct {
DatabaseType string DatabaseType string
DatabasePath string DatabasePath string
DatabaseOptions map[string]interface{} DatabaseOptions map[string]interface{}
ReplicationType string ReplicationType string
ReplicationOptions map[string]interface{} ReplicationOptions map[string]interface{}
ListenHost string ListenHost string
ListenPort string ListenPort string
ReadOnly bool ReadOnly bool
Timeout time.Duration Timeout time.Duration
LoadSize int LoadSize int
RequiresHTTPRequestContext bool
} }
type config struct { type config struct {
DatabaseType string `json:"database"` DatabaseType string `json:"database"`
DatabasePath string `json:"db_path"` DatabasePath string `json:"db_path"`
DatabaseOptions map[string]interface{} `json:"db_options"` DatabaseOptions map[string]interface{} `json:"db_options"`
ReplicationType string `json:"replication"` ReplicationType string `json:"replication"`
ReplicationOptions map[string]interface{} `json:"replication_options"` ReplicationOptions map[string]interface{} `json:"replication_options"`
ListenHost string `json:"listen_host"` ListenHost string `json:"listen_host"`
ListenPort string `json:"listen_port"` ListenPort string `json:"listen_port"`
ReadOnly bool `json:"read_only"` ReadOnly bool `json:"read_only"`
Timeout duration `json:"timeout"` Timeout duration `json:"timeout"`
LoadSize int `json:"load_size"` LoadSize int `json:"load_size"`
RequiresHTTPRequestContext bool `json:"http_request_context"`
} }
func (c *Config) UnmarshalJSON(data []byte) error { func (c *Config) UnmarshalJSON(data []byte) error {
@ -56,16 +58,17 @@ func (c *Config) UnmarshalJSON(data []byte) error {
return err return err
} }
*c = Config{ *c = Config{
DatabaseType: t.DatabaseType, DatabaseType: t.DatabaseType,
DatabasePath: t.DatabasePath, DatabasePath: t.DatabasePath,
DatabaseOptions: t.DatabaseOptions, DatabaseOptions: t.DatabaseOptions,
ReplicationType: t.ReplicationType, ReplicationType: t.ReplicationType,
ReplicationOptions: t.ReplicationOptions, ReplicationOptions: t.ReplicationOptions,
ListenHost: t.ListenHost, ListenHost: t.ListenHost,
ListenPort: t.ListenPort, ListenPort: t.ListenPort,
ReadOnly: t.ReadOnly, ReadOnly: t.ReadOnly,
Timeout: time.Duration(t.Timeout), Timeout: time.Duration(t.Timeout),
LoadSize: t.LoadSize, LoadSize: t.LoadSize,
RequiresHTTPRequestContext: t.RequiresHTTPRequestContext,
} }
return nil return nil
} }

View file

@ -33,7 +33,7 @@ import (
) )
func init() { func init() {
graph.RegisterQuadStore("bolt", true, newQuadStore, createNewBolt) graph.RegisterQuadStore("bolt", true, newQuadStore, createNewBolt, nil)
} }
var ( var (
@ -44,6 +44,10 @@ var (
localFillPercent = 0.7 localFillPercent = 0.7
) )
const (
QuadStoreType = "bolt"
)
type Token struct { type Token struct {
bucket []byte bucket []byte
key []byte key []byte
@ -515,3 +519,7 @@ func compareTokens(a, b graph.Value) bool {
func (qs *QuadStore) FixedIterator() graph.FixedIterator { func (qs *QuadStore) FixedIterator() graph.FixedIterator {
return iterator.NewFixed(compareTokens) return iterator.NewFixed(compareTokens)
} }
func (qs *QuadStore) GetType() string {
return QuadStoreType
}

View file

@ -35,12 +35,13 @@ import (
) )
func init() { func init() {
graph.RegisterQuadStore("leveldb", true, newQuadStore, createNewLevelDB) graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createNewLevelDB, nil)
} }
const ( const (
DefaultCacheSize = 2 DefaultCacheSize = 2
DefaultWriteBufferSize = 20 DefaultWriteBufferSize = 20
QuadStoreType = "leveldb"
) )
var ( var (
@ -495,3 +496,7 @@ func compareBytes(a, b graph.Value) bool {
func (qs *QuadStore) FixedIterator() graph.FixedIterator { func (qs *QuadStore) FixedIterator() graph.FixedIterator {
return iterator.NewFixed(compareBytes) return iterator.NewFixed(compareBytes)
} }
func (qs *QuadStore) GetType() string {
return QuadStoreType
}

View file

@ -27,10 +27,12 @@ import (
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
const QuadStoreType = "memstore"
func init() { func init() {
graph.RegisterQuadStore("memstore", false, func(string, graph.Options) (graph.QuadStore, error) { graph.RegisterQuadStore(QuadStoreType, false, func(string, graph.Options) (graph.QuadStore, error) {
return newQuadStore(), nil return newQuadStore(), nil
}, nil) }, nil, nil)
} }
type QuadDirectionIndex struct { type QuadDirectionIndex struct {
@ -270,3 +272,7 @@ func (qs *QuadStore) NodesAllIterator() graph.Iterator {
} }
func (qs *QuadStore) Close() {} func (qs *QuadStore) Close() {}
func (qs *QuadStore) GetType() string {
return QuadStoreType
}

View file

@ -30,11 +30,12 @@ import (
"github.com/google/cayley/quad" "github.com/google/cayley/quad"
) )
func init() {
graph.RegisterQuadStore("mongo", true, newQuadStore, createNewMongoGraph)
}
const DefaultDBName = "cayley" const DefaultDBName = "cayley"
const QuadStoreType = "mongo"
func init() {
graph.RegisterQuadStore(QuadStoreType, true, newQuadStore, createNewMongoGraph, nil)
}
var ( var (
hashPool = sync.Pool{ hashPool = sync.Pool{
@ -366,3 +367,7 @@ func (qs *QuadStore) QuadDirection(in graph.Value, d quad.Direction) graph.Value
} }
// TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it. // TODO(barakmich): Rewrite bulk loader. For now, iterating around blocks is the way we'll go about it.
func (qs *QuadStore) GetType() string {
return QuadStoreType
}

View file

@ -95,6 +95,10 @@ type QuadStore interface {
// qs.ValueOf(qs.Quad(id).Get(dir)) // qs.ValueOf(qs.Quad(id).Get(dir))
// //
QuadDirection(id Value, d quad.Direction) Value QuadDirection(id Value, d quad.Direction) Value
// Get the type of QuadStore
//TODO replace this using reflection
GetType() string
} }
type Options map[string]interface{} type Options map[string]interface{}
@ -146,23 +150,26 @@ type BulkLoader interface {
type NewStoreFunc func(string, Options) (QuadStore, error) type NewStoreFunc func(string, Options) (QuadStore, error)
type InitStoreFunc func(string, Options) error type InitStoreFunc func(string, Options) error
type NewStoreForRequestFunc func(QuadStore, Options) (QuadStore, error)
type register struct { type register struct {
newFunc NewStoreFunc newFunc NewStoreFunc
initFunc InitStoreFunc newForRequestFunc NewStoreForRequestFunc
isPersistent bool initFunc InitStoreFunc
isPersistent bool
} }
var storeRegistry = make(map[string]register) var storeRegistry = make(map[string]register)
func RegisterQuadStore(name string, persists bool, newFunc NewStoreFunc, initFunc InitStoreFunc) { func RegisterQuadStore(name string, persists bool, newFunc NewStoreFunc, initFunc InitStoreFunc, newForRequestFunc NewStoreForRequestFunc) {
if _, found := storeRegistry[name]; found { if _, found := storeRegistry[name]; found {
panic("already registered QuadStore " + name) panic("already registered QuadStore " + name)
} }
storeRegistry[name] = register{ storeRegistry[name] = register{
newFunc: newFunc, newFunc: newFunc,
initFunc: initFunc, initFunc: initFunc,
isPersistent: persists, newForRequestFunc: newForRequestFunc,
isPersistent: persists,
} }
} }
@ -182,6 +189,14 @@ func InitQuadStore(name, dbpath string, opts Options) error {
return errors.New("quadstore: name '" + name + "' is not registered") return errors.New("quadstore: name '" + name + "' is not registered")
} }
func NewQuadStoreForRequest(qs QuadStore, opts Options) (QuadStore, error) {
r, registered := storeRegistry[qs.GetType()]
if registered {
return r.newForRequestFunc(qs, opts)
}
return nil, errors.New("QuadStore does not support Per Request construction, check config")
}
func IsPersistent(name string) bool { func IsPersistent(name string) bool {
return storeRegistry[name].isPersistent return storeRegistry[name].isPersistent
} }

View file

@ -26,6 +26,7 @@ import (
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/google/cayley/config" "github.com/google/cayley/config"
"github.com/google/cayley/db"
"github.com/google/cayley/graph" "github.com/google/cayley/graph"
) )
@ -55,6 +56,10 @@ func findAssetsPath() string {
return "." return "."
} }
if hasAssets("..") {
return ".."
}
gopathPath := os.ExpandEnv("$GOPATH/src/github.com/google/cayley") gopathPath := os.ExpandEnv("$GOPATH/src/github.com/google/cayley")
if hasAssets(gopathPath) { if hasAssets(gopathPath) {
return gopathPath return gopathPath
@ -105,6 +110,25 @@ type API struct {
handle *graph.Handle handle *graph.Handle
} }
func (api *API) GetHandleForRequest(r *http.Request) (*graph.Handle, error) {
if !api.config.RequiresHTTPRequestContext {
return api.handle, nil
}
opts := make(graph.Options)
opts["HTTPRequest"] = r
qs, err := graph.NewQuadStoreForRequest(api.handle.QuadStore, opts)
if err != nil {
return nil, err
}
qw, err := db.OpenQuadWriter(qs, api.config)
if err != nil {
return nil, err
}
return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil
}
func (api *API) APIv1(r *httprouter.Router) { func (api *API) APIv1(r *httprouter.Router) {
r.POST("/api/v1/query/:query_lang", LogRequest(api.ServeV1Query)) r.POST("/api/v1/query/:query_lang", LogRequest(api.ServeV1Query))
r.POST("/api/v1/shape/:query_lang", LogRequest(api.ServeV1Shape)) r.POST("/api/v1/shape/:query_lang", LogRequest(api.ServeV1Shape))

View file

@ -55,7 +55,12 @@ func (api *API) ServeV1Write(w http.ResponseWriter, r *http.Request, _ httproute
if err != nil { if err != nil {
return jsonResponse(w, 400, err) return jsonResponse(w, 400, err)
} }
api.handle.QuadWriter.AddQuadSet(quads) h, err := api.GetHandleForRequest(r)
if err != nil {
return jsonResponse(w, 400, err)
}
h.QuadWriter.AddQuadSet(quads)
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d quads.\"}", len(quads)) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d quads.\"}", len(quads))
return 200 return 200
} }
@ -81,9 +86,13 @@ func (api *API) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params
// TODO(kortschak) Make this configurable from the web UI. // TODO(kortschak) Make this configurable from the web UI.
dec := cquads.NewDecoder(formFile) dec := cquads.NewDecoder(formFile)
var ( h, err := api.GetHandleForRequest(r)
n int if err != nil {
return jsonResponse(w, 400, err)
}
var (
n int
block = make([]quad.Quad, 0, blockSize) block = make([]quad.Quad, 0, blockSize)
) )
for { for {
@ -97,11 +106,11 @@ func (api *API) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params
block = append(block, t) block = append(block, t)
n++ n++
if len(block) == cap(block) { if len(block) == cap(block) {
api.handle.QuadWriter.AddQuadSet(block) h.QuadWriter.AddQuadSet(block)
block = block[:0] block = block[:0]
} }
} }
api.handle.QuadWriter.AddQuadSet(block) h.QuadWriter.AddQuadSet(block)
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d quads.\"}", n) fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d quads.\"}", n)
@ -120,9 +129,13 @@ func (api *API) ServeV1Delete(w http.ResponseWriter, r *http.Request, params htt
if err != nil { if err != nil {
return jsonResponse(w, 400, err) return jsonResponse(w, 400, err)
} }
h, err := api.GetHandleForRequest(r)
if err != nil {
return jsonResponse(w, 400, err)
}
count := 0 count := 0
for _, q := range quads { for _, q := range quads {
api.handle.QuadWriter.RemoveQuad(q) h.QuadWriter.RemoveQuad(q)
count++ count++
} }
fmt.Fprintf(w, "{\"result\": \"Successfully deleted %d quads.\"}", count) fmt.Fprintf(w, "{\"result\": \"Successfully deleted %d quads.\"}", count)