Change the default conflict resolution policy.
* The default conflict resolution policy now picks the most recent
of the two conflicting mutations. A UTC timestamp is added to
each mutation when it reaches syncd, and is used to select the
winner of a conflict. The mutation version numbers are now used
as a tie-breaker when the two mutations have the same timestamp.
* Change the default peerSyncInterval to 100ms. For testing, add
a command-line arg to override this value.
Change-Id: Ie1263afb85cecf65873fb41054c424044401974b
diff --git a/runtimes/google/vsync/gc_test.go b/runtimes/google/vsync/gc_test.go
index b8a6d5f..fe425f4 100644
--- a/runtimes/google/vsync/gc_test.go
+++ b/runtimes/google/vsync/gc_test.go
@@ -16,7 +16,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -146,7 +146,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -241,7 +241,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -315,7 +315,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -437,7 +437,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -486,7 +486,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -541,7 +541,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -636,7 +636,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -712,7 +712,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -755,7 +755,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -789,7 +789,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -871,7 +871,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -961,7 +961,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -1021,7 +1021,7 @@
if err != nil {
t.Errorf("Could not create tempdir %v", err)
}
- s := NewSyncd("", "", "A", dir, "")
+ s := NewSyncd("", "", "A", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 9b84904..5804e00 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -30,30 +30,26 @@
// Policies for conflict resolution.
const (
- // Resolves conflicts by picking the mutation with the maximum version.
- useVersion = iota
+ // Resolves conflicts by picking the mutation with the most recent timestamp.
+ useTime = iota
// TODO(hpucha): implement other policies.
// Resolves conflicts by using the app conflict resolver callbacks via store.
useCallback
-
- // Resolves conflicts by comparing timestamps on logrecords.
- useTime
)
var (
// peerSyncInterval is the duration between two consecutive
// sync events. In every sync event, the initiator contacts
// one of its peers to obtain any pending updates.
- peerSyncInterval = 10 * time.Second
+ peerSyncInterval = 100 * time.Millisecond
// peerSelectionPolicy is the policy used to select a peer when
// the initiator gets a chance to sync.
peerSelectionPolicy = selectRandom
- // conflictResolutionPolicy is the policy used to resolve
- // conflicts.
- conflictResolutionPolicy = useVersion
+ // conflictResolutionPolicy is the policy used to resolve conflicts.
+ conflictResolutionPolicy = useTime
errNoUsefulPeer = errors.New("no useful peer to contact")
)
@@ -83,7 +79,7 @@
}
// newInitiator creates a new initiator instance attached to the given syncd instance.
-func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string) *syncInitiator {
+func newInitiator(syncd *syncd, peerEndpoints, peerDeviceIDs string, syncTick time.Duration) *syncInitiator {
i := &syncInitiator{syncd: syncd,
updObjects: make(map[storage.ID]*objConflictState),
}
@@ -107,9 +103,15 @@
}
}
+ // Override the default peerSyncInterval value if syncTick is specified.
+ if syncTick > 0 {
+ peerSyncInterval = syncTick
+ }
+
vlog.VI(1).Infof("newInitiator: My device ID: %s", i.syncd.id)
vlog.VI(1).Infof("newInitiator: Peer endpoints: %v", i.neighbors)
vlog.VI(1).Infof("newInitiator: Peer IDs: %v", i.neighborIDs)
+ vlog.VI(1).Infof("newInitiator: Sync interval: %v", peerSyncInterval)
return i
}
@@ -422,8 +424,8 @@
// resolveConflicts resolves conflicts for updated objects.
func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
switch conflictResolutionPolicy {
- case useVersion:
- if err := i.resolveConflictsByVersion(); err != nil {
+ case useTime:
+ if err := i.resolveConflictsByTime(); err != nil {
return nil, err
}
default:
@@ -439,13 +441,15 @@
return m, nil
}
-// resolveConflictsByVersion resolves conflicts using version numbers
-// of the conflicting mutations. Picks a mutation with the larger
-// version number.
+// resolveConflictsByTime resolves conflicts using the timestamps
+// of the conflicting mutations. It picks a mutation with the larger
+// timestamp, i.e. the most recent update. If the timestamps are equal,
+// it uses the mutation version numbers as a tie-breaker, picking the
+// mutation with the larger version.
//
// TODO(hpucha): Based on a few more policies, reconsider nesting
// order of the conflict resolution loop and switch-on-policy.
-func (i *syncInitiator) resolveConflictsByVersion() error {
+func (i *syncInitiator) resolveConflictsByTime() error {
for obj, st := range i.updObjects {
if !st.isConflict {
continue
@@ -460,12 +464,20 @@
if err != nil {
return err
}
- var m raw.Mutation
- if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
- m = lrecs[0].Value.Mutation
- } else {
- m = lrecs[1].Value.Mutation
+
+ res := 0
+ switch {
+ case lrecs[0].Value.SyncTime > lrecs[1].Value.SyncTime:
+ res = 0
+ case lrecs[0].Value.SyncTime < lrecs[1].Value.SyncTime:
+ res = 1
+ case lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version:
+ res = 0
+ case lrecs[0].Value.Mutation.Version < lrecs[1].Value.Mutation.Version:
+ res = 1
}
+
+ m := lrecs[res].Value.Mutation
m.Version = storage.NewVersion()
// TODO(hpucha): handle continue and delete flags.
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index c73e0f1..3c9a063 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -21,7 +21,7 @@
// Set a large value to prevent the threads from firing.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
s.lock.Lock()
defer s.lock.Unlock()
@@ -61,8 +61,8 @@
}
}
-// TestResolveConflictByVersion tests the version based conflict resolution policy.
-func TestResolveConflictByVersion(t *testing.T) {
+// TestResolveConflictByTime tests the timestamp-based conflict resolution policy.
+func TestResolveConflictByTime(t *testing.T) {
dir, err := createTempDir()
if err != nil {
t.Errorf("Could not create tempdir %v", err)
@@ -71,7 +71,7 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -91,7 +91,7 @@
LSN: LSN(100 + v),
ObjID: objID,
CurVers: v,
- Value: LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
+ Value: LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}, SyncTime: int64(v)},
}
logKey, err := s.log.putLogRec(expRec)
if err != nil {
@@ -102,8 +102,8 @@
}
}
- if err := s.hdlInitiator.resolveConflictsByVersion(); err != nil {
- t.Errorf("ResolveConflictsByVersion failed with err %v", err)
+ if err := s.hdlInitiator.resolveConflictsByTime(); err != nil {
+ t.Errorf("ResolveConflictsByTime failed with err %v", err)
}
if s.hdlInitiator.updObjects[objID].resolvVal.Mutation.PriorVersion != 540 {
t.Errorf("Data mismatch for resolution %v", s.hdlInitiator.updObjects[objID].resolvVal)
@@ -121,7 +121,7 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -211,7 +211,7 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -302,7 +302,7 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -400,8 +400,8 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- conflictResolutionPolicy = useVersion
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ conflictResolutionPolicy = useTime
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
@@ -502,8 +502,8 @@
// Test is not thread safe.
peerSyncInterval = 1 * time.Hour
garbageCollectInterval = 1 * time.Hour
- conflictResolutionPolicy = useVersion
- s := NewSyncd("", "", "VeyronTab", dir, "")
+ conflictResolutionPolicy = useTime
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
defer s.Close()
defer os.RemoveAll(dir)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index 19f206e..4de7c9c 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -45,6 +45,8 @@
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
Mutation raw.Mutation
+ // SyncTime is the timestamp of the mutation when it arrives at the Sync server.
+ SyncTime int64
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index a1958a5..fdbc33e 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -33,6 +33,8 @@
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
Mutation raw.Mutation
+ // SyncTime is the timestamp of the mutation when it arrives at the Sync server.
+ SyncTime int64
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
@@ -270,6 +272,7 @@
_gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
+ _gen_wiretype.FieldType{Type: 0x25, Name: "SyncTime"},
_gen_wiretype.FieldType{Type: 0x2, Name: "Delete"},
_gen_wiretype.FieldType{Type: 0x2, Name: "Continue"},
},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 375581b..435f55f 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -9,6 +9,7 @@
// log records to get in sync with the sender.
import (
"sync"
+ "time"
"veyron/services/store/raw"
@@ -63,7 +64,7 @@
// sync.RWMutex. The spec says that the writers cannot be starved by
// the readers but it does not guarantee FIFO. We may have to revisit
// this in the future.
-func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string) *syncd {
+func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, syncTick time.Duration) *syncd {
// Connect to the local Veyron store.
// At present this is optional to allow testing (from the command-line) w/o Veyron store running.
// TODO: connecting to Veyron store should be mandatory.
@@ -76,14 +77,15 @@
st = vs
}
- return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st)
+ return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st, syncTick)
}
// newSyncdCore is the internal function that creates the Syncd
// structure and initilizes its thread (goroutines). It takes a
// Veyron Store parameter to separate the core of Syncd setup from the
// external dependency on Veyron Store.
-func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, store storage.Store) *syncd {
+func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string,
+ store storage.Store, syncTick time.Duration) *syncd {
s := &syncd{}
// Bootstrap my own DeviceID.
@@ -121,7 +123,7 @@
s.pending.Add(3)
// Get deltas every peerSyncInterval.
- s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs)
+ s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs, syncTick)
go s.hdlInitiator.contactPeers()
// Garbage collect every garbageCollectInterval.
diff --git a/runtimes/google/vsync/vsyncd/main.go b/runtimes/google/vsync/vsyncd/main.go
index e8204c4..fd12a7a 100644
--- a/runtimes/google/vsync/vsyncd/main.go
+++ b/runtimes/google/vsync/vsyncd/main.go
@@ -20,6 +20,7 @@
vstoreEndpoint := flag.String("vstore", "", "endpoint of the local Veyron store")
// TODO(rthellend): Remove the address flag when the config manager is working.
address := flag.String("address", ":0", "address to listen on")
+ syncTick := flag.Duration("synctick", 0, "clock tick duration for sync with a peer (e.g. 10s)")
flag.Parse()
if *devid == "" {
@@ -37,7 +38,7 @@
}
// Register the "sync" prefix with the sync dispatcher.
- syncd := vsync.NewSyncd(*peerEndpoints, *peerDeviceIDs, *devid, *storePath, *vstoreEndpoint)
+ syncd := vsync.NewSyncd(*peerEndpoints, *peerDeviceIDs, *devid, *storePath, *vstoreEndpoint, *syncTick)
serverSync := vsync.NewServerSync(syncd)
if err := s.Register("sync", ipc.SoloDispatcher(serverSync, nil)); err != nil {
vlog.Fatalf("syncd:: error registering service: err %v", err)
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 0617640..5cbf0bf 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -130,7 +130,10 @@
return
}
- if err = w.processChanges(changes); err != nil {
+ // Timestamp of these changes arriving at the Sync server.
+ syncTime := time.Now().UnixNano()
+
+ if err = w.processChanges(changes, syncTime); err != nil {
// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
// self-healing from a data corruption where feasible, otherwise quarantine this device
// from the cluster and stop Syncd to avoid propagating data corruptions.
@@ -141,7 +144,7 @@
// processChanges applies the batch of changes (object mutations) received from the Watch API.
// The function grabs the write-lock to access the Log and DAG DBs.
-func (w *syncWatcher) processChanges(changes watch.ChangeBatch) error {
+func (w *syncWatcher) processChanges(changes watch.ChangeBatch, syncTime int64) error {
w.syncd.lock.Lock()
defer w.syncd.lock.Unlock()
@@ -159,7 +162,7 @@
return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
}
- val := &LogValue{Mutation: *mu, Delete: ch.State == watch.DoesNotExist, Continue: ch.Continued}
+ val := &LogValue{Mutation: *mu, SyncTime: syncTime, Delete: ch.State == watch.DoesNotExist, Continue: ch.Continued}
var parents []storage.Version
if mu.PriorVersion != storage.NoVersion {
parents = []storage.Version{mu.PriorVersion}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 01ae25a..45be727 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -269,9 +269,9 @@
func fakeSyncd(t *testing.T, storeDir string, withStore bool) *syncd {
var s *syncd
if withStore {
- s = newSyncdCore("", "", "fake-dev", storeDir, "", &fakeVStore{})
+ s = newSyncdCore("", "", "fake-dev", storeDir, "", &fakeVStore{}, 0)
} else {
- s = newSyncdCore("", "", "fake-dev", storeDir, "", nil)
+ s = newSyncdCore("", "", "fake-dev", storeDir, "", nil, 0)
}
if s == nil {
t.Fatal("cannot create a Sync server")