Merge "Fix flakyness of Conflict Resolution integration tests."
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index b7f84b1..52365d8 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -18,6 +18,7 @@
 	"v.io/v23/verror"
 	"v.io/x/ref/lib/v23test"
 	"v.io/x/ref/services/syncbase/server/util"
+	"v.io/x/ref/test/testutil"
 )
 
 // Tests the conflict resolution configuration rules.
@@ -33,10 +34,6 @@
 // TODO(jlodhia): Add more rules based on value type and combination of key
 // prefix and value type once its implemented.
 func TestV23CRRuleConfig(t *testing.T) {
-	// TODO(jlodhia): Re-enable test after following issue is resolved.
-	// https://github.com/vanadium/issues/issues/1027
-	t.Skip()
-
 	v23test.SkipUnlessRunningIntegrationTests(t)
 	sh := v23test.NewShell(t, v23test.Opts{})
 	defer sh.Cleanup()
@@ -52,7 +49,8 @@
 	ok(t, updateData(client1Ctx, "s1", 0, 10, "concurrentUpdate"))
 
 	schemaKeyPrefix := "foo0"
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -157,10 +155,6 @@
 // 2) 5 rows written as a single batch on both syncbases resulting into a
 //    single conflict for the batch.
 func TestV23CRAppResolved(t *testing.T) {
-	// TODO(jlodhia): Re-enable test after following issue is resolved.
-	// https://github.com/vanadium/issues/issues/1027
-	t.Skip()
-
 	v23test.SkipUnlessRunningIntegrationTests(t)
 	sh := v23test.NewShell(t, v23test.Opts{})
 	defer sh.Cleanup()
@@ -180,9 +174,8 @@
 
 	schemaPrefix := "foo"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 12 to 6 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 12, func() {
+	expectedOnConflictCallCount := 6
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -228,9 +221,8 @@
 
 	schemaPrefix := "foo1"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 2 to 1 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -282,9 +274,8 @@
 
 	schemaPrefix := "foo"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 2 to 1 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -321,7 +312,29 @@
 // harder to follow. It would be better to define two helper functions: one
 // to do the stuff before fn(), and another to do the stuff after fn(). (Also,
 // switching to channel-based signalling should simplify things substantially.)
-func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, maxCallCount int, fn func()) {
+func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, expectedCallCount int, fn func()) {
+	// 1) During tests, S0 and S1 may sync with each other at the same time. Hence
+	// they end up doing conflict resolution for the same objects on both sides
+	// creating new resolved objects independently. These resolved versions
+	// get synced again creating another set of conflicts causing 2X the amount
+	// of OnConflict calls. TODO(jlodhia): Once Sync handles duplicate
+	// resolutions internally, this will no longer be true.
+	//
+	// 2) When the CPU load goes high, the watcher can fall behind in updating
+	// sync dag with the latest updates. This leads to scenarios where S0's dag
+	// may catch up with all N updates and send them to S1 while S1's watcher
+	// has only caught up to N - k updates. This means that S1 will only
+	// recognize N - k conflicts instead of N conflicts. These conflicts result
+	// in up to N - k OnConflict calls. When S1 tries to commit the resolved
+	// updates, the store rejects the commit on account of k objects it has
+	// not yet caught up on. Initiator will sleep for 1 sec and then retry
+	// processing the N objects received from S0, causing up to N more
+	// OnConflict calls and finally succeeding in committing the result.
+	//
+	// Due to (1) and (2), the total number of calls to OnConflict can be up to
+	// 3X the expected call count.
+	maxCallCount := 3 * expectedCallCount
+
 	// Create and hold a conflict resolution connection on s0 and s1 to receive
 	// future conflicts. The expected call count is 2 * the number of batches
 	// because each batch is being concurrently resolved on s0 and s1 creating new
@@ -443,18 +456,15 @@
 	r := tb.Row(key)
 	want := valuePrefix + key
 
-	// Wait up to 5 seconds for the correct key and value to appear.
-	sleepTimeMs, maxAttempts := 50, 100
 	var value string
-	for i := 0; i < maxAttempts; i++ {
+	return testutil.RetryFor(10 * time.Second, func() error {
 		if err := r.Get(ctx, &value); (err == nil) && (value == want) {
-			return nil
+			return nil  // Value found.
 		} else if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return fmt.Errorf("Syncbase Error while fetching key %v: %v", key, err)
 		}
-		time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
-	}
-	return fmt.Errorf("Timed out waiting for value %v but found %v after %d milliseconds.", want, value, maxAttempts*sleepTimeMs)
+		return testutil.TryAgain(fmt.Errorf("Value expected: %v, found: %v", want, value))
+	})
 }
 
 func endTest(ctx *context.T, syncbaseName, prefix, signalKey string) error {
@@ -478,9 +488,7 @@
 	r := tb.Row(signalKey)
 
 	var end bool
-	sleepTimeMs, maxAttempts := 50, 100 // Max wait time of 5 seconds.
-	for cnt := 0; cnt < maxAttempts; cnt++ {
-		time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
+	return testutil.RetryFor(10 * time.Second, func() error {
 		if err := r.Get(ctx, &end); err != nil {
 			if verror.ErrorID(err) != verror.ErrNoExist.ID {
 				return fmt.Errorf("r.Get() for endkey failed: %v", err)
@@ -489,8 +497,8 @@
 		if end {
 			return nil
 		}
-	}
-	return fmt.Errorf("Timed out waiting for signal %v after %d milliseconds.", signalKey, maxAttempts*sleepTimeMs)
+		return testutil.TryAgain(fmt.Errorf("Signal %v not received", signalKey))
+	})
 }
 
 ////////////////////////////////////////////////////////
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index c491203..9deef44 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -296,7 +296,7 @@
 		childCtx := context.WithValue(ctx, reconnectionCount, count)
 		// listenForConflicts is a blocking method that returns only when the
 		// conflict stream is broken.
-		if err := d.listenForConflicts(childCtx); err != nil {
+		if err := d.listenForConflicts(childCtx); err != nil && !d.crState.isDisconnected() {
 			vlog.Errorf("Conflict resolution connection ended with error: %v", err)
 		}
 
@@ -349,7 +349,7 @@
 		count++
 		ri := toResolutionInfo(v, count != size)
 		if err := stream.Send(ri); err != nil {
-			vlog.Error("Error while sending resolution")
+			vlog.Errorf("Error while sending resolution: %v", err)
 			return err
 		}
 	}