syncbase: various small improvements, needed for demo

- add retry loop to RunInBatch
- make RunInBatch and RunInTransaction mirror each other
- add txn conflict logging
- other minor tweaks, e.g. comment improvements

Change-Id: Ia81dc2599a25d78231ed7f285477be8a8c627272
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 5443660..a9e7ac9 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -208,6 +208,9 @@
 		delete(d.txs, *d.batchId)
 		d.mu.Unlock()
 	}
+	if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+		return verror.New(wire.ErrConcurrentBatch, ctx, err)
+	}
 	return err
 }
 
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index f3f755c..4f03650 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -13,6 +13,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 // commitedTransaction is only used as an element of db.txEvents.
@@ -160,11 +161,13 @@
 func (tx *transaction) validateReadSet() bool {
 	for _, key := range tx.reads.Keys {
 		if tx.d.txTable.get(key) > tx.seq {
+			vlog.VI(3).Infof("key conflict: %q", key)
 			return false
 		}
 	}
 	for _, r := range tx.reads.Ranges {
 		if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+			vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
 			return false
 		}
 
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index ac9a12e..aac934b 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -8,25 +8,30 @@
 	"v.io/v23/verror"
 )
 
+// RunInTransaction runs the given fn in a transaction, managing retries and
+// commit/abort.
 func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
-	// TODO(rogulenko): We should eventually give up with
-	// ErrConcurrentTransaction.
-	// TODO(rogulenko): Fail on RPC errors.
-	for {
+	// TODO(rogulenko): Make the number of attempts configurable.
+	// TODO(rogulenko): Change the default number of attempts to 3. Currently,
+	// some storage engine tests fail when the number of attempts is that low.
+	var err error
+	for i := 0; i < 100; i++ {
+		// TODO(sadovsky): Should NewTransaction return an error? If not, how will
+		// we deal with RPC errors when talking to remote storage engines? (Note,
+		// client-side BeginBatch returns an error.)
 		tx := st.NewTransaction()
-		if err := fn(tx); err != nil {
+		if err = fn(tx); err != nil {
 			tx.Abort()
 			return err
 		}
-		err := tx.Commit()
-		if err == nil {
-			return nil
+		// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+		// failure or ErrConcurrentTransaction. Depending on the cause of failure,
+		// it may be desirable to retry the Commit() and/or to call Abort().
+		if err = tx.Commit(); verror.ErrorID(err) != ErrConcurrentTransaction.ID {
+			return err
 		}
-		if verror.ErrorID(err) == ErrConcurrentTransaction.ID {
-			continue
-		}
-		return err
 	}
+	return err
 }
 
 // CopyBytes copies elements from a source slice into a destination slice.
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index e1ee16f..c21264f 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -11,12 +11,11 @@
 import (
 	"flag"
 
+	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/v23"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/x/lib/vlog"
-
-	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/roaming"