query: wire up query's delete to syncbase.

Previous CLs imlemented delete in the query package.  This change
wires it up to syncbase.  The complicating factor is that, in
the non-batch case, we don't know if we should create a
snapshot (readonly) or a transaction (read/write) when
query/engine.Exec is called.  It is only when the GetTable
function is called (on the database interface) that syncbase
knows whether it should create a snapshot or a transaction.

MultiPart: 1/2
Change-Id: Idd5637806da3ec547ed59dbe7e3a546c582efb2a
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 9307094..3ae6878 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -251,22 +251,43 @@
 }
 
 func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
+	// RunInTransaction() cannot be used here because we may or may not be creating a
+	// transaction.  qe.Exec must be called and the statement must be parsed before
+	// we know if a snapshot or a transaction should be created.  To duplicate the
+	// semantics of RunInTransaction, we attempt the Exec up to 100 times and retry
+	// on ErrConcurrentTransaction.
+	maxAttempts := 100
+	attempt := 0
+	for {
+		err := d.execInternal(ctx, call, schemaVersion, q)
+		if attempt >= maxAttempts || verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
+			return err
+		}
+		attempt++
+	}
+}
+
+func (d *databaseReq) execInternal(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.name)
 	}
 	if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 		return err
 	}
-	impl := func(sntx store.SnapshotOrTransaction) error {
+	impl := func() error {
 		db := &queryDb{
 			ctx:  ctx,
 			call: call,
 			req:  d,
-			sntx: sntx,
+			sntx: nil, // Filled in later with existing or created sn/tx.
+			tx:   nil, // Only filled in if new tx created.
 		}
 		headers, rs, err := engine.Create(db).Exec(q)
 		if err != nil {
-			return err
+			return execCommitOrAbort(db, err)
+		}
+		if rs.Err() != nil {
+			return execCommitOrAbort(db, err)
 		}
 		sender := call.SendStream()
 		// Push the headers first -- the client will retrieve them and return
@@ -280,17 +301,30 @@
 			result := rs.Result()
 			if err := sender.Send(result); err != nil {
 				rs.Cancel()
-				return err
+				return execCommitOrAbort(db, err)
 			}
 		}
-		return rs.Err()
+		return execCommitOrAbort(db, rs.Err())
 	}
-	if d.batchId != nil {
-		return impl(d.batchReader())
-	} else {
-		sntx := d.st.NewSnapshot()
-		defer sntx.Abort()
-		return impl(sntx)
+	return impl()
+}
+
+func execCommitOrAbort(db *queryDb, err error) error {
+	if db.req.batchId != nil {
+		return err // part of an enclosing sn/tx
+	}
+	if err != nil {
+		if db.sntx != nil {
+			db.sntx.Abort()
+		}
+		return err
+	} else { // err is nil
+		if db.tx != nil {
+			return db.tx.Commit()
+		} else if db.sntx != nil {
+			return db.sntx.Abort()
+		}
+		return nil
 	}
 }
 
@@ -444,6 +478,7 @@
 	call wire.DatabaseExecServerCall
 	req  *databaseReq
 	sntx store.SnapshotOrTransaction
+	tx   store.Transaction // If transaction, this will be same as sntx (else nil)
 }
 
 func (db *queryDb) GetContext() *context.T {
@@ -451,9 +486,11 @@
 }
 
 func (db *queryDb) GetTable(name string, writeAccessReq bool) (ds.Table, error) {
-	if writeAccessReq {
-		return nil, syncql.NewErrNotWritable(db.GetContext(), name)
-	}
+	// At this point, when the query package calls GetTable with the writeAccessReq
+	// arg, we know whether or not we need a [writable] transaction or a snapshot.
+	// If batchId is already set, there's nothing to do; but if not, the
+	// writeAccessReq arg dictates whether a snapshot or a transaction is should
+	// be created.
 	tDb := &tableDb{
 		qdb: db,
 		req: &tableReq{
@@ -461,6 +498,32 @@
 			d:    db.req,
 		},
 	}
+	if tDb.req.d.batchId != nil {
+		if writeAccessReq {
+			// We are in a batch (could be snapshot or transaction)
+			// and Write access is required.  Attempt to get a
+			// transaction from the request.
+			var err error
+			tDb.qdb.tx, err = tDb.qdb.req.batchTransaction()
+			if err != nil {
+				// We are in a snapshot batch, write access cannot be provided.
+				// Return NotWritable.
+				return nil, syncql.NewErrNotWritable(tDb.qdb.GetContext(), tDb.req.name)
+			}
+			tDb.qdb.sntx = tDb.qdb.tx
+		} else {
+			tDb.qdb.sntx = tDb.qdb.req.batchReader()
+		}
+	} else {
+		// Now that we know if write access is required, create a snapshot
+		// or transaction.
+		if !writeAccessReq {
+			tDb.qdb.sntx = tDb.qdb.req.st.NewSnapshot()
+		} else { // writeAccessReq
+			tDb.qdb.tx = tDb.qdb.req.st.NewTransaction()
+			tDb.qdb.sntx = tDb.qdb.tx
+		}
+	}
 	// Now that we have a table, we need to check permissions.
 	if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &TableData{}); err != nil {
 		return nil, err
@@ -481,7 +544,15 @@
 }
 
 func (t *tableDb) Delete(k string) (bool, error) {
-	return false, syncql.NewErrOperationNotSupported(t.qdb.ctx, "delete")
+	// Create a rowReq and call delete.  Permissions will be checked.
+	rowReq := &rowReq{
+		key: k,
+		t:   t.req,
+	}
+	if err := rowReq.delete(t.qdb.GetContext(), t.qdb.call, t.qdb.tx); err != nil {
+		return false, err
+	}
+	return true, nil
 }
 
 func (t *tableDb) Scan(indexRanges ...ds.IndexRanges) (ds.KeyValueStream, error) {