Use error returns and interface type for parsing

Fixes issue #72

This change simplifies interactions with parsing N-Quads and makes
reading datasets more robust. Changes made while here also improve
performance:

benchmark           old ns/op     new ns/op     delta
BenchmarkParser     1058          667           -36.96%

We still use string concatenation which I'm not wildly happy about, but
I think this can be left for a later change.

Initial changes towards idiomatic error handling have been made. More
significant changes are needed, but these have subtle design implications
and need to be thought about more.

30kmoviesdata.nt.gz has been altered to properly escape double quotes.
This was done mechanically and with manual curation to pick up
stragglers.
This commit is contained in:
kortschak 2014-07-22 19:55:18 +09:30
parent abdd649c82
commit 0e0e382d2b
11 changed files with 260 additions and 226 deletions

Binary file not shown.

View file

@ -15,14 +15,12 @@
package cayleyappengine
import (
"os"
"github.com/barakmich/glog"
"github.com/google/cayley/config"
"github.com/google/cayley/db"
"github.com/google/cayley/graph"
"github.com/google/cayley/http"
"github.com/google/cayley/nquads"
_ "github.com/google/cayley/graph/memstore"
)
@ -32,37 +30,6 @@ func init() {
cfg := config.ParseConfigFromFile("cayley_appengine.cfg")
ts, _ := graph.NewTripleStore("memstore", "", nil)
glog.Errorln(cfg)
LoadTriplesFromFileInto(ts, cfg.DatabasePath, cfg.LoadSize)
db.Load(ts, cfg, cfg.DatabasePath)
http.SetupRoutes(ts, cfg)
}
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
f, err := os.Open(tripleFile)
if err != nil {
glog.Fatalln("Couldn't open file", tripleFile)
}
defer func() {
if err := f.Close(); err != nil {
glog.Fatalln(err)
}
}()
nquads.ReadNQuadsFromReader(c, f)
}
func LoadTriplesFromFileInto(ts graph.TripleStore, filename string, loadSize int) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, filename)
tripleblock := make([]*graph.Triple, loadSize)
i := 0
for t := range tChan {
tripleblock[i] = t
i++
if i == loadSize {
ts.AddTripleSet(tripleblock)
i = 0
}
}
ts.AddTripleSet(tripleblock[0:i])
}

View file

@ -58,37 +58,62 @@ func main() {
Usage()
os.Exit(1)
}
cmd := os.Args[1]
newargs := make([]string, 0)
newargs = append(newargs, os.Args[0])
newargs = append(newargs, os.Args[2:]...)
os.Args = newargs
flag.Parse()
var ts graph.TripleStore
cfg := config.ParseConfigFromFlagsAndFile(*configFile)
if os.Getenv("GOMAXPROCS") == "" {
runtime.GOMAXPROCS(runtime.NumCPU())
glog.Infoln("Setting GOMAXPROCS to", runtime.NumCPU())
} else {
glog.Infoln("GOMAXPROCS currently", os.Getenv("GOMAXPROCS"), " -- not adjusting")
}
var (
ts graph.TripleStore
err error
)
switch cmd {
case "init":
db.Init(cfg, *tripleFile)
err = db.Init(cfg, *tripleFile)
case "load":
ts = db.Open(cfg)
db.Load(ts, cfg, *tripleFile)
ts, err = db.Open(cfg)
if err != nil {
break
}
err = db.Load(ts, cfg, *tripleFile)
if err != nil {
break
}
ts.Close()
case "repl":
ts = db.Open(cfg)
db.Repl(ts, *queryLanguage, cfg)
ts, err = db.Open(cfg)
if err != nil {
break
}
err = db.Repl(ts, *queryLanguage, cfg)
if err != nil {
break
}
ts.Close()
case "http":
ts = db.Open(cfg)
ts, err = db.Open(cfg)
if err != nil {
break
}
http.Serve(ts, cfg)
ts.Close()
default:
fmt.Println("No command", cmd)
flag.Usage()
}
if err != nil {
glog.Fatalln(err)
}
}

View file

@ -19,16 +19,21 @@ import (
"github.com/google/cayley/graph"
)
func Init(cfg *config.Config, triplePath string) bool {
func Init(cfg *config.Config, triplePath string) error {
err := graph.InitTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
return false
return err
}
if triplePath != "" {
ts := Open(cfg)
Load(ts, cfg, triplePath)
ts, err := Open(cfg)
if err != nil {
return err
}
err = Load(ts, cfg, triplePath)
if err != nil {
return err
}
ts.Close()
}
return true
return err
}

View file

@ -15,58 +15,54 @@
package db
import (
"fmt"
"io"
"os"
"github.com/barakmich/glog"
"github.com/google/cayley/config"
"github.com/google/cayley/graph"
"github.com/google/cayley/nquads"
)
func Load(ts graph.TripleStore, cfg *config.Config, triplePath string) {
tChan := make(chan *graph.Triple)
go ReadTriplesFromFile(tChan, triplePath)
func Load(ts graph.TripleStore, cfg *config.Config, path string) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("could not open file %q: %v", path, err)
}
defer f.Close()
dec := nquads.NewDecoder(f)
bulker, canBulk := ts.(graph.BulkLoader)
if canBulk {
err := bulker.BulkLoad(tChan)
err = bulker.BulkLoad(dec)
if err == nil {
return
return nil
}
if err != graph.ErrCannotBulkLoad {
glog.Errorln("Error attempting to bulk load: ", err)
if err == graph.ErrCannotBulkLoad {
err = nil
}
}
LoadTriplesInto(tChan, ts, cfg.LoadSize)
}
func ReadTriplesFromFile(c chan *graph.Triple, tripleFile string) {
f, err := os.Open(tripleFile)
if err != nil {
glog.Fatalln("Couldn't open file", tripleFile)
return err
}
defer func() {
if err := f.Close(); err != nil {
glog.Fatalln(err)
block := make([]*graph.Triple, 0, cfg.LoadSize)
for {
t, err := dec.Unmarshal()
if err != nil {
if err == io.EOF {
break
}
}()
return err
}
block = append(block, t)
if len(block) == cap(block) {
ts.AddTripleSet(block)
block = block[:0]
}
}
ts.AddTripleSet(block)
nquads.ReadNQuadsFromReader(c, f)
}
func LoadTriplesInto(tChan chan *graph.Triple, ts graph.TripleStore, loadSize int) {
tripleblock := make([]*graph.Triple, loadSize)
i := 0
for t := range tChan {
tripleblock[i] = t
i++
if i == loadSize {
ts.AddTripleSet(tripleblock)
i = 0
}
}
ts.AddTripleSet(tripleblock[0:i])
return nil
}

View file

@ -21,17 +21,20 @@ import (
"github.com/google/cayley/graph"
)
func Open(cfg *config.Config) graph.TripleStore {
func Open(cfg *config.Config) (graph.TripleStore, error) {
glog.Infof("Opening database \"%s\" at %s", cfg.DatabaseType, cfg.DatabasePath)
ts, err := graph.NewTripleStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
if err != nil {
glog.Fatalln(err.Error())
return nil, err
}
// Memstore is not persistent, so it MUST be loaded.
if cfg.DatabaseType == "memstore" {
Load(ts, cfg, cfg.DatabasePath)
err = Load(ts, cfg, cfg.DatabasePath)
if err != nil {
return nil, err
}
}
return ts
return ts, nil
}

View file

@ -16,10 +16,11 @@ package db
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/google/cayley/config"
@ -60,7 +61,7 @@ func Run(query string, ses graph.Session) {
}
}
func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) {
func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) error {
var ses graph.Session
switch queryLanguage {
case "sexp":
@ -72,72 +73,75 @@ func Repl(ts graph.TripleStore, queryLanguage string, cfg *config.Config) {
default:
ses = gremlin.NewSession(ts, cfg.GremlinTimeout, true)
}
inputBf := bufio.NewReader(os.Stdin)
line := ""
buf := bufio.NewReader(os.Stdin)
var line []byte
for {
if line == "" {
if len(line) == 0 {
fmt.Print("cayley> ")
} else {
fmt.Print("... ")
}
l, pre, err := inputBf.ReadLine()
l, prefix, err := buf.ReadLine()
if err == io.EOF {
if line != "" {
line = ""
if len(line) != 0 {
line = line[:0]
} else {
break
return nil
}
}
if err != nil {
line = ""
line = line[:0]
}
if pre {
panic("Line too long")
if prefix {
return errors.New("line too long")
}
line += string(l)
if line == "" {
line = append(line, l...)
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, ":debug") {
if bytes.HasPrefix(line, []byte(":debug")) {
ses.ToggleDebug()
fmt.Println("Debug Toggled")
line = ""
line = line[:0]
continue
}
if strings.HasPrefix(line, ":a") {
if bytes.HasPrefix(line, []byte(":a")) {
var tripleStmt = line[3:]
triple := nquads.Parse(tripleStmt)
triple, err := nquads.Parse(string(tripleStmt))
if triple == nil {
fmt.Println("Not a valid triple.")
line = ""
if err != nil {
fmt.Printf("not a valid triple: %v\n", err)
}
line = line[:0]
continue
}
ts.AddTriple(triple)
line = ""
line = line[:0]
continue
}
if strings.HasPrefix(line, ":d") {
if bytes.HasPrefix(line, []byte(":d")) {
var tripleStmt = line[3:]
triple := nquads.Parse(tripleStmt)
triple, err := nquads.Parse(string(tripleStmt))
if triple == nil {
fmt.Println("Not a valid triple.")
line = ""
if err != nil {
fmt.Printf("not a valid triple: %v\n", err)
}
line = line[:0]
continue
}
ts.RemoveTriple(triple)
line = ""
line = line[:0]
continue
}
result, err := ses.InputParses(line)
result, err := ses.InputParses(string(line))
switch result {
case graph.Parsed:
Run(line, ses)
line = ""
Run(string(line), ses)
line = line[:0]
case graph.ParseFail:
fmt.Println("Error: ", err)
line = ""
line = line[:0]
case graph.ParseMore:
default:
}
}
}

View file

@ -122,10 +122,14 @@ func (d Options) StringKey(key string) (string, bool) {
var ErrCannotBulkLoad = errors.New("triplestore: cannot bulk load")
type BulkLoader interface {
// BulkLoad loads Triples from a channel in bulk to the TripleStore. It
// returns ErrCannotBulkLoad if bulk loading is not possible (i.e. if you
// cannot load in bulk to a non-empty database, and the db is non-empty)
BulkLoad(chan *Triple) error
// BulkLoad loads Triples from a TripleUnmarshaler in bulk to the TripleStore.
// It returns ErrCannotBulkLoad if bulk loading is not possible. For example if
// you cannot load in bulk to a non-empty database, and the db is non-empty.
BulkLoad(TripleUnmarshaler) error
}
// TripleUnmarshaler is a source of Triples that are handed out one at a
// time. It is the argument type accepted by BulkLoader.BulkLoad.
type TripleUnmarshaler interface {
// Unmarshal returns the next Triple, or a non-nil error. Consumers such
// as db.Load stop reading without failing when the error is io.EOF and
// treat any other error as a failure.
Unmarshal() (*Triple, error)
}
type NewStoreFunc func(string, Options) (TripleStore, error)

View file

@ -17,6 +17,7 @@ package http
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
@ -77,22 +78,32 @@ func (api *Api) ServeV1WriteNQuad(w http.ResponseWriter, r *http.Request, params
blockSize = int64(api.config.LoadSize)
}
tChan := make(chan *graph.Triple)
go nquads.ReadNQuadsFromReader(tChan, formFile)
tripleblock := make([]*graph.Triple, blockSize)
nTriples := 0
i := int64(0)
for t := range tChan {
tripleblock[i] = t
i++
nTriples++
if i == blockSize {
api.ts.AddTripleSet(tripleblock)
i = 0
dec := nquads.NewDecoder(formFile)
var (
n int
block = make([]*graph.Triple, 0, blockSize)
)
for {
t, err := dec.Unmarshal()
if err != nil {
if err == io.EOF {
break
}
panic("what can do this here?") // FIXME(kortschak)
}
block = append(block, t)
n++
if len(block) == cap(block) {
api.ts.AddTripleSet(block)
block = block[:0]
}
}
api.ts.AddTripleSet(tripleblock[0:i])
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", nTriples)
api.ts.AddTripleSet(block)
fmt.Fprintf(w, "{\"result\": \"Successfully wrote %d triples.\"}", n)
return 200
}

View file

@ -16,109 +16,112 @@ package nquads
import (
"bufio"
"errors"
"fmt"
"io"
"strings"
"github.com/barakmich/glog"
"github.com/google/cayley/graph"
)
func isWhitespace(s uint8) bool {
return (s == '\t' || s == '\r' || s == ' ')
}
func Parse(str string) *graph.Triple {
// Errors returned by Parse when a line cannot be read as a quad.
var (
// ErrAbsentSubject is returned when the subject component is empty.
ErrAbsentSubject = errors.New("absent subject")
// ErrAbsentPredicate is returned when the predicate component is empty.
ErrAbsentPredicate = errors.New("absent predicate")
// ErrAbsentObject is returned when the object component is empty.
ErrAbsentObject = errors.New("absent object")
// ErrUnterminated is returned when the text following the components
// does not begin with the terminating '.'.
ErrUnterminated = errors.New("unterminated quad")
)
func Parse(str string) (*graph.Triple, error) {
// Skip leading whitespace.
str = skipWhitespace(str)
str = trimSpace(str)
// Check for a comment
if str != "" && str[0] == '#' {
return nil
return nil, nil
}
sub, remainder := getTripleComponent(str)
if sub == nil {
return nil
if sub == "" {
return nil, ErrAbsentSubject
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
pred, remainder := getTripleComponent(str)
if pred == nil {
return nil
if pred == "" {
return nil, ErrAbsentPredicate
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
obj, remainder := getTripleComponent(str)
if obj == nil {
return nil
if obj == "" {
return nil, ErrAbsentObject
}
str = skipWhitespace(remainder)
prov_ptr, remainder := getTripleComponent(str)
var prov string
if prov_ptr == nil {
prov = ""
} else {
prov = *prov_ptr
}
str = skipWhitespace(remainder)
str = trimSpace(remainder)
prov, remainder := getTripleComponent(str)
str = trimSpace(remainder)
if str != "" && str[0] == '.' {
return &graph.Triple{*sub, *pred, *obj, prov}
return &graph.Triple{sub, pred, obj, prov}, nil
}
return nil
return nil, ErrUnterminated
}
func skipWhitespace(str string) string {
// isSpace reports whether s is one of the whitespace bytes skipped between
// components of a line: a space, a horizontal tab or a carriage return.
// A newline is not treated as space.
func isSpace(s uint8) bool {
	switch s {
	case ' ', '\t', '\r':
		return true
	}
	return false
}
func trimSpace(str string) string {
i := 0
for i < len(str) && isWhitespace(str[i]) {
for i < len(str) && isSpace(str[i]) {
i += 1
}
return str[i:]
}
func getTripleComponent(str string) (*string, string) {
func getTripleComponent(str string) (head, tail string) {
if len(str) == 0 {
return nil, str
return "", str
}
if str[0] == '<' {
return getUriPart(str[1:])
} else if str[0] == '"' {
return getQuotedPart(str[1:])
} else if str[0] == '.' {
return nil, str
return "", str
} else {
// Technically not part of the spec. But we do it anyway for convenience.
return getUnquotedPart(str)
}
}
func getUriPart(str string) (*string, string) {
func getUriPart(str string) (head, tail string) {
i := 0
for i < len(str) && str[i] != '>' {
i += 1
}
if i == len(str) {
return nil, str
return "", str
}
part := str[0:i]
return &part, str[i+1:]
head = str[0:i]
return head, str[i+1:]
}
func getQuotedPart(str string) (*string, string) {
i := 0
start := 0
out := ""
func getQuotedPart(str string) (head, tail string) {
var (
i int
start int
)
for i < len(str) && str[i] != '"' {
if str[i] == '\\' {
out += str[start:i]
head += str[start:i]
switch str[i+1] {
case '\\':
out += "\\"
head += "\\"
case 'r':
out += "\r"
head += "\r"
case 'n':
out += "\n"
head += "\n"
case 't':
out += "\t"
head += "\t"
case '"':
out += "\""
head += "\""
default:
return nil, str
return "", str
}
i += 2
start = i
@ -127,70 +130,74 @@ func getQuotedPart(str string) (*string, string) {
i += 1
}
if i == len(str) {
return nil, str
return "", str
}
out += str[start:i]
head += str[start:i]
i += 1
var remainder string
if strings.HasPrefix(str[i:], "^^<") {
switch {
case strings.HasPrefix(str[i:], "^^<"):
// Ignore type, for now
_, remainder = getUriPart(str[i+3:])
} else if strings.HasPrefix(str[i:], "@") {
_, remainder = getUnquotedPart(str[i+1:])
} else {
remainder = str[i:]
_, tail = getUriPart(str[i+3:])
case str[i] == '@':
_, tail = getUnquotedPart(str[i+1:])
default:
tail = str[i:]
}
return &out, remainder
return head, tail
}
func getUnquotedPart(str string) (*string, string) {
i := 0
initStr := str
out := ""
start := 0
for i < len(str) && !isWhitespace(str[i]) {
func getUnquotedPart(str string) (head, tail string) {
var (
i int
initStr = str
start int
)
for i < len(str) && !isSpace(str[i]) {
if str[i] == '"' {
part, remainder := getQuotedPart(str[i+1:])
if part == nil {
if part == "" {
return part, initStr
}
out += str[start:i]
head += str[start:i]
str = remainder
i = 0
start = 0
out += *part
head += part
}
i += 1
}
out += str[start:i]
return &out, str[i:]
head += str[start:i]
return head, str[i:]
}
func ReadNQuadsFromReader(c chan *graph.Triple, reader io.Reader) {
bf := bufio.NewReader(reader)
// Decoder reads lines from an underlying reader and parses each one into
// a Triple. Construct one with NewDecoder.
type Decoder struct {
// r is the buffered reader that lines are read from.
r *bufio.Reader
// line accumulates the bytes of the current line. It is truncated at
// the start of every Unmarshal call so its storage is reused.
line []byte
}
nTriples := 0
line := ""
// NewDecoder returns a Decoder that takes its input from r, wrapping r in
// a buffered reader.
func NewDecoder(r io.Reader) *Decoder {
	dec := new(Decoder)
	dec.r = bufio.NewReader(r)
	return dec
}
func (dec *Decoder) Unmarshal() (*graph.Triple, error) {
dec.line = dec.line[:0]
for {
l, pre, err := bf.ReadLine()
if err == io.EOF {
l, pre, err := dec.r.ReadLine()
if err != nil {
return nil, err
}
dec.line = append(dec.line, l...)
if !pre {
break
}
}
triple, err := Parse(string(dec.line))
if err != nil {
glog.Fatalln("Something bad happened while reading file " + err.Error())
return nil, fmt.Errorf("failed to parse %q: %v", dec.line, err)
}
line += string(l)
if pre {
continue
if triple == nil {
return dec.Unmarshal()
}
triple := Parse(line)
line = ""
if triple != nil {
nTriples++
c <- triple
}
}
glog.Infoln("Read", nTriples, "triples")
close(c)
return triple, nil
}

View file

@ -25,17 +25,26 @@ var testNTriples = []struct {
message string
input string
expect *graph.Triple
err error
}{
// NTriple tests.
{
message: "not parse invalid triples",
input: "invalid",
expect: nil,
err: ErrAbsentPredicate,
},
{
message: "invalid internal quote",
input: `":103032" "/film/performance/character" "Walter "Teacher" Cole" .`,
expect: nil,
err: ErrUnterminated,
},
{
message: "not parse comments",
input: "# nominally valid triple .",
expect: nil,
err: nil,
},
{
message: "parse simple triples",
@ -110,7 +119,10 @@ var testNTriples = []struct {
func TestParse(t *testing.T) {
for _, test := range testNTriples {
got := Parse(test.input)
got, err := Parse(test.input)
if err != test.err {
t.Errorf("Unexpected error when %s: got:%v expect:%v", test.message, err, test.err)
}
if !reflect.DeepEqual(got, test.expect) {
t.Errorf("Failed to %s, %q, got:%q expect:%q", test.message, test.input, got, test.expect)
}
@ -121,6 +133,6 @@ var result *graph.Triple
func BenchmarkParser(b *testing.B) {
for n := 0; n < b.N; n++ {
result = Parse("<http://example/s> <http://example/p> \"object of some real\\tlength\"@en . # comment")
result, _ = Parse("<http://example/s> <http://example/p> \"object of some real\\tlength\"@en . # comment")
}
}