Respect IgnoreMissing, which SQL does silently. Fixes barakmich/psql #10
This commit is contained in:
parent
fc6f7b3ea7
commit
fb7e200551
2 changed files with 20 additions and 7 deletions
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/sha1"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"sync"
|
||||
|
|
@ -168,7 +169,7 @@ func (qs *QuadStore) copyFrom(tx *sql.Tx, in []graph.Delta) error {
|
|||
return stmt.Close()
|
||||
}
|
||||
|
||||
func (qs *QuadStore) buildTxPostgres(tx *sql.Tx, in []graph.Delta) error {
|
||||
func (qs *QuadStore) runTxPostgres(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
|
||||
allAdds := true
|
||||
for _, d := range in {
|
||||
if d.Action != graph.Add {
|
||||
|
|
@ -180,6 +181,7 @@ func (qs *QuadStore) buildTxPostgres(tx *sql.Tx, in []graph.Delta) error {
|
|||
}
|
||||
|
||||
insert, err := tx.Prepare(`INSERT INTO quads(subject, predicate, object, label, id, ts, subject_hash, predicate_hash, object_hash, label_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`)
|
||||
defer insert.Close()
|
||||
if err != nil {
|
||||
glog.Errorf("Cannot prepare insert statement: %v", err)
|
||||
return err
|
||||
|
|
@ -204,10 +206,17 @@ func (qs *QuadStore) buildTxPostgres(tx *sql.Tx, in []graph.Delta) error {
|
|||
return err
|
||||
}
|
||||
case graph.Delete:
|
||||
_, err := tx.Exec(`DELETE FROM quads WHERE subject=$1 and predicate=$2 and object=$3 and label=$4;`,
|
||||
result, err := tx.Exec(`DELETE FROM quads WHERE subject=$1 and predicate=$2 and object=$3 and label=$4;`,
|
||||
d.Quad.Subject, d.Quad.Predicate, d.Quad.Object, d.Quad.Label)
|
||||
if err != nil {
|
||||
glog.Errorf("couldn't prepare DELETE statement: %v", err)
|
||||
glog.Errorf("couldn't exec DELETE statement: %v", err)
|
||||
}
|
||||
affected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
glog.Errorf("couldn't get DELETE RowsAffected: %v", err)
|
||||
}
|
||||
if affected != 1 && !opts.IgnoreMissing {
|
||||
return errors.New("deleting non-existent triple; rolling back")
|
||||
}
|
||||
default:
|
||||
panic("unknown action")
|
||||
|
|
@ -216,8 +225,8 @@ func (qs *QuadStore) buildTxPostgres(tx *sql.Tx, in []graph.Delta) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, _ graph.IgnoreOpts) error {
|
||||
// TODO(barakmich): Support ignoreOpts? "ON CONFLICT IGNORE"
|
||||
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, opts graph.IgnoreOpts) error {
|
||||
// TODO(barakmich): Support more ignoreOpts? "ON CONFLICT IGNORE"
|
||||
tx, err := qs.db.Begin()
|
||||
if err != nil {
|
||||
glog.Errorf("couldn't begin write transaction: %v", err)
|
||||
|
|
@ -225,8 +234,9 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, _ graph.IgnoreOpts) error {
|
|||
}
|
||||
switch qs.sqlFlavor {
|
||||
case "postgres":
|
||||
err = qs.buildTxPostgres(tx, in)
|
||||
err = qs.runTxPostgres(tx, in, opts)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -161,7 +161,10 @@ func Repl(h *graph.Handle, queryLanguage string, cfg *config.Config) error {
|
|||
fmt.Printf("Error: not a valid quad: %v\n", err)
|
||||
continue
|
||||
}
|
||||
h.QuadWriter.RemoveQuad(quad)
|
||||
err = h.QuadWriter.RemoveQuad(quad)
|
||||
if err != nil {
|
||||
fmt.Printf("error deleting: %v\n", err)
|
||||
}
|
||||
continue
|
||||
|
||||
case "exit":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue