Merge "syncbase/vsync: Bug fix. Cleanup to prevent running out of fds."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index beaa8d5..0fc865e 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -313,4 +313,5 @@
BoundToBatch() {"en": "bound to batch"}
NotBoundToBatch() {"en": "not bound to batch"}
ReadOnlyBatch() {"en": "batch is read-only"}
+ ConcurrentBatch() {"en": "concurrent batch"}
)
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 8f85cc1..c5af13f 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -28,12 +28,14 @@
ErrBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.BoundToBatch", verror.NoRetry, "{1:}{2:} bound to batch")
ErrNotBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.NotBoundToBatch", verror.NoRetry, "{1:}{2:} not bound to batch")
ErrReadOnlyBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ReadOnlyBatch", verror.NoRetry, "{1:}{2:} batch is read-only")
+ ErrConcurrentBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ConcurrentBatch", verror.NoRetry, "{1:}{2:} concurrent batch")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBoundToBatch.ID), "{1:}{2:} bound to batch")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotBoundToBatch.ID), "{1:}{2:} not bound to batch")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrReadOnlyBatch.ID), "{1:}{2:} batch is read-only")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConcurrentBatch.ID), "{1:}{2:} concurrent batch")
}
// NewErrBoundToBatch returns an error with the ErrBoundToBatch ID.
@@ -51,6 +53,11 @@
return verror.New(ErrReadOnlyBatch, ctx)
}
+// NewErrConcurrentBatch returns an error with the ErrConcurrentBatch ID.
+func NewErrConcurrentBatch(ctx *context.T) error {
+ return verror.New(ErrConcurrentBatch, ctx)
+}
+
// DatabaseWatcherClientMethods is the client interface
// containing DatabaseWatcher methods.
//
diff --git a/v23/syncbase/nosql/batch.go b/v23/syncbase/nosql/batch.go
index 7655a6c..ea7290e 100644
--- a/v23/syncbase/nosql/batch.go
+++ b/v23/syncbase/nosql/batch.go
@@ -7,6 +7,7 @@
import (
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/context"
+ "v.io/v23/verror"
)
type batch struct {
@@ -25,23 +26,31 @@
b.c.Abort(ctx)
}
-// TODO(sadovsky): Add retry loop.
+// RunInBatch runs the given fn in a batch, managing retries and commit/abort.
+// Each attempt begins a new batch. An error from BeginBatch is returned as-is;
+// an error from fn aborts the batch and is returned. A Commit that fails with
+// ErrConcurrentBatch is retried (3 attempts in total), and the last such error
+// is returned once the attempts are exhausted.
func RunInBatch(ctx *context.T, d Database, opts wire.BatchOptions, fn func(b BatchDatabase) error) error {
- b, err := d.BeginBatch(ctx, opts)
- if err != nil {
- return err
- }
- if err := fn(b); err != nil {
- b.Abort(ctx)
- return err
- }
- if err := b.Commit(ctx); err != nil {
+ // TODO(sadovsky): Make the number of attempts configurable.
+ var err error
+ for i := 0; i < 3; i++ {
+ // Keep the BeginBatch error in its own variable: "b, err :=" would shadow
+ // the outer err, making the return after the loop always yield nil.
+ b, beginErr := d.BeginBatch(ctx, opts)
+ if beginErr != nil {
+ return beginErr
+ }
+ if err = fn(b); err != nil {
+ b.Abort(ctx)
+ return err
+ }
// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
// failure or ErrConcurrentTransaction. Depending on the cause of failure,
- // it may be desirable to retry the Commit() and/or to call Abort(). For
- // now, we always abort on a failed commit.
- b.Abort(ctx)
- return err
+ // it may be desirable to retry the Commit() and/or to call Abort().
+ if err = b.Commit(ctx); verror.ErrorID(err) != wire.ErrConcurrentBatch.ID {
+ return err
+ }
}
- return nil
+ return err
}
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index dab14ee..13c3cc6 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -184,8 +184,8 @@
// is "", all rows with keys >= start are included.
// Concurrency semantics: It is legal to perform writes concurrently with
// Scan. The returned stream reads from a consistent snapshot taken at the
- // time of the RPC, and will not reflect subsequent writes to keys not yet
- // reached by the stream.
+ // time of the RPC (or at the time of BeginBatch, if in a batch), and will not
+ // reflect subsequent writes to keys not yet reached by the stream.
// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
Scan(ctx *context.T, r RowRange) Stream
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 5443660..a9e7ac9 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -208,6 +208,9 @@
delete(d.txs, *d.batchId)
d.mu.Unlock()
}
+ if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+ return verror.New(wire.ErrConcurrentBatch, ctx, err)
+ }
return err
}
diff --git a/x/ref/services/syncbase/store/leveldb/transaction.go b/x/ref/services/syncbase/store/leveldb/transaction.go
index f3f755c..4f03650 100644
--- a/x/ref/services/syncbase/store/leveldb/transaction.go
+++ b/x/ref/services/syncbase/store/leveldb/transaction.go
@@ -13,6 +13,7 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// commitedTransaction is only used as an element of db.txEvents.
@@ -160,11 +161,13 @@
func (tx *transaction) validateReadSet() bool {
for _, key := range tx.reads.Keys {
if tx.d.txTable.get(key) > tx.seq {
+ vlog.VI(3).Infof("key conflict: %q", key)
return false
}
}
for _, r := range tx.reads.Ranges {
if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+ vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
return false
}
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index ac9a12e..aac934b 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -8,25 +8,30 @@
"v.io/v23/verror"
)
+// RunInTransaction runs fn in a transaction, aborting if fn fails and retrying
+// (for a bounded number of attempts) if Commit hits ErrConcurrentTransaction.
func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
- // TODO(rogulenko): We should eventually give up with
- // ErrConcurrentTransaction.
- // TODO(rogulenko): Fail on RPC errors.
- for {
+ // TODO(rogulenko): Make the number of attempts configurable.
+ // TODO(rogulenko): Change the default number of attempts to 3. Currently,
+ // some storage engine tests fail when the number of attempts is that low.
+ var err error
+ for i := 0; i < 100; i++ {
+ // TODO(sadovsky): Should NewTransaction return an error? If not, how will
+ // we deal with RPC errors when talking to remote storage engines? (Note,
+ // client-side BeginBatch returns an error.)
tx := st.NewTransaction()
- if err := fn(tx); err != nil {
+ if err = fn(tx); err != nil {
tx.Abort()
return err
}
- err := tx.Commit()
- if err == nil {
- return nil
+ // TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+ // failure or ErrConcurrentTransaction. Depending on the cause of failure,
+ // it may be desirable to retry the Commit() and/or to call Abort().
+ if err = tx.Commit(); verror.ErrorID(err) != ErrConcurrentTransaction.ID {
+ return err
}
- if verror.ErrorID(err) == ErrConcurrentTransaction.ID {
- continue
- }
- return err
}
+ return err
}
// CopyBytes copies elements from a source slice into a destination slice.
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index e1ee16f..c21264f 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -11,12 +11,11 @@
import (
"flag"
+ "v.io/syncbase/x/ref/services/syncbase/server"
"v.io/v23"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/x/lib/vlog"
-
- "v.io/syncbase/x/ref/services/syncbase/server"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/roaming"