syncbase/vsync:
Bug fixes for handling cases with conflicting updates.
Added an integration test to exercise these fixes.

Issues:
* Contention between the initiator and the watcher when receiving deltas (see the sketch after this list).
* When retried, hasConflict didn't correctly capture the change in the DAG head.
* Link log records were not created where needed.
* When sync was stale compared to the store, the retry error was not correctly handled.
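
The core of the fix for the first issue is a coarse lock that serializes the
initiator and the watcher (thLock in sync.go, taken in watcher.go and
initiator.go below). The following is a minimal, self-contained sketch of that
pattern; the type and method bodies here are illustrative stand-ins, not the
actual syncbase code:

    package main

    import (
        "fmt"
        "sync"
    )

    // syncService stands in for the real sync service: a coarse, pessimistic
    // lock serializes the watcher and the initiator so they do not contend on
    // an object's DAG head, instead of retrying (and replaying) whole deltas.
    type syncService struct {
        thLock sync.RWMutex
        head   string // stands in for an object's DAG head
    }

    // Watcher side: applies local updates, possibly moving the object head.
    func (s *syncService) processLocalUpdate(version string) {
        s.thLock.Lock()
        defer s.thLock.Unlock()
        s.head = version
    }

    // Initiator side: reads the head to build the graft map for conflict
    // detection while replaying remote deltas.
    func (s *syncService) processRemoteDeltas() string {
        s.thLock.Lock()
        defer s.thLock.Unlock()
        return s.head
    }

    func main() {
        s := &syncService{head: "v0"}
        // In the real code the watcher and initiator run as separate
        // goroutines; the lock guarantees each sees a stable head.
        s.processLocalUpdate("v1")
        fmt.Println("head seen by initiator:", s.processRemoteDeltas())
    }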

Change-Id: I6fa1f13cd5aabb8290e6e2631ea8458b55bbfe7f
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 0633079..77e5d27 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"math/rand"
 	"strconv"
 	"strings"
 	"time"
@@ -160,6 +161,72 @@
 	tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
 }
 
+// V23TestSyncbasedExchangeDeltasWithConflicts tests the exchange of deltas
+// between two Syncbase instances and their clients in the presence of
+// conflicting updates. The 1st client creates a SyncGroup and puts some
+// database entries in it. The 2nd client joins that SyncGroup and reads the
+// database entries. Both clients then update a subset of existing keys
+// concurrently, and sync with each other. During sync, the following
+// possibilities arise: (1) Both clients make their local updates before
+// syncing, so the sync detects a conflict; resolution causes one of the
+// clients to see a new value based on the timestamp. (2) One client's update
+// is synced before the other client has a chance to commit its update; the
+// other client's update is then not a conflict but a valid update building
+// on the first one's change.
+//
+// Note that the verification done from the client side can have false
+// positives regarding the test's success. Since we cannot accurately predict
+// which client's updates win, the test passes if we find either outcome.
+// However, this could also mean that the sync failed and each client is
+// merely reading its own local value. The verification step mainly checks
+// that syncbased is still responsive and that the data is not corrupt.
+//
+// TODO(hpucha): We could diff the states of the two clients and ensure they
+// are identical. Optionally, we could expose the internal state of syncbased
+// via some debug methods.
+func V23TestSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
+	// Run it multiple times to exercise different interactions between sync
+	// and local updates, which vary across runs due to timing.
+	for i := 0; i < 10; i++ {
+		testSyncbasedExchangeDeltasWithConflicts(t)
+	}
+}
+
+func testSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+	server0Creds, _ := t.Shell().NewChildCredentials("s0")
+	client0Creds, _ := t.Shell().NewChildCredentials("c0")
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+	defer cleanSync0()
+
+	server1Creds, _ := t.Shell().NewChildCredentials("s1")
+	client1Creds, _ := t.Shell().NewChildCredentials("c1")
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+	defer cleanSync1()
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+	tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+	tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+	tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+
+	tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+	tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+	tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+
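+	// Update an overlapping set of keys from both clients nearly
+	// concurrently; the random delay varies the interleaving across runs.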
+	go tu.RunClient(t, client0Creds, runUpdateData, "sync0", "5")
+	d := time.Duration(rand.Int63n(50)) * time.Millisecond
+	time.Sleep(d)
+	tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
+
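+	// Give the two Syncbases time to exchange deltas and resolve the
+	// conflicting updates before verifying.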
+	time.Sleep(10 * time.Second)
+
+	tu.RunClient(t, client0Creds, runVerifyConflictResolution, "sync0")
+	tu.RunClient(t, client1Creds, runVerifyConflictResolution, "sync1")
+}
+
 // V23TestNestedSyncGroups tests the exchange of deltas between two Syncbase
 // instances and their clients with nested SyncGroups. The 1st client creates
 // two SyncGroups at prefixes "f" and "foo" and puts some database entries in
@@ -405,7 +472,7 @@
 	for i := start; i < start+5; i++ {
 		key := fmt.Sprintf("foo%d", i)
 		r := tb.Row(key)
-		if err := r.Put(ctx, "testkey1"+key); err != nil {
+		if err := r.Put(ctx, "testkey"+args[0]+key); err != nil {
 			return fmt.Errorf("r.Put() failed: %v\n", err)
 		}
 	}
@@ -544,6 +611,47 @@
 	return nil
 }, "runVerifyDeletedData")
 
+var runVerifyConflictResolution = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService(args[0]).App("a")
+	d := a.NoSQLDatabase("d", nil)
+	tb := d.Table("tb")
+
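+	// Keys foo0-foo4 keep their original values; keys foo5-foo9 were updated
+	// concurrently by both clients, so either client's value may win.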
+	wantData := []struct {
+		start  uint64
+		count  uint64
+		valPfx []string
+	}{
+		{0, 5, []string{"testkey"}},
+		{5, 5, []string{"testkeysync0", "testkeysync1"}},
+	}
+
+	// Verify that all keys and values made it correctly.
+	for _, d := range wantData {
+		for i := d.start; i < d.start+d.count; i++ {
+			key := fmt.Sprintf("foo%d", i)
+			r := tb.Row(key)
+			var got string
+			if err := r.Get(ctx, &got); err != nil {
+				return fmt.Errorf("r.Get() failed: %v\n", err)
+			}
+			match := 0
+			for _, p := range d.valPfx {
+				want := p + key
+				if got == want {
+					match++
+				}
+			}
+			if match != 1 {
+				return fmt.Errorf("unexpected value: got %q, match %v, want %v\n", got, match, d.valPfx)
+			}
+		}
+	}
+	return nil
+}, "runVerifyConflictResolution")
+
 var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -593,7 +701,7 @@
 		valPfx string
 	}{
 		{0, 5, "testkey"},
-		{5, 5, "testkey1"},
+		{5, 5, "testkeysync1"},
 		{10, 10, "testkey"},
 	}
 
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index 88e046d..d235b1b 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -41,6 +41,10 @@
 	v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
 }
 
+func TestV23SyncbasedExchangeDeltasWithConflicts(t *testing.T) {
+	v23tests.RunTest(t, V23TestSyncbasedExchangeDeltasWithConflicts)
+}
+
 func TestV23NestedSyncGroups(t *testing.T) {
 	v23tests.RunTest(t, V23TestNestedSyncGroups)
 }
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 728cb0f..3850d5b 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -100,9 +100,9 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
-
 	"v.io/v23/context"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 const (
@@ -141,11 +141,15 @@
 // graftInfo holds the state of an object's node grafting in the DAG.
 // It is ephemeral (in-memory), used during a single sync operation to track
 // where the new DAG fragments are attached to the existing DAG for the object:
-// - newNodes:   the set of newly added nodes; used to detect the type of edges
-//               between nodes (new-node to old-node or vice versa).
-// - newHeads:   the set of new candidate head nodes; used to detect conflicts.
-// - graftNodes: the set of old nodes on which new nodes were added, and their
-//               level in the DAG; used to find common ancestors for conflicts.
+// - newNodes:    the set of newly added nodes; used to detect the type of edges
+//                between nodes (new-node to old-node or vice versa).
+// - newHeads:    the set of new candidate head nodes; used to detect conflicts.
+// - graftNodes:  the set of old nodes on which new nodes were added, and their
+//                level in the DAG; used to find common ancestors for conflicts.
+// - oldHeadSnap: snapshot of the local head known to sync, taken when the
+//                object's graft info is initialized; used in conflict
+//                detection, particularly when conflict detection must be
+//                retried because the sync DAG state is stale compared to the
+//                local store.
 //
 // After the received mutations are applied, if there are two heads in the
 // newHeads set, there is a conflict to be resolved for the object.  Otherwise,
@@ -155,9 +159,10 @@
 // TODO(rdaoud): support open DAGs to handle delayed conflict resolution by
 // tracking multiple dangling remote heads in addition to the local head node.
 type graftInfo struct {
-	newNodes   map[string]bool
-	newHeads   map[string]bool
-	graftNodes map[string]uint64
+	newNodes    map[string]bool
+	newHeads    map[string]bool
+	graftNodes  map[string]uint64
+	oldHeadSnap string
 }
 
 // newBatchInfo allocates and initializes a batch info entry.
@@ -460,9 +465,12 @@
 // - Yes: return (true, newHead, oldHead, ancestor)   -- from a common past
 // - Yes: return (true, newHead, oldHead, NoVersion)  -- from disjoint pasts
 // - No:  return (false, newHead, oldHead, NoVersion) -- no conflict
+//
 // A conflict exists when there are two new-head nodes in the graft structure.
 // It means the newly added object versions are not derived in part from this
-// device's current knowledge.  If there is a single new-head, the object
+// device's current knowledge. A conflict also exists if the snapshotted local
+// head is different from the current local head. If there is a single new-head
+// and the snapshot head is the same as the current local head, the object
 // changes were applied without triggering a conflict.
 func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
 	isConflict = false
@@ -493,23 +501,39 @@
 	// on this device and will not trigger a conflict.
 	oldHead, _ = getHead(ctx, st, oid)
 
-	// If there is only one new head node there is no conflict.  The new
-	// head is that single one, even if it might also be the same old node.
+	// If there is only one new head node and the snapshotted old head is
+	// still unchanged, there is no conflict. The new head is that single
+	// one, even if it might also be the same old node.
 	if numHeads == 1 {
 		for head := range info.newHeads {
 			newHead = head
 		}
-		return
+		if newHead == info.oldHeadSnap {
+			// Only link log records could've been received.
+			newHead = oldHead
+			return
+		} else if oldHead == info.oldHeadSnap {
+			return
+		}
 	}
 
-	// With two candidate head nodes, the new head is the non-old one.
+	// The new head is the one that differs from the snapshotted old head.
 	for head := range info.newHeads {
-		if head != oldHead {
+		if head != info.oldHeadSnap {
 			newHead = head
 			break
 		}
 	}
 
+	// At this point a conflict exists. If there is a single new head, the
+	// conflict is with a local update made after the snapshot was taken, and
+	// the snapshotted head is the common ancestor.
+	isConflict = true
+	if numHeads == 1 {
+		vlog.VI(4).Infof("sync: hasConflict: old graft snapshot %v, head %s", graft, oldHead)
+		ancestor = info.oldHeadSnap
+		return
+	}
+
 	// There is a conflict: the best choice ancestor is the graft node with
 	// the largest level (farthest from the root).  It is possible in some
 	// corner cases to have multiple graft nodes at the same level.  This
@@ -524,7 +548,6 @@
 	// graft nodes (empty set) and thus no common ancestor because the two
 	// DAG fragments were created from distinct root nodes.  The "NoVersion"
 	// value is returned as the ancestor.
-	isConflict = true
 	var maxLevel uint64
 	for node, level := range info.graftNodes {
 		if maxLevel < level || (maxLevel == level && ancestor < node) {
@@ -559,6 +582,7 @@
 	// If the object has a head node, include it in the set of new heads.
 	if head, err := getHead(ctx, st, oid); err == nil {
 		info.newHeads[head] = true
+		info.oldHeadSnap = head
 	}
 
 	graft[oid] = info
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 18df27b..e237277 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -314,6 +314,9 @@
 //
 // TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
 func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
+	iSt.sync.thLock.Lock()
+	defer iSt.sync.thLock.Unlock()
+
 	// Freeze the most recent batch of local changes before fetching
 	// remote changes from a peer. This frozen state is used by the
 	// responder when responding to GetDeltas RPC.
@@ -386,6 +389,9 @@
 // resolution during replay.  This avoids resolving conflicts that have already
 // been resolved by other devices.
 func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
+	iSt.sync.thLock.Lock()
+	defer iSt.sync.thLock.Unlock()
+
 	// TODO(hpucha): This works for now, but figure out a long term solution
 	// as this may be implementation dependent. It currently works because
 	// the RecvStream call is stateless, and grabbing a handle to it
@@ -585,11 +591,10 @@
 			return err
 		}
 
-		if err := iSt.updateDbAndSyncSt(ctx); err != nil {
-			return err
+		err := iSt.updateDbAndSyncSt(ctx)
+		if err == nil {
+			err = iSt.tx.Commit()
 		}
-
-		err := iSt.tx.Commit()
 		if err == nil {
 			committed = true
 			// Update in-memory genvector since commit is successful.
@@ -599,11 +604,13 @@
 			vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
 			return nil
 		}
-
 		if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
 			return err
 		}
 
+		// Either updateDbAndSyncSt() or tx.Commit() detected a
+		// concurrent transaction. Retry processing the remote updates.
+		//
 		// TODO(hpucha): Sleeping and retrying is a temporary
 		// solution. Next iteration will have coordination with watch
 		// thread to intelligently retry. Hence this value is not a
@@ -626,7 +633,11 @@
 		}
 
 		if !confSt.isConflict {
-			confSt.res = &conflictResolution{ty: pickRemote}
+			if confSt.newHead == confSt.oldHead {
+				confSt.res = &conflictResolution{ty: pickLocal}
+			} else {
+				confSt.res = &conflictResolution{ty: pickRemote}
+			}
 		} else {
 			count++
 		}
@@ -639,76 +650,78 @@
 func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
 	for objid, confSt := range iSt.updObjects {
 		// If the local version is picked, no further updates to the
-		// Database are needed.
-		if confSt.res.ty == pickLocal {
-			continue
-		}
+		// Database are needed. If the remote version is picked or if a
+		// new version is created, we put it in the Database.
+		if confSt.res.ty != pickLocal {
 
-		// If the remote version is picked or if a new version is
-		// created, we put it in the Database.
-
-		// TODO(hpucha): Hack right now. Need to change Database's
-		// handling of deleted objects.
-		oldVersDeleted := true
-		if confSt.oldHead != NoVersion {
-			oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
-			if err != nil {
-				return err
+			// TODO(hpucha): Hack right now. Need to change Database's
+			// handling of deleted objects.
+			oldVersDeleted := true
+			if confSt.oldHead != NoVersion {
+				oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
+				if err != nil {
+					return err
+				}
+				oldVersDeleted = oldDagNode.Deleted
 			}
-			oldVersDeleted = oldDagNode.Deleted
-		}
 
-		var newVersion string
-		var newVersDeleted bool
-		switch confSt.res.ty {
-		case pickRemote:
-			newVersion = confSt.newHead
-			newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
-			if err != nil {
-				return err
+			var newVersion string
+			var newVersDeleted bool
+			switch confSt.res.ty {
+			case pickRemote:
+				newVersion = confSt.newHead
+				newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
+				if err != nil {
+					return err
+				}
+				newVersDeleted = newDagNode.Deleted
+			case createNew:
+				newVersion = confSt.res.rec.Metadata.CurVers
+				newVersDeleted = confSt.res.rec.Metadata.Delete
 			}
-			newVersDeleted = newDagNode.Deleted
-		case createNew:
-			newVersion = confSt.res.rec.Metadata.CurVers
-			newVersDeleted = confSt.res.rec.Metadata.Delete
-		}
 
-		// Skip delete followed by a delete.
-		if oldVersDeleted && newVersDeleted {
-			continue
-		}
+			// Skip delete followed by a delete.
+			if oldVersDeleted && newVersDeleted {
+				continue
+			}
 
-		if !oldVersDeleted {
-			// Read current version to enter it in the readset of the transaction.
-			version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
-			if err != nil {
-				return err
+			if !oldVersDeleted {
+				// Read current version to enter it in the readset of the transaction.
+				version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
+				if err != nil {
+					return err
+				}
+				if string(version) != confSt.oldHead {
+					vlog.VI(4).Infof("sync: updateDbAndSyncSt: concurrent updates %s %s", version, confSt.oldHead)
+					return store.NewErrConcurrentTransaction(ctx)
+				}
+			} else {
+				// Ensure key doesn't exist.
+				if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+					return store.NewErrConcurrentTransaction(ctx)
+				}
 			}
-			if string(version) != confSt.oldHead {
-				return store.NewErrConcurrentTransaction(ctx)
-			}
-		} else {
-			// Ensure key doesn't exist.
-			if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
-				return store.NewErrConcurrentTransaction(ctx)
-			}
-		}
 
-		if !newVersDeleted {
-			if confSt.res.ty == createNew {
-				if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
+			if !newVersDeleted {
+				if confSt.res.ty == createNew {
+					vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutAtVersion %s %s", objid, newVersion)
+					if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
+						return err
+					}
+				}
+				vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutVersion %s %s", objid, newVersion)
+				if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
+					return err
+				}
+			} else {
+				vlog.VI(4).Infof("sync: updateDbAndSyncSt: Deleting obj %s", objid)
+				if err := iSt.tx.Delete([]byte(objid)); err != nil {
 					return err
 				}
 			}
-			if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
-				return err
-			}
-		} else {
-			if err := iSt.tx.Delete([]byte(objid)); err != nil {
-				return err
-			}
 		}
-
+		// Always update the sync state (log and DAG), irrespective of whether
+		// the local, remote, or new version was picked.
 		if err := iSt.updateLogAndDag(ctx, objid); err != nil {
 			return err
 		}
@@ -774,6 +787,8 @@
 func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
 	gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
 
+	vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", obj, vers, par)
+
 	rec := &localLogRec{
 		Metadata: interfaces.LogRecMetadata{
 			Id:      iSt.sync.id,
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index d0322d4..6b017ee 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -33,6 +33,22 @@
 	sv     interfaces.Service
 	server rpc.Server
 
+	// High-level lock to serialize the watcher and the initiator. This lock is
+	// needed to handle the following cases: (a) When the initiator is
+	// cutting a local generation, it waits for the watcher to commit the
+	// latest local changes before including them in the checkpoint. (b)
+	// When the initiator is receiving updates, it reads the latest head of
+	// an object as per the DAG state in order to construct the in-memory
+	// graft map used for conflict detection. At the same time, if a watcher
+	// is processing local updates, it may move the object head. Hence the
+	// initiator and the watcher contend on the DAG head of an object.
+	// Instead of retrying a transaction, which would cause the entire delta
+	// to be replayed, we use pessimistic locking to serialize the initiator
+	// and the watcher.
+	//
+	// TODO(hpucha): This is a temporary hack.
+	thLock sync.RWMutex
+
 	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
 	closed  chan struct{}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 3b745a8..e70d0d2 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -84,6 +84,9 @@
 // from starving others.  A batch is stored as a contiguous set of log records
 // ending with one record having the "continued" flag set to false.
 func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+	s.thLock.Lock()
+	defer s.thLock.Unlock()
+
 	vlog.VI(2).Infof("sync: processDatabase: begin: %s, %s", appName, dbName)
 	defer vlog.VI(2).Infof("sync: processDatabase: end: %s, %s", appName, dbName)