Merge "syncbase/vsync: Bug fix. Cleanup to prevent running out of fds."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index beaa8d5..0fc865e 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -313,4 +313,5 @@
 	BoundToBatch() {"en": "bound to batch"}
 	NotBoundToBatch() {"en": "not bound to batch"}
 	ReadOnlyBatch() {"en": "batch is read-only"}
+	ConcurrentBatch() {"en": "concurrent batch"}
 )
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 8f85cc1..c5af13f 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -28,12 +28,14 @@
 	ErrBoundToBatch    = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.BoundToBatch", verror.NoRetry, "{1:}{2:} bound to batch")
 	ErrNotBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.NotBoundToBatch", verror.NoRetry, "{1:}{2:} not bound to batch")
 	ErrReadOnlyBatch   = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ReadOnlyBatch", verror.NoRetry, "{1:}{2:} batch is read-only")
+	ErrConcurrentBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ConcurrentBatch", verror.NoRetry, "{1:}{2:} concurrent batch")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBoundToBatch.ID), "{1:}{2:} bound to batch")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotBoundToBatch.ID), "{1:}{2:} not bound to batch")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrReadOnlyBatch.ID), "{1:}{2:} batch is read-only")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConcurrentBatch.ID), "{1:}{2:} concurrent batch")
 }
 
 // NewErrBoundToBatch returns an error with the ErrBoundToBatch ID.
@@ -51,6 +53,11 @@
 	return verror.New(ErrReadOnlyBatch, ctx)
 }
 
+// NewErrConcurrentBatch returns an error with the ErrConcurrentBatch ID.
+func NewErrConcurrentBatch(ctx *context.T) error {
+	return verror.New(ErrConcurrentBatch, ctx)
+}
+
 // DatabaseWatcherClientMethods is the client interface
 // containing DatabaseWatcher methods.
 //
diff --git a/v23/syncbase/nosql/batch.go b/v23/syncbase/nosql/batch.go
index 7655a6c..ea7290e 100644
--- a/v23/syncbase/nosql/batch.go
+++ b/v23/syncbase/nosql/batch.go
@@ -7,6 +7,7 @@
 import (
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/v23/context"
+	"v.io/v23/verror"
 )
 
 type batch struct {
@@ -25,23 +26,33 @@
 	b.c.Abort(ctx)
 }
 
-// TODO(sadovsky): Add retry loop.
+// RunInBatch runs the given fn in a batch, managing retries and commit/abort.
+// If BeginBatch or fn fails, that error is returned without retrying (a batch
+// on which fn failed is aborted first). If Commit fails with
+// ErrConcurrentBatch, the whole sequence (BeginBatch, fn, Commit) is retried,
+// for at most 3 attempts in total; if every attempt hits ErrConcurrentBatch,
+// the last such error is returned. Any other Commit result is returned as-is.
 func RunInBatch(ctx *context.T, d Database, opts wire.BatchOptions, fn func(b BatchDatabase) error) error {
-	b, err := d.BeginBatch(ctx, opts)
-	if err != nil {
-		return err
-	}
-	if err := fn(b); err != nil {
-		b.Abort(ctx)
-		return err
-	}
-	if err := b.Commit(ctx); err != nil {
+	// TODO(sadovsky): Make the number of attempts configurable.
+	var err error
+	for i := 0; i < 3; i++ {
+		// Assign to the outer err rather than declaring a new one with :=, so
+		// that the "return err" after the loop reports the last commit failure
+		// instead of nil.
+		var b BatchDatabase
+		if b, err = d.BeginBatch(ctx, opts); err != nil {
+			return err
+		}
+		if err = fn(b); err != nil {
+			b.Abort(ctx)
+			return err
+		}
 		// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
 		// failure or ErrConcurrentTransaction. Depending on the cause of failure,
-		// it may be desirable to retry the Commit() and/or to call Abort(). For
-		// now, we always abort on a failed commit.
-		b.Abort(ctx)
-		return err
+		// it may be desirable to retry the Commit() and/or to call Abort().
+		if err = b.Commit(ctx); verror.ErrorID(err) != wire.ErrConcurrentBatch.ID {
+			return err
+		}
 	}
-	return nil
+	return err
 }
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index dab14ee..13c3cc6 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -184,8 +184,8 @@
 	// is "", all rows with keys >= start are included.
 	// Concurrency semantics: It is legal to perform writes concurrently with
 	// Scan. The returned stream reads from a consistent snapshot taken at the
-	// time of the RPC, and will not reflect subsequent writes to keys not yet
-	// reached by the stream.
+	// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
+	// reflect subsequent writes to keys not yet reached by the stream.
 	// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
 	Scan(ctx *context.T, r RowRange) Stream
 
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 5443660..a9e7ac9 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -208,6 +208,9 @@
 		delete(d.txs, *d.batchId)
 		d.mu.Unlock()
 	}
+	if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+		return verror.New(wire.ErrConcurrentBatch, ctx, err)
+	}
 	return err
 }
 
diff --git a/x/ref/services/syncbase/store/leveldb/transaction.go b/x/ref/services/syncbase/store/leveldb/transaction.go
index f3f755c..4f03650 100644
--- a/x/ref/services/syncbase/store/leveldb/transaction.go
+++ b/x/ref/services/syncbase/store/leveldb/transaction.go
@@ -13,6 +13,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 // commitedTransaction is only used as an element of db.txEvents.
@@ -160,11 +161,13 @@
 func (tx *transaction) validateReadSet() bool {
 	for _, key := range tx.reads.Keys {
 		if tx.d.txTable.get(key) > tx.seq {
+			vlog.VI(3).Infof("key conflict: %q", key)
 			return false
 		}
 	}
 	for _, r := range tx.reads.Ranges {
 		if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+			vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
 			return false
 		}
 
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index ac9a12e..aac934b 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -8,25 +8,38 @@
 	"v.io/v23/verror"
 )
 
+// RunInTransaction executes fn inside a new transaction obtained from st and
+// then commits that transaction. If fn returns an error, the transaction is
+// aborted and the error is returned immediately. If Commit fails with
+// ErrConcurrentTransaction, the whole transaction is re-run, up to 100
+// attempts in total, and the last such error is returned once they are used
+// up. Any other Commit result (including nil) is returned unchanged.
 func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
-	// TODO(rogulenko): We should eventually give up with
-	// ErrConcurrentTransaction.
-	// TODO(rogulenko): Fail on RPC errors.
-	for {
+	// TODO(rogulenko): Make the number of attempts configurable.
+	// TODO(rogulenko): Change the default number of attempts to 3. Currently,
+	// some storage engine tests fail when the number of attempts is that low.
+	const maxAttempts = 100
+	var err error
+	for attempt := 0; attempt < maxAttempts; attempt++ {
+		// TODO(sadovsky): Should NewTransaction return an error? If not, how will
+		// we deal with RPC errors when talking to remote storage engines? (Note,
+		// client-side BeginBatch returns an error.)
 		tx := st.NewTransaction()
-		if err := fn(tx); err != nil {
+		err = fn(tx)
+		if err != nil {
 			tx.Abort()
 			return err
 		}
-		err := tx.Commit()
-		if err == nil {
-			return nil
+		// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+		// failure or ErrConcurrentTransaction. Depending on the cause of failure,
+		// it may be desirable to retry the Commit() and/or to call Abort().
+		err = tx.Commit()
+		if verror.ErrorID(err) != ErrConcurrentTransaction.ID {
+			return err
 		}
-		if verror.ErrorID(err) == ErrConcurrentTransaction.ID {
-			continue
-		}
-		return err
 	}
+	// Every attempt hit ErrConcurrentTransaction; err holds the last one.
+	return err
 }
 
 // CopyBytes copies elements from a source slice into a destination slice.
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index e1ee16f..c21264f 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -11,12 +11,11 @@
 import (
 	"flag"
 
+	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/v23"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/x/lib/vlog"
-
-	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/roaming"