syncbase/vsync: conflict w/o ancestor

Change the initiator's conflict resolver to support the case of conflict
between two versions that have no common ancestor.  This happens when
two Syncbases create the same object (key) before they synchronize.

Change-Id: Ic7c426507d59916dd07c122eef78486ea49705ba
diff --git a/x/ref/services/syncbase/vsync/conflict_resolution.go b/x/ref/services/syncbase/vsync/conflict_resolution.go
index 39d63f1..a8e41f6 100644
--- a/x/ref/services/syncbase/vsync/conflict_resolution.go
+++ b/x/ref/services/syncbase/vsync/conflict_resolution.go
@@ -76,10 +76,19 @@
 		return nil, err
 	}
 
+	// The local and remote records must exist, however it is valid for the
+	// common ancestor to not exist.  This happens when two Syncbases create
+	// separately their first versions for the same object (key).
+	locRec, remRec, ancRec := lrecs[0], lrecs[1], lrecs[2]
+	if locRec == nil || remRec == nil {
+		vlog.Fatalf("sync: resolveObjConflict: oid %s: invalid local (%s: %v) or remote recs (%s: %v)",
+			oid, local, locRec, remote, remRec)
+	}
+
 	// Resolve the conflict according to the resolution policy.
 	switch conflictResolutionPolicy {
 	case useTime:
-		return iSt.resolveObjConflictByTime(ctx, oid, lrecs[0], lrecs[1], lrecs[2])
+		return iSt.resolveObjConflictByTime(ctx, oid, locRec, remRec, ancRec)
 	default:
 		return nil, verror.New(verror.ErrInternal, ctx, "unknown conflict resolution policy", conflictResolutionPolicy)
 	}
@@ -113,6 +122,11 @@
 func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
 	lrecs := make([]*localLogRec, len(versions))
 	for p, v := range versions {
+		if v == NoVersion {
+			lrecs[p] = nil
+			continue
+		}
+
 		logKey, err := getLogRecKey(ctx, iSt.tx, obj, v)
 		if err != nil {
 			return nil, err
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index bb62d6e..c3f2538 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -203,6 +203,91 @@
 	tx.Abort()
 }
 
+// TestLogStreamConflict tests that a local and a remote log stream can be
+// correctly applied when there are conflicts. Commands are in files
+// testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
+func TestLogStreamConflict(t *testing.T) {
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync")
+	defer cleanup(t, svc)
+
+	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+	// Verify conflict state.
+	if len(iSt.updObjects) != 1 {
+		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+	}
+	st := iSt.updObjects[objid]
+	if !st.isConflict {
+		t.Fatalf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "2" {
+		t.Fatalf("Conflict detection didn't succeed %v", st)
+	}
+	if st.res.ty != pickRemote {
+		t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
+	}
+
+	// Verify DAG state.
+	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+	}
+
+	// Verify Database state.
+	valbuf, err := svc.St().Get([]byte(objid), nil)
+	if err != nil || string(valbuf) != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	}
+	tx := svc.St().NewTransaction()
+	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+	if err != nil || string(versbuf) != "6" {
+		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+	}
+	tx.Abort()
+}
+
+// TestLogStreamConflictNoAncestor tests that a local and a remote log stream
+// can be correctly applied when there are conflicts from the start where the
+// two versions of an object have no common ancestor. Commands are in files
+// testdata/<local-init-00.log.sync,remote-conf-03.log.sync>.
+func TestLogStreamConflictNoAncestor(t *testing.T) {
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync")
+	defer cleanup(t, svc)
+
+	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+	// Verify conflict state.
+	if len(iSt.updObjects) != 1 {
+		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+	}
+	st := iSt.updObjects[objid]
+	if !st.isConflict {
+		t.Fatalf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "" {
+		t.Fatalf("Conflict detection didn't succeed %v", st)
+	}
+	if st.res.ty != pickRemote {
+		t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
+	}
+
+	// Verify DAG state.
+	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+	}
+
+	// Verify Database state.
+	valbuf, err := svc.St().Get([]byte(objid), nil)
+	if err != nil || string(valbuf) != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	}
+	tx := svc.St().NewTransaction()
+	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+	if err != nil || string(versbuf) != "6" {
+		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+	}
+	tx.Abort()
+}
+
 //////////////////////////////
 // Helpers.