Change the default conflict resolution policy.

* The default conflict resolution policy now picks the most recent
  of the two conflicting mutations.  A UTC timestamp is added to
  each mutation when it reaches syncd, and is used to select the
  winner of a conflict.  The mutation version numbers are now used
  only as a tie-breaker when the two mutations have the same timestamp.

* Change the default peerSyncInterval from 10s to 100ms.  For testing,
  add a "synctick" command-line flag to override this value.

Change-Id: Ie1263afb85cecf65873fb41054c424044401974b
diff --git a/runtimes/google/vsync/gc_test.go b/runtimes/google/vsync/gc_test.go
index b8a6d5f..fe425f4 100644
--- a/runtimes/google/vsync/gc_test.go
+++ b/runtimes/google/vsync/gc_test.go
@@ -16,7 +16,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -146,7 +146,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -241,7 +241,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -315,7 +315,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -437,7 +437,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -486,7 +486,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -541,7 +541,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -636,7 +636,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -712,7 +712,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -755,7 +755,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -789,7 +789,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -871,7 +871,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -961,7 +961,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -1021,7 +1021,7 @@
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
 	}
-	s := NewSyncd("", "", "A", dir, "")
+	s := NewSyncd("", "", "A", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 9b84904..5804e00 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -30,30 +30,26 @@
 
 // Policies for conflict resolution.
 const (
-	// Resolves conflicts by picking the mutation with the maximum version.
-	useVersion = iota
+	// Resolves conflicts by picking the mutation with the most recent timestamp.
+	useTime = iota
 
 	// TODO(hpucha): implement other policies.
 	// Resolves conflicts by using the app conflict resolver callbacks via store.
 	useCallback
-
-	// Resolves conflicts by comparing timestamps on logrecords.
-	useTime
 )
 
 var (
 	// peerSyncInterval is the duration between two consecutive
 	// sync events.  In every sync event, the initiator contacts
 	// one of its peers to obtain any pending updates.
-	peerSyncInterval = 10 * time.Second
+	peerSyncInterval = 100 * time.Millisecond
 
 	// peerSelectionPolicy is the policy used to select a peer when
 	// the initiator gets a chance to sync.
 	peerSelectionPolicy = selectRandom
 
-	// conflictResolutionPolicy is the policy used to resolve
-	// conflicts.
-	conflictResolutionPolicy = useVersion
+	// conflictResolutionPolicy is the policy used to resolve conflicts.
+	conflictResolutionPolicy = useTime
 
 	errNoUsefulPeer = errors.New("no useful peer to contact")
 )
@@ -83,7 +79,7 @@
 }
 
 // newInitiator creates a new initiator instance attached to the given syncd instance.
-func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string) *syncInitiator {
+func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string, syncTick time.Duration) *syncInitiator {
 	i := &syncInitiator{syncd: syncd,
 		updObjects: make(map[storage.ID]*objConflictState),
 	}
@@ -107,9 +103,15 @@
 		}
 	}
 
+	// Override the default peerSyncInterval value if syncTick is specified.
+	if syncTick > 0 {
+		peerSyncInterval = syncTick
+	}
+
 	vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
 	vlog.VI(1).Infof("newInitiator: Peer endpoints: %v", i.neighbors)
 	vlog.VI(1).Infof("newInitiator: Peer IDs: %v", i.neighborIDs)
+	vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)
 
 	return i
 }
@@ -422,8 +424,8 @@
 // resolveConflicts resolves conflicts for updated objects.
 func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
 	switch conflictResolutionPolicy {
-	case useVersion:
-		if err := i.resolveConflictsByVersion(); err != nil {
+	case useTime:
+		if err := i.resolveConflictsByTime(); err != nil {
 			return nil, err
 		}
 	default:
@@ -439,13 +441,15 @@
 	return m, nil
 }
 
-// resolveConflictsByVersion resolves conflicts using version numbers
-// of the conflicting mutations. Picks a mutation with the larger
-// version number.
+// resolveConflictsByTime resolves conflicts using the timestamps
+// of the conflicting mutations.  It picks a mutation with the larger
+// timestamp, i.e. the most recent update.  If the timestamps are equal,
+// it uses the mutation version numbers as a tie-breaker, picking the
+// mutation with the larger version.
 //
 // TODO(hpucha): Based on a few more policies, reconsider nesting
 // order of the conflict resolution loop and switch-on-policy.
-func (i *syncInitiator) resolveConflictsByVersion() error {
+func (i *syncInitiator) resolveConflictsByTime() error {
 	for obj, st := range i.updObjects {
 		if !st.isConflict {
 			continue
@@ -460,12 +464,20 @@
 		if err != nil {
 			return err
 		}
-		var m raw.Mutation
-		if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
-			m = lrecs[0].Value.Mutation
-		} else {
-			m = lrecs[1].Value.Mutation
+
+		res := 0
+		switch {
+		case lrecs[0].Value.SyncTime > lrecs[1].Value.SyncTime:
+			res = 0
+		case lrecs[0].Value.SyncTime < lrecs[1].Value.SyncTime:
+			res = 1
+		case lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version:
+			res = 0
+		case lrecs[0].Value.Mutation.Version < lrecs[1].Value.Mutation.Version:
+			res = 1
 		}
+
+		m := lrecs[res].Value.Mutation
 		m.Version = storage.NewVersion()
 
 		// TODO(hpucha): handle continue and delete flags.
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index c73e0f1..3c9a063 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -21,7 +21,7 @@
 	// Set a large value to prevent the threads from firing.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	s.lock.Lock()
 	defer s.lock.Unlock()
@@ -61,8 +61,8 @@
 	}
 }
 
-// TestResolveConflictByVersion tests the version based conflict resolution policy.
-func TestResolveConflictByVersion(t *testing.T) {
+// TestResolveConflictByTime tests the timestamp-based conflict resolution policy.
+func TestResolveConflictByTime(t *testing.T) {
 	dir, err := createTempDir()
 	if err != nil {
 		t.Errorf("Could not create tempdir %v", err)
@@ -71,7 +71,7 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -91,7 +91,7 @@
 			LSN:     LSN(100 + v),
 			ObjID:   objID,
 			CurVers: v,
-			Value:   LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
+			Value:   LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}, SyncTime: int64(v)},
 		}
 		logKey, err := s.log.putLogRec(expRec)
 		if err != nil {
@@ -102,8 +102,8 @@
 		}
 	}
 
-	if err := s.hdlInitiator.resolveConflictsByVersion(); err != nil {
-		t.Errorf("ResolveConflictsByVersion failed with err %v", err)
+	if err := s.hdlInitiator.resolveConflictsByTime(); err != nil {
+		t.Errorf("ResolveConflictsByTime failed with err %v", err)
 	}
 	if s.hdlInitiator.updObjects[objID].resolvVal.Mutation.PriorVersion != 540 {
 		t.Errorf("Data mismatch for resolution %v", s.hdlInitiator.updObjects[objID].resolvVal)
@@ -121,7 +121,7 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -211,7 +211,7 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -302,7 +302,7 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -400,8 +400,8 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	conflictResolutionPolicy = useVersion
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	conflictResolutionPolicy = useTime
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
@@ -502,8 +502,8 @@
 	// Test is not thread safe.
 	peerSyncInterval = 1 * time.Hour
 	garbageCollectInterval = 1 * time.Hour
-	conflictResolutionPolicy = useVersion
-	s := NewSyncd("", "", "VeyronTab", dir, "")
+	conflictResolutionPolicy = useTime
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
 
 	defer s.Close()
 	defer os.RemoveAll(dir)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index 19f206e..4de7c9c 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -45,6 +45,8 @@
 type LogValue struct {
   // Mutation is the store mutation representing the change in the object.
   Mutation raw.Mutation
+  // SyncTime is the timestamp of the mutation when it arrives at the Sync server.
+  SyncTime int64
   // Delete indicates whether the mutation resulted in the object being
   // deleted from the store.
   Delete   bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index a1958a5..fdbc33e 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -33,6 +33,8 @@
 type LogValue struct {
 	// Mutation is the store mutation representing the change in the object.
 	Mutation raw.Mutation
+	// SyncTime is the timestamp of the mutation when it arrives at the Sync server.
+	SyncTime int64
 	// Delete indicates whether the mutation resulted in the object being
 	// deleted from the store.
 	Delete bool
@@ -270,6 +272,7 @@
 		_gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
+				_gen_wiretype.FieldType{Type: 0x25, Name: "SyncTime"},
 				_gen_wiretype.FieldType{Type: 0x2, Name: "Delete"},
 				_gen_wiretype.FieldType{Type: 0x2, Name: "Continue"},
 			},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 375581b..435f55f 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -9,6 +9,7 @@
 // log records to get in sync with the sender.
 import (
 	"sync"
+	"time"
 
 	"veyron/services/store/raw"
 
@@ -63,7 +64,7 @@
 // sync.RWMutex. The spec says that the writers cannot be starved by
 // the readers but it does not guarantee FIFO. We may have to revisit
 // this in the future.
-func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string) *syncd {
+func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, syncTick time.Duration) *syncd {
 	// Connect to the local Veyron store.
 	// At present this is optional to allow testing (from the command-line) w/o Veyron store running.
 	// TODO: connecting to Veyron store should be mandatory.
@@ -76,14 +77,15 @@
 		st = vs
 	}
 
-	return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st)
+	return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st, syncTick)
 }
 
 // newSyncdCore is the internal function that creates the Syncd
 // structure and initilizes its thread (goroutines).  It takes a
 // Veyron Store parameter to separate the core of Syncd setup from the
 // external dependency on Veyron Store.
-func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, store storage.Store) *syncd {
+func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string,
+	store storage.Store, syncTick time.Duration) *syncd {
 	s := &syncd{}
 
 	// Bootstrap my own DeviceID.
@@ -121,7 +123,7 @@
 	s.pending.Add(3)
 
 	// Get deltas every peerSyncInterval.
-	s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs)
+	s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs, syncTick)
 	go s.hdlInitiator.contactPeers()
 
 	// Garbage collect every garbageCollectInterval.
diff --git a/runtimes/google/vsync/vsyncd/main.go b/runtimes/google/vsync/vsyncd/main.go
index e8204c4..fd12a7a 100644
--- a/runtimes/google/vsync/vsyncd/main.go
+++ b/runtimes/google/vsync/vsyncd/main.go
@@ -20,6 +20,7 @@
 	vstoreEndpoint := flag.String("vstore", "", "endpoint of the local Veyron store")
 	// TODO(rthellend): Remove the address flag when the config manager is working.
 	address := flag.String("address", ":0", "address to listen on")
+	syncTick := flag.Duration("synctick", 0, "clock tick duration for sync with a peer (e.g. 10s)")
 
 	flag.Parse()
 	if *devid == "" {
@@ -37,7 +38,7 @@
 	}
 
 	// Register the "sync" prefix with the sync dispatcher.
-	syncd := vsync.NewSyncd(*peerEndpoints, *peerDeviceIDs, *devid, *storePath, *vstoreEndpoint)
+	syncd := vsync.NewSyncd(*peerEndpoints, *peerDeviceIDs, *devid, *storePath, *vstoreEndpoint, *syncTick)
 	serverSync := vsync.NewServerSync(syncd)
 	if err := s.Register("sync", ipc.SoloDispatcher(serverSync, nil)); err != nil {
 		vlog.Fatalf("syncd:: error registering service: err %v", err)
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 0617640..5cbf0bf 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -130,7 +130,10 @@
 			return
 		}
 
-		if err = w.processChanges(changes); err != nil {
+		// Timestamp of these changes arriving at the Sync server.
+		syncTime := time.Now().UnixNano()
+
+		if err = w.processChanges(changes, syncTime); err != nil {
 			// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
 			// self-healing from a data corruption where feasible, otherwise quarantine this device
 			// from the cluster and stop Syncd to avoid propagating data corruptions.
@@ -141,7 +144,7 @@
 
 // processChanges applies the batch of changes (object mutations) received from the Watch API.
 // The function grabs the write-lock to access the Log and DAG DBs.
-func (w *syncWatcher) processChanges(changes watch.ChangeBatch) error {
+func (w *syncWatcher) processChanges(changes watch.ChangeBatch, syncTime int64) error {
 	w.syncd.lock.Lock()
 	defer w.syncd.lock.Unlock()
 
@@ -159,7 +162,7 @@
 			return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
 		}
 
-		val := &LogValue{Mutation: *mu, Delete: ch.State == watch.DoesNotExist, Continue: ch.Continued}
+		val := &LogValue{Mutation: *mu, SyncTime: syncTime, Delete: ch.State == watch.DoesNotExist, Continue: ch.Continued}
 		var parents []storage.Version
 		if mu.PriorVersion != storage.NoVersion {
 			parents = []storage.Version{mu.PriorVersion}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 01ae25a..45be727 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -269,9 +269,9 @@
 func fakeSyncd(t *testing.T, storeDir string, withStore bool) *syncd {
 	var s *syncd
 	if withStore {
-		s = newSyncdCore("", "", "fake-dev", storeDir, "", &fakeVStore{})
+		s = newSyncdCore("", "", "fake-dev", storeDir, "", &fakeVStore{}, 0)
 	} else {
-		s = newSyncdCore("", "", "fake-dev", storeDir, "", nil)
+		s = newSyncdCore("", "", "fake-dev", storeDir, "", nil, 0)
 	}
 	if s == nil {
 		t.Fatal("cannot create a Sync server")