query: wire up query's delete to syncbase.
Previous CLs imlemented delete in the query package. This change
wires it up to syncbase. The complicating factor is that, in
the non-batch case, we don't know if we should create a
snapshot (readonly) or a transaction (read/write) when
query/engine.Exec is called. It is only when the GetTable
function is called (on the database interface) that syncbase
knows whether it should create a snapshot or a transaction.
MultiPart: 1/2
Change-Id: Idd5637806da3ec547ed59dbe7e3a546c582efb2a
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 9307094..3ae6878 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -251,22 +251,43 @@
}
func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
+ // RunInTransaction() cannot be used here because we may or may not be creating a
+ // transaction. qe.Exec must be called and the statement must be parsed before
+ // we know if a snapshot or a transaction should be created. To duplicate the
+ // semantics of RunInTransaction, we attempt the Exec up to 100 times and retry
+ // on ErrConcurrentTransaction.
+ maxAttempts := 100
+ attempt := 0
+ for {
+ err := d.execInternal(ctx, call, schemaVersion, q)
+ if attempt >= maxAttempts || verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
+ return err
+ }
+ attempt++
+ }
+}
+
+func (d *databaseReq) execInternal(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
if !d.exists {
return verror.New(verror.ErrNoExist, ctx, d.name)
}
if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- impl := func(sntx store.SnapshotOrTransaction) error {
+ impl := func() error {
db := &queryDb{
ctx: ctx,
call: call,
req: d,
- sntx: sntx,
+ sntx: nil, // Filled in later with existing or created sn/tx.
+ tx: nil, // Only filled in if new tx created.
}
headers, rs, err := engine.Create(db).Exec(q)
if err != nil {
- return err
+ return execCommitOrAbort(db, err)
+ }
+ if rs.Err() != nil {
+ return execCommitOrAbort(db, err)
}
sender := call.SendStream()
// Push the headers first -- the client will retrieve them and return
@@ -280,17 +301,30 @@
result := rs.Result()
if err := sender.Send(result); err != nil {
rs.Cancel()
- return err
+ return execCommitOrAbort(db, err)
}
}
- return rs.Err()
+ return execCommitOrAbort(db, rs.Err())
}
- if d.batchId != nil {
- return impl(d.batchReader())
- } else {
- sntx := d.st.NewSnapshot()
- defer sntx.Abort()
- return impl(sntx)
+ return impl()
+}
+
+func execCommitOrAbort(db *queryDb, err error) error {
+ if db.req.batchId != nil {
+ return err // part of an enclosing sn/tx
+ }
+ if err != nil {
+ if db.sntx != nil {
+ db.sntx.Abort()
+ }
+ return err
+ } else { // err is nil
+ if db.tx != nil {
+ return db.tx.Commit()
+ } else if db.sntx != nil {
+ return db.sntx.Abort()
+ }
+ return nil
}
}
@@ -444,6 +478,7 @@
call wire.DatabaseExecServerCall
req *databaseReq
sntx store.SnapshotOrTransaction
+ tx store.Transaction // If transaction, this will be same as sntx (else nil)
}
func (db *queryDb) GetContext() *context.T {
@@ -451,9 +486,11 @@
}
func (db *queryDb) GetTable(name string, writeAccessReq bool) (ds.Table, error) {
- if writeAccessReq {
- return nil, syncql.NewErrNotWritable(db.GetContext(), name)
- }
+ // At this point, when the query package calls GetTable with the writeAccessReq
+ // arg, we know whether or not we need a [writable] transaction or a snapshot.
+ // If batchId is already set, there's nothing to do; but if not, the
+ // writeAccessReq arg dictates whether a snapshot or a transaction is should
+ // be created.
tDb := &tableDb{
qdb: db,
req: &tableReq{
@@ -461,6 +498,32 @@
d: db.req,
},
}
+ if tDb.req.d.batchId != nil {
+ if writeAccessReq {
+ // We are in a batch (could be snapshot or transaction)
+ // and Write access is required. Attempt to get a
+ // transaction from the request.
+ var err error
+ tDb.qdb.tx, err = tDb.qdb.req.batchTransaction()
+ if err != nil {
+ // We are in a snapshot batch, write access cannot be provided.
+ // Return NotWritable.
+ return nil, syncql.NewErrNotWritable(tDb.qdb.GetContext(), tDb.req.name)
+ }
+ tDb.qdb.sntx = tDb.qdb.tx
+ } else {
+ tDb.qdb.sntx = tDb.qdb.req.batchReader()
+ }
+ } else {
+ // Now that we know if write access is required, create a snapshot
+ // or transaction.
+ if !writeAccessReq {
+ tDb.qdb.sntx = tDb.qdb.req.st.NewSnapshot()
+ } else { // writeAccessReq
+ tDb.qdb.tx = tDb.qdb.req.st.NewTransaction()
+ tDb.qdb.sntx = tDb.qdb.tx
+ }
+ }
// Now that we have a table, we need to check permissions.
if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &TableData{}); err != nil {
return nil, err
@@ -481,7 +544,15 @@
}
func (t *tableDb) Delete(k string) (bool, error) {
- return false, syncql.NewErrOperationNotSupported(t.qdb.ctx, "delete")
+ // Create a rowReq and call delete. Permissions will be checked.
+ rowReq := &rowReq{
+ key: k,
+ t: t.req,
+ }
+ if err := rowReq.delete(t.qdb.GetContext(), t.qdb.call, t.qdb.tx); err != nil {
+ return false, err
+ }
+ return true, nil
}
func (t *tableDb) Scan(indexRanges ...ds.IndexRanges) (ds.KeyValueStream, error) {