veyron/runtimes/google/vsync: Support conflict resolution via
blessings. Blessings are captured by adding dag links. So we also
provide a new type of log record (LinkRec) to communicate link
addition.

Change-Id: I1c4c6d8da7155df9a245a53e58f5d4f11659e1d6
diff --git a/runtimes/google/vsync/gc.go b/runtimes/google/vsync/gc.go
index 726868c..9dc8dd1 100644
--- a/runtimes/google/vsync/gc.go
+++ b/runtimes/google/vsync/gc.go
@@ -96,7 +96,9 @@
 	// strictCheck when enabled performs strict checking of every
 	// log record being deleted to confirm that it should be in
 	// fact deleted.
-	strictCheck = true
+	// TODO(hpucha): Support strictCheck in the presence
+	// of Link log records.
+	strictCheck = false
 
 	// Every compactCount iterations of garbage collection, kvdb
 	// is compacted.  This value has performance implications as
@@ -304,6 +306,12 @@
 			return err
 		}
 
+		if rec.RecType == LinkRec {
+			// For a link log record, gc it right away.
+			g.dagPruneCallBack(logRecKey(devid, gnum, l))
+			continue
+		}
+
 		// Insert the object in this log record to the prune
 		// map if needed.
 		// If this object does not exist, create it.
@@ -441,19 +449,21 @@
 		if err != nil {
 			return err
 		}
-		objHist, ok := g.verifyPruneMap[rec.ObjID]
-		if !ok {
-			return errors.New("obj not found in verifyMap")
-		}
-		_, found := objHist.versions[rec.CurVers]
-		// If online consistency check is in progress, we
-		// cannot strictly verify all the versions to be
-		// deleted, and we ignore the failure to find a
-		// version.
-		if found {
-			delete(objHist.versions, rec.CurVers)
-		} else if !g.checkConsistency {
-			return errors.New("verification failed")
+		if rec.RecType == NodeRec {
+			objHist, ok := g.verifyPruneMap[rec.ObjID]
+			if !ok {
+				return errors.New("obj not found in verifyMap")
+			}
+			_, found := objHist.versions[rec.CurVers]
+			// If online consistency check is in progress, we
+			// cannot strictly verify all the versions to be
+			// deleted, and we ignore the failure to find a
+			// version.
+			if found {
+				delete(objHist.versions, rec.CurVers)
+			} else if !g.checkConsistency {
+				return errors.New("verification failed")
+			}
 		}
 	}
 
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index feb92a8..930df89 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -45,6 +45,7 @@
 	"veyron/services/store/raw"
 
 	"veyron2/storage"
+	"veyron2/vlog"
 )
 
 var (
@@ -300,12 +301,13 @@
 	return l.gens.del(key)
 }
 
-// createLocalLogRec creates a new local log record.
+// createLocalLogRec creates a new local log record of type NodeRec.
 func (l *iLog) createLocalLogRec(obj storage.ID, vers storage.Version, par []storage.Version, val *LogValue) (*LogRec, error) {
 	rec := &LogRec{
-		DevID: l.s.id,
-		GNum:  l.head.Curgen,
-		LSN:   l.head.Curlsn,
+		DevID:   l.s.id,
+		GNum:    l.head.Curgen,
+		LSN:     l.head.Curlsn,
+		RecType: NodeRec,
 
 		ObjID:   obj,
 		CurVers: vers,
@@ -319,6 +321,25 @@
 	return rec, nil
 }
 
+// createLocalLinkLogRec creates a new local log record of type LinkRec.
+func (l *iLog) createLocalLinkLogRec(obj storage.ID, vers, par storage.Version) (*LogRec, error) {
+	rec := &LogRec{
+		DevID:   l.s.id,
+		GNum:    l.head.Curgen,
+		LSN:     l.head.Curlsn,
+		RecType: LinkRec,
+
+		ObjID:   obj,
+		CurVers: vers,
+		Parents: []storage.Version{par},
+	}
+
+	// Increment the LSN for the local log.
+	l.head.Curlsn++
+
+	return rec, nil
+}
+
 // createRemoteGeneration adds a new remote generation.
 func (l *iLog) createRemoteGeneration(dev DeviceID, gnum GenID, gen *genMetadata) error {
 	if l.db == nil {
@@ -357,6 +378,7 @@
 	}
 	err := l.putGenMetadata(l.s.id, g, val)
 
+	vlog.VI(2).Infof("createLocalGeneration:: created gen %d %v", g, val)
 	// Move to the next generation irrespective of err.
 	l.head.Curorder++
 	l.head.Curgen++
@@ -370,6 +392,8 @@
 	if l.db == nil {
 		return errInvalidLog
 	}
+
+	vlog.VI(2).Infof("processWatchRecord:: adding object %v %v", objID, vers)
 	// Check if the object version already exists in the DAG. if so return.
 	if l.s.dag.hasNode(objID, vers) {
 		return nil
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 67422d9..e0e84b7 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -191,6 +191,7 @@
 		return GenVector{}, err
 	}
 
+	vlog.VI(2).Infof("updateLocalGeneration:: created gen %d", gen)
 	// Update local generation vector in devTable.
 	if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
 		return GenVector{}, err
@@ -308,10 +309,16 @@
 	if err != nil {
 		return err
 	}
-	if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
-		return err
+
+	vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
+	switch rec.RecType {
+	case NodeRec:
+		return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+	case LinkRec:
+		return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
+	default:
+		return fmt.Errorf("unknown log record type")
 	}
-	return nil
 }
 
 // createGenMetadataBatch inserts a batch of generations into the log.
@@ -337,12 +344,31 @@
 
 // processUpdatedObjects processes all the updates received by the
 // initiator, one object at a time. For each updated object, we first
-// check if the object has any conflicts.  If there is a conflict, we
-// resolve the conflict and generate a new store mutation reflecting
-// the conflict resolution. If there is no conflict, we generate a
-// store mutation to simply update the store to the latest value. We
-// then put all these mutations in the store. If the put succeeds, we
-// update the log and dag state suitably (move the head ptr of the
+// check if the object has any conflicts, resulting in three
+// possibilities:
+//
+// * There is no conflict, and no updates are needed to the store
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent
+// version. This occurs when conflicts are resolved by blessings.
+//
+// * There is no conflict, but a remote version is discovered that
+// builds on the local head (isConflict=false, newHead != oldHead). In
+// this case, we generate a store mutation to simply update the store
+// to the latest value.
+//
+// * There is a conflict and we call into the app or the system to
+// resolve the conflict, resulting in three possibilties: (a) conflict
+// was resolved by blessing the local version. In this case, store
+// need not be updated, but a link is added to record the
+// blessing. (b) conflict was resolved by blessing the remote
+// version. In this case, store is updated with the remote version and
+// a link is added as well. (c) conflict was resolved by generating a
+// new store mutation. In this case, store is updated with the new
+// version.
+//
+// We then put all these mutations in the store. If the put succeeds,
+// we update the log and dag state suitably (move the head ptr of the
 // object in the dag to the latest version, and create a new log
 // record reflecting conflict resolution if any). Puts to store can
 // fail since preconditions on the objects may have been violated. In
@@ -355,12 +381,11 @@
 			return err
 		}
 
-		m, err := i.resolveConflicts()
-		if err != nil {
+		if err := i.resolveConflicts(); err != nil {
 			return err
 		}
 
-		err = i.updateStoreAndSync(ctx, m, local, minGens, remote, dID)
+		err := i.updateStoreAndSync(ctx, local, minGens, remote, dID)
 		if err == nil {
 			break
 		}
@@ -390,60 +415,28 @@
 		// Check if object has a conflict.
 		var err error
 		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
+		vlog.VI(2).Infof("detectConflicts:: object %v state %v err %v",
+			obj, st, err)
 		if err != nil {
 			return err
 		}
-		if !st.isConflict {
-			rec, err := i.getLogRec(obj, st.newHead)
-			if err != nil {
-				return err
-			}
-			st.resolvVal = &rec.Value
-			// Sanity check.
-			if st.resolvVal.Mutation.Version != st.newHead {
-				return fmt.Errorf("bad mutation %d %d",
-					st.resolvVal.Mutation.Version, st.newHead)
-			}
-		}
 	}
 	return nil
 }
 
-// getLogRec returns the log record corresponding to a given object and its version.
-func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
-	logKey, err := i.syncd.dag.getLogrec(obj, vers)
-	if err != nil {
-		return nil, err
-	}
-	dev, gen, lsn, err := splitLogRecKey(logKey)
-	if err != nil {
-		return nil, err
-	}
-	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
-	if err != nil {
-		return nil, err
-	}
-	return rec, nil
-}
-
-// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
+// resolveConflicts resolves conflicts for updated objects. Conflicts
+// may be resolved by adding new versions or blessing either the local
+// or the remote version.
+func (i *syncInitiator) resolveConflicts() error {
 	switch conflictResolutionPolicy {
 	case useTime:
 		if err := i.resolveConflictsByTime(); err != nil {
-			return nil, err
+			return err
 		}
 	default:
-		return nil, fmt.Errorf("unknown conflict resolution policy")
+		return fmt.Errorf("unknown conflict resolution policy")
 	}
-
-	var m []raw.Mutation
-	for _, st := range i.updObjects {
-		// Append to mutations.
-		st.resolvVal.Mutation.PriorVersion = st.oldHead
-		m = append(m, st.resolvVal.Mutation)
-	}
-	return m, nil
+	return nil
 }
 
 // resolveConflictsByTime resolves conflicts using the timestamps
@@ -482,11 +475,10 @@
 			res = 1
 		}
 
-		m := lrecs[res].Value.Mutation
-		m.Version = storage.NewVersion()
-
-		// TODO(hpucha): handle continue and delete flags.
-		st.resolvVal = &LogValue{Mutation: m}
+		// Instead of creating a new version that resolves the
+		// conflict, we are blessing an existing version as
+		// the conflict resolution.
+		st.resolvVal = &lrecs[res].Value
 	}
 
 	return nil
@@ -511,11 +503,38 @@
 
 // updateStoreAndSync updates the store, and if that is successful,
 // updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(ctx context.T, m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) updateStoreAndSync(ctx context.T, local, minGens, remote GenVector, dID DeviceID) error {
 	// TODO(hpucha): Eliminate reaching into syncd's lock.
 	i.syncd.lock.Lock()
 	defer i.syncd.lock.Unlock()
 
+	var m []raw.Mutation
+	for obj, st := range i.updObjects {
+		if !st.isConflict {
+			rec, err := i.getLogRec(obj, st.newHead)
+			if err != nil {
+				return err
+			}
+			st.resolvVal = &rec.Value
+			// Sanity check.
+			if st.resolvVal.Mutation.Version != st.newHead {
+				return fmt.Errorf("bad mutation %d %d",
+					st.resolvVal.Mutation.Version, st.newHead)
+			}
+		}
+
+		// If the local version is picked, no further updates
+		// to the store are needed. If the remote version is
+		// picked, we put it in the store.
+		if st.resolvVal.Mutation.Version != st.oldHead {
+			// Append to mutations.
+			st.resolvVal.Mutation.PriorVersion = st.oldHead
+			vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
+				st.resolvVal.Mutation, obj)
+			m = append(m, st.resolvVal.Mutation)
+		}
+	}
+
 	// TODO(hpucha): We will hold the lock across PutMutations rpc
 	// to prevent a race with watcher. The next iteration will
 	// clean up this coordination.
@@ -553,30 +572,68 @@
 	return nil
 }
 
+// getLogRec returns the log record corresponding to a given object and its version.
+func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
+	logKey, err := i.syncd.dag.getLogrec(obj, vers)
+	if err != nil {
+		return nil, err
+	}
+	dev, gen, lsn, err := splitLogRecKey(logKey)
+	if err != nil {
+		return nil, err
+	}
+	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
+	if err != nil {
+		return nil, err
+	}
+	return rec, nil
+}
+
 // updateLogAndDag updates the log and dag data structures on a successful store put.
 func (i *syncInitiator) updateLogAndDag() error {
 	for obj, st := range i.updObjects {
 		if st.isConflict {
 			// Object had a conflict, which was resolved successfully.
 			// Put is successful, create a log record.
-			parents := []storage.Version{st.newHead, st.oldHead}
-			rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+			var err error
+			var rec *LogRec
+
+			switch {
+			case st.resolvVal.Mutation.Version == st.oldHead:
+				// Local version was blessed as the conflict resolution.
+				rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.oldHead, st.newHead)
+			case st.resolvVal.Mutation.Version == st.newHead:
+				// Remote version was blessed as the conflict resolution.
+				rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.newHead, st.oldHead)
+			default:
+				// New version was created to resolve the conflict.
+				parents := []storage.Version{st.newHead, st.oldHead}
+				rec, err = i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+
+			}
 			if err != nil {
 				return err
 			}
-
 			logKey, err := i.syncd.log.putLogRec(rec)
 			if err != nil {
 				return err
 			}
-
-			// Put is successful, add a new DAG node.
-			if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
+			// Add a new DAG node.
+			switch rec.RecType {
+			case NodeRec:
+				err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+			case LinkRec:
+				err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
+			default:
+				return fmt.Errorf("unknown log record type")
+			}
+			if err != nil {
 				return err
 			}
 		}
 
-		// Move the head.
+		// Move the head. This should be idempotent. We may
+		// move head to the local head in some cases.
 		if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
 			return err
 		}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 3c9a063..468c882 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -172,22 +172,27 @@
 			t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
 		}
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != storage.NoVersion {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
-		t.Errorf("Unexpected mutations %v", m)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -270,15 +275,21 @@
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != storage.NoVersion {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
-		t.Errorf("Unexpected mutations %v", m)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -362,22 +373,27 @@
 			v = v + 1
 		}
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != 2 || m[0].Version != 5 {
-		t.Errorf("Unexpected versions %v", m[0])
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -437,16 +453,14 @@
 		t.Errorf("Data mismatch for generation metadata: %v instead of %v",
 			curVal, expVal)
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
 
 	objid, err := strToObjID("12345")
@@ -466,6 +480,9 @@
 			if curRec.ObjID != objid {
 				t.Errorf("Data mismatch in log record %v", curRec)
 			}
+			if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+				t.Errorf("Data mismatch in log record %v", curRec)
+			}
 			// Verify DAG state.
 			if _, err := s.dag.getNode(objid, v); err != nil {
 				t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -473,19 +490,25 @@
 			v = v + 1
 		}
 	}
-
-	if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
-		t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	if m[0].PriorVersion != 2 {
-		t.Errorf("Unexpected version %v", m[0])
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	// Curlsn == 4 for the log record that resolves conflict.
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
 	}
 	// Verify DAG state.
-	if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+	if head, err := s.dag.getHead(objid); err != nil || head != 5 {
 		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
 	}
 }
@@ -555,12 +578,11 @@
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
 
 	objid, err := strToObjID("12345")
@@ -580,6 +602,9 @@
 			if curRec.ObjID != objid {
 				t.Errorf("Data mismatch in log record %v", curRec)
 			}
+			if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+				t.Errorf("Data mismatch in log record %v", curRec)
+			}
 			// Verify DAG state.
 			if _, err := s.dag.getNode(objid, v); err != nil {
 				t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -587,18 +612,357 @@
 			v = v + 1
 		}
 	}
-	if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
-		t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	if m[0].PriorVersion != 2 {
-		t.Errorf("Unexpected version %v", m[0])
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	// Curlsn == 4 for the log record that resolves conflict.
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 2 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
 	}
 	// Verify DAG state.
-	if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+	if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf0 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// unchanged at the end of replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-00.log.sync>.
+func TestInitiatorBlessNoConf0(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-00.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf1 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-01.log.sync>.
+func TestInitiatorBlessNoConf1(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-01.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 3 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf2 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the first replay. In the second replay, a
+// conflict resolved locally is rediscovered since it was also
+// resolved remotely. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-02.log.sync,
+// remote-noconf-link-repeat.log.sync>.
+func TestInitiatorBlessNoConf2(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-02.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 4 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{"VeyronTab": 0}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 2 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+
+	// Test simultaneous conflict resolution.
+	stream, err = createReplayStream("remote-noconf-link-repeat.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	st = s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 4 || st.oldHead != 4 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronLaptop"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 4 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 3 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessConf tests that a local and a remote log record
+// stream can be correctly applied, when the conflict is resolved by a
+// blessing. Commands are in files
+// testdata/<local-init-00.sync,remote-conf-link.log.sync>.
+func TestInitiatorBlessConf(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-conf-link.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 3 || st.oldHead != 2 || st.ancestor != 1 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// New log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	curRec, err := s.log.getLogRec(s.id, GenID(1), LSN(3))
+	if err != nil || curRec == nil {
+		t.Fatalf("GetLogRec() can not find object %s:1:3 in log file err %v",
+			s.id, err)
+	}
+	if curRec.ObjID != objid || curRec.RecType != LinkRec {
+		t.Errorf("Data mismatch in log record %v", curRec)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 3 {
 		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
 	}
 }
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 783d23f..bd9356c 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -78,7 +78,7 @@
 	for _, cmd := range cmds {
 		switch cmd.cmd {
 		case addLocal:
-			err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{})
+			err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{Mutation: raw.Mutation{Version: cmd.version}})
 			if err != nil {
 				return nil, fmt.Errorf("cannot replay local log records %d:%s err %v",
 					cmd.objID, cmd.version, err)
@@ -121,25 +121,31 @@
 
 	stream := newStream()
 	for _, cmd := range cmds {
+		id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
+		if err != nil {
+			return nil, err
+		}
+		rec := LogRec{
+			DevID:   id,
+			GNum:    gnum,
+			LSN:     lsn,
+			ObjID:   cmd.objID,
+			CurVers: cmd.version,
+			Parents: cmd.parents,
+			Value: LogValue{
+				Mutation: raw.Mutation{Version: cmd.version},
+			},
+		}
+
 		switch cmd.cmd {
 		case addRemote:
-			id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
-			if err != nil {
-				return nil, err
-			}
-			rec := LogRec{
-				DevID:   id,
-				GNum:    gnum,
-				LSN:     lsn,
-				ObjID:   cmd.objID,
-				CurVers: cmd.version,
-				Parents: cmd.parents,
-				Value: LogValue{
-					Mutation: raw.Mutation{Version: cmd.version},
-				},
-			}
-			stream.add(rec)
+			rec.RecType = NodeRec
+		case linkRemote:
+			rec.RecType = LinkRec
+		default:
+			return nil, err
 		}
+		stream.add(rec)
 	}
 
 	return stream, nil
diff --git a/runtimes/google/vsync/vsync.vdl b/runtimes/google/vsync/vsync.vdl
index 4de7c9c..aa06ec9 100644
--- a/runtimes/google/vsync/vsync.vdl
+++ b/runtimes/google/vsync/vsync.vdl
@@ -15,13 +15,21 @@
 // GenVector is the generation vector.
 type GenVector map[DeviceID]GenID
 
+const (
+      // NodeRec type log record adds a new node in the dag.
+      NodeRec = byte(0)
+      // LinkRec type log record adds a new link in the dag.
+      LinkRec = byte(1)
+)
+
 // LogRec represents a single log record that is exchanged between two
 // peers.
 //
 // It contains log related metadata: DevID is the id of the
 // device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum, 
+// and RecType is the type of log record.
 //
 // It also contains information relevant to the updates to an object
 // in the store: ObjID is the id of the object that was
@@ -34,6 +42,7 @@
   DevID    DeviceID
   GNum     GenID
   LSN      LSN
+  RecType  byte
   // Object related information.
   ObjID    storage.ID
   CurVers  storage.Version
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index 2055734..e2120db 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -35,8 +35,9 @@
 //
 // It contains log related metadata: DevID is the id of the
 // device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of log record.
 //
 // It also contains information relevant to the updates to an object
 // in the store: ObjID is the id of the object that was
@@ -46,9 +47,10 @@
 // the object mutation.
 type LogRec struct {
 	// Log related information.
-	DevID DeviceID
-	GNum  GenID
-	LSN   LSN
+	DevID   DeviceID
+	GNum    GenID
+	LSN     LSN
+	RecType byte
 	// Object related information.
 	ObjID   storage.ID
 	CurVers storage.Version
@@ -71,6 +73,14 @@
 	Continue bool
 }
 
+const (
+	// NodeRec type log record adds a new node in the dag.
+	NodeRec = byte(0)
+
+	// LinkRec type log record adds a new link in the dag.
+	LinkRec = byte(1)
+)
+
 // Sync allows a device to GetDeltas from another device.
 // Sync is the interface the client binds and uses.
 // Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -303,6 +313,7 @@
 				_gen_wiretype.FieldType{Type: 0x41, Name: "DevID"},
 				_gen_wiretype.FieldType{Type: 0x42, Name: "GNum"},
 				_gen_wiretype.FieldType{Type: 0x45, Name: "LSN"},
+				_gen_wiretype.FieldType{Type: 0x46, Name: "RecType"},
 				_gen_wiretype.FieldType{Type: 0x47, Name: "ObjID"},
 				_gen_wiretype.FieldType{Type: 0x48, Name: "CurVers"},
 				_gen_wiretype.FieldType{Type: 0x49, Name: "Parents"},