syncbase: various small improvements, needed for demo
- add retry loop to RunInBatch
- make RunInBatch and RunInTransaction mirror each other
- add txn conflict logging
- other minor tweaks, e.g. comment improvements
Change-Id: Ia81dc2599a25d78231ed7f285477be8a8c627272
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 5443660..a9e7ac9 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -208,6 +208,9 @@
delete(d.txs, *d.batchId)
d.mu.Unlock()
}
+ if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
+ return verror.New(wire.ErrConcurrentBatch, ctx, err)
+ }
return err
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index f3f755c..4f03650 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -13,6 +13,7 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// commitedTransaction is only used as an element of db.txEvents.
@@ -160,11 +161,13 @@
func (tx *transaction) validateReadSet() bool {
for _, key := range tx.reads.Keys {
if tx.d.txTable.get(key) > tx.seq {
+ vlog.VI(3).Infof("key conflict: %q", key)
return false
}
}
for _, r := range tx.reads.Ranges {
if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+ vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
return false
}
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index ac9a12e..aac934b 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -8,25 +8,30 @@
"v.io/v23/verror"
)
+// RunInTransaction runs the given fn in a transaction, managing retries and
+// commit/abort.
func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
- // TODO(rogulenko): We should eventually give up with
- // ErrConcurrentTransaction.
- // TODO(rogulenko): Fail on RPC errors.
- for {
+ // TODO(rogulenko): Make the number of attempts configurable.
+ // TODO(rogulenko): Change the default number of attempts to 3. Currently,
+ // some storage engine tests fail when the number of attempts is that low.
+ var err error
+ for i := 0; i < 100; i++ {
+ // TODO(sadovsky): Should NewTransaction return an error? If not, how will
+ // we deal with RPC errors when talking to remote storage engines? (Note,
+ // client-side BeginBatch returns an error.)
tx := st.NewTransaction()
- if err := fn(tx); err != nil {
+ if err = fn(tx); err != nil {
tx.Abort()
return err
}
- err := tx.Commit()
- if err == nil {
- return nil
+ // TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+ // failure or ErrConcurrentTransaction. Depending on the cause of failure,
+ // it may be desirable to retry the Commit() and/or to call Abort().
+ if err = tx.Commit(); verror.ErrorID(err) != ErrConcurrentTransaction.ID {
+ return err
}
- if verror.ErrorID(err) == ErrConcurrentTransaction.ID {
- continue
- }
- return err
}
+ return err
}
// CopyBytes copies elements from a source slice into a destination slice.
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index e1ee16f..c21264f 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -11,12 +11,11 @@
import (
"flag"
+ "v.io/syncbase/x/ref/services/syncbase/server"
"v.io/v23"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/x/lib/vlog"
-
- "v.io/syncbase/x/ref/services/syncbase/server"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/roaming"