Merge "Fix flakiness of Conflict Resolution integration tests."
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index b7f84b1..52365d8 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -18,6 +18,7 @@
"v.io/v23/verror"
"v.io/x/ref/lib/v23test"
"v.io/x/ref/services/syncbase/server/util"
+ "v.io/x/ref/test/testutil"
)
// Tests the conflict resolution configuration rules.
@@ -33,10 +34,6 @@
// TODO(jlodhia): Add more rules based on value type and combination of key
// prefix and value type once its implemented.
func TestV23CRRuleConfig(t *testing.T) {
- // TODO(jlodhia): Re-enable test after following issue is resolved.
- // https://github.com/vanadium/issues/issues/1027
- t.Skip()
-
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, v23test.Opts{})
defer sh.Cleanup()
@@ -52,7 +49,8 @@
ok(t, updateData(client1Ctx, "s1", 0, 10, "concurrentUpdate"))
schemaKeyPrefix := "foo0"
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -157,10 +155,6 @@
// 2) 5 rows written as a single batch on both syncbases resulting into a
// single conflict for the batch.
func TestV23CRAppResolved(t *testing.T) {
- // TODO(jlodhia): Re-enable test after following issue is resolved.
- // https://github.com/vanadium/issues/issues/1027
- t.Skip()
-
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, v23test.Opts{})
defer sh.Cleanup()
@@ -180,9 +174,8 @@
schemaPrefix := "foo"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 12 to 6 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 12, func() {
+ expectedOnConflictCallCount := 6
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -228,9 +221,8 @@
schemaPrefix := "foo1"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 2 to 1 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -282,9 +274,8 @@
schemaPrefix := "foo"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 2 to 1 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -321,7 +312,29 @@
// harder to follow. It would be better to define two helper functions: one
// to do the stuff before fn(), and another to do the stuff after fn(). (Also,
// switching to channel-based signalling should simplify things substantially.)
-func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, maxCallCount int, fn func()) {
+func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, expectedCallCount int, fn func()) {
+ // 1) During tests, S0 and S1 may sync with each other at the same time. Hence
+ // they end up doing conflict resolution for the same objects on both sides
+ // creating new resolved objects independently. These resolved versions
+ // get synced again creating another set of conflicts causing 2X the amount
+ // of OnConflict calls. TODO(jlodhia): Once Sync handles duplicate
+ // resolutions internally, this will no longer be true.
+ //
+ // 2) When the CPU load goes high, the watcher can fall behind in updating
+ // sync dag with the latest updates. This leads to scenarios where S0's dag
+ // may catch up with all N updates and send them to S1 while S1's watcher
+ // has only caught up to N - k updates. This means that S1 will only
+ // recognize N - k conflicts instead of N conflicts. These conflicts result
+ // in up to N - k OnConflict calls. When S1 tries to commit the resolved
+ // updates, the store rejects the commit on account of k objects it has
+ // not yet caught up on. Initiator will sleep for 1 sec and then retry
+ // processing the N objects received from S0, causing up to N more
+ // OnConflict calls and finally succeeding in committing the result.
+ //
+ // Due to (1) and (2), the total number of calls to OnConflict can be up to
+ // 3X the expected call count.
+ maxCallCount := 3 * expectedCallCount
+
// Create and hold a conflict resolution connection on s0 and s1 to receive
// future conflicts. The expected call count is 2 * the number of batches
// because each batch is being concurrently resolved on s0 and s1 creating new
@@ -443,18 +456,15 @@
r := tb.Row(key)
want := valuePrefix + key
- // Wait up to 5 seconds for the correct key and value to appear.
- sleepTimeMs, maxAttempts := 50, 100
var value string
- for i := 0; i < maxAttempts; i++ {
+ return testutil.RetryFor(10 * time.Second, func() error {
if err := r.Get(ctx, &value); (err == nil) && (value == want) {
- return nil
+ return nil // Value found.
} else if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("Syncbase Error while fetching key %v: %v", key, err)
}
- time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
- }
- return fmt.Errorf("Timed out waiting for value %v but found %v after %d milliseconds.", want, value, maxAttempts*sleepTimeMs)
+ return testutil.TryAgain(fmt.Errorf("Value expected: %v, found: %v", want, value))
+ })
}
func endTest(ctx *context.T, syncbaseName, prefix, signalKey string) error {
@@ -478,9 +488,7 @@
r := tb.Row(signalKey)
var end bool
- sleepTimeMs, maxAttempts := 50, 100 // Max wait time of 5 seconds.
- for cnt := 0; cnt < maxAttempts; cnt++ {
- time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
+ return testutil.RetryFor(10 * time.Second, func() error {
if err := r.Get(ctx, &end); err != nil {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("r.Get() for endkey failed: %v", err)
@@ -489,8 +497,8 @@
if end {
return nil
}
- }
- return fmt.Errorf("Timed out waiting for signal %v after %d milliseconds.", signalKey, maxAttempts*sleepTimeMs)
+ return testutil.TryAgain(fmt.Errorf("Signal %v not received", signalKey))
+ })
}
////////////////////////////////////////////////////////
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index c491203..9deef44 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -296,7 +296,7 @@
childCtx := context.WithValue(ctx, reconnectionCount, count)
// listenForConflicts is a blocking method that returns only when the
// conflict stream is broken.
- if err := d.listenForConflicts(childCtx); err != nil {
+ if err := d.listenForConflicts(childCtx); err != nil && !d.crState.isDisconnected() {
vlog.Errorf("Conflict resolution connection ended with error: %v", err)
}
@@ -349,7 +349,7 @@
count++
ri := toResolutionInfo(v, count != size)
if err := stream.Send(ri); err != nil {
- vlog.Error("Error while sending resolution")
+ vlog.Errorf("Error while sending resolution: %v", err)
return err
}
}