veyron/runtimes/google/vsync: Support conflict resolution via
blessings. Blessings are captured by adding dag links. So we also
provide a new type of log record (LinkRec) to communicate link
addition.
Change-Id: I1c4c6d8da7155df9a245a53e58f5d4f11659e1d6
diff --git a/runtimes/google/vsync/gc.go b/runtimes/google/vsync/gc.go
index 726868c..9dc8dd1 100644
--- a/runtimes/google/vsync/gc.go
+++ b/runtimes/google/vsync/gc.go
@@ -96,7 +96,9 @@
// strictCheck when enabled performs strict checking of every
// log record being deleted to confirm that it should be in
// fact deleted.
- strictCheck = true
+ // TODO(hpucha): Support strictCheck in the presence
+ // of Link log records.
+ strictCheck = false
// Every compactCount iterations of garbage collection, kvdb
// is compacted. This value has performance implications as
@@ -304,6 +306,12 @@
return err
}
+ if rec.RecType == LinkRec {
+ // For a link log record, gc it right away.
+ g.dagPruneCallBack(logRecKey(devid, gnum, l))
+ continue
+ }
+
// Insert the object in this log record to the prune
// map if needed.
// If this object does not exist, create it.
@@ -441,19 +449,21 @@
if err != nil {
return err
}
- objHist, ok := g.verifyPruneMap[rec.ObjID]
- if !ok {
- return errors.New("obj not found in verifyMap")
- }
- _, found := objHist.versions[rec.CurVers]
- // If online consistency check is in progress, we
- // cannot strictly verify all the versions to be
- // deleted, and we ignore the failure to find a
- // version.
- if found {
- delete(objHist.versions, rec.CurVers)
- } else if !g.checkConsistency {
- return errors.New("verification failed")
+ if rec.RecType == NodeRec {
+ objHist, ok := g.verifyPruneMap[rec.ObjID]
+ if !ok {
+ return errors.New("obj not found in verifyMap")
+ }
+ _, found := objHist.versions[rec.CurVers]
+ // If online consistency check is in progress, we
+ // cannot strictly verify all the versions to be
+ // deleted, and we ignore the failure to find a
+ // version.
+ if found {
+ delete(objHist.versions, rec.CurVers)
+ } else if !g.checkConsistency {
+ return errors.New("verification failed")
+ }
}
}
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index feb92a8..930df89 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -45,6 +45,7 @@
"veyron/services/store/raw"
"veyron2/storage"
+ "veyron2/vlog"
)
var (
@@ -300,12 +301,13 @@
return l.gens.del(key)
}
-// createLocalLogRec creates a new local log record.
+// createLocalLogRec creates a new local log record of type NodeRec.
func (l *iLog) createLocalLogRec(obj storage.ID, vers storage.Version, par []storage.Version, val *LogValue) (*LogRec, error) {
rec := &LogRec{
- DevID: l.s.id,
- GNum: l.head.Curgen,
- LSN: l.head.Curlsn,
+ DevID: l.s.id,
+ GNum: l.head.Curgen,
+ LSN: l.head.Curlsn,
+ RecType: NodeRec,
ObjID: obj,
CurVers: vers,
@@ -319,6 +321,25 @@
return rec, nil
}
+// createLocalLinkLogRec creates a new local log record of type LinkRec.
+func (l *iLog) createLocalLinkLogRec(obj storage.ID, vers, par storage.Version) (*LogRec, error) {
+ rec := &LogRec{
+ DevID: l.s.id,
+ GNum: l.head.Curgen,
+ LSN: l.head.Curlsn,
+ RecType: LinkRec,
+
+ ObjID: obj,
+ CurVers: vers,
+ Parents: []storage.Version{par},
+ }
+
+ // Increment the LSN for the local log.
+ l.head.Curlsn++
+
+ return rec, nil
+}
+
// createRemoteGeneration adds a new remote generation.
func (l *iLog) createRemoteGeneration(dev DeviceID, gnum GenID, gen *genMetadata) error {
if l.db == nil {
@@ -357,6 +378,7 @@
}
err := l.putGenMetadata(l.s.id, g, val)
+ vlog.VI(2).Infof("createLocalGeneration:: created gen %d %v", g, val)
// Move to the next generation irrespective of err.
l.head.Curorder++
l.head.Curgen++
@@ -370,6 +392,8 @@
if l.db == nil {
return errInvalidLog
}
+
+ vlog.VI(2).Infof("processWatchRecord:: adding object %v %v", objID, vers)
// Check if the object version already exists in the DAG. if so return.
if l.s.dag.hasNode(objID, vers) {
return nil
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 67422d9..e0e84b7 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -191,6 +191,7 @@
return GenVector{}, err
}
+ vlog.VI(2).Infof("updateLocalGeneration:: created gen %d", gen)
// Update local generation vector in devTable.
if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
return GenVector{}, err
@@ -308,10 +309,16 @@
if err != nil {
return err
}
- if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
- return err
+
+ vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
+ switch rec.RecType {
+ case NodeRec:
+ return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+ case LinkRec:
+ return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
+ default:
+ return fmt.Errorf("unknown log record type")
}
- return nil
}
// createGenMetadataBatch inserts a batch of generations into the log.
@@ -337,12 +344,31 @@
// processUpdatedObjects processes all the updates received by the
// initiator, one object at a time. For each updated object, we first
-// check if the object has any conflicts. If there is a conflict, we
-// resolve the conflict and generate a new store mutation reflecting
-// the conflict resolution. If there is no conflict, we generate a
-// store mutation to simply update the store to the latest value. We
-// then put all these mutations in the store. If the put succeeds, we
-// update the log and dag state suitably (move the head ptr of the
+// check if the object has any conflicts, resulting in three
+// possibilities:
+//
+// * There is no conflict, and no updates are needed to the store
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent
+// version. This occurs when conflicts are resolved by blessings.
+//
+// * There is no conflict, but a remote version is discovered that
+// builds on the local head (isConflict=false, newHead != oldHead). In
+// this case, we generate a store mutation to simply update the store
+// to the latest value.
+//
+// * There is a conflict and we call into the app or the system to
+// resolve the conflict, resulting in three possibilties: (a) conflict
+// was resolved by blessing the local version. In this case, store
+// need not be updated, but a link is added to record the
+// blessing. (b) conflict was resolved by blessing the remote
+// version. In this case, store is updated with the remote version and
+// a link is added as well. (c) conflict was resolved by generating a
+// new store mutation. In this case, store is updated with the new
+// version.
+//
+// We then put all these mutations in the store. If the put succeeds,
+// we update the log and dag state suitably (move the head ptr of the
// object in the dag to the latest version, and create a new log
// record reflecting conflict resolution if any). Puts to store can
// fail since preconditions on the objects may have been violated. In
@@ -355,12 +381,11 @@
return err
}
- m, err := i.resolveConflicts()
- if err != nil {
+ if err := i.resolveConflicts(); err != nil {
return err
}
- err = i.updateStoreAndSync(ctx, m, local, minGens, remote, dID)
+ err := i.updateStoreAndSync(ctx, local, minGens, remote, dID)
if err == nil {
break
}
@@ -390,60 +415,28 @@
// Check if object has a conflict.
var err error
st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
+ vlog.VI(2).Infof("detectConflicts:: object %v state %v err %v",
+ obj, st, err)
if err != nil {
return err
}
- if !st.isConflict {
- rec, err := i.getLogRec(obj, st.newHead)
- if err != nil {
- return err
- }
- st.resolvVal = &rec.Value
- // Sanity check.
- if st.resolvVal.Mutation.Version != st.newHead {
- return fmt.Errorf("bad mutation %d %d",
- st.resolvVal.Mutation.Version, st.newHead)
- }
- }
}
return nil
}
-// getLogRec returns the log record corresponding to a given object and its version.
-func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
- logKey, err := i.syncd.dag.getLogrec(obj, vers)
- if err != nil {
- return nil, err
- }
- dev, gen, lsn, err := splitLogRecKey(logKey)
- if err != nil {
- return nil, err
- }
- rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
- if err != nil {
- return nil, err
- }
- return rec, nil
-}
-
-// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
+// resolveConflicts resolves conflicts for updated objects. Conflicts
+// may be resolved by adding new versions or blessing either the local
+// or the remote version.
+func (i *syncInitiator) resolveConflicts() error {
switch conflictResolutionPolicy {
case useTime:
if err := i.resolveConflictsByTime(); err != nil {
- return nil, err
+ return err
}
default:
- return nil, fmt.Errorf("unknown conflict resolution policy")
+ return fmt.Errorf("unknown conflict resolution policy")
}
-
- var m []raw.Mutation
- for _, st := range i.updObjects {
- // Append to mutations.
- st.resolvVal.Mutation.PriorVersion = st.oldHead
- m = append(m, st.resolvVal.Mutation)
- }
- return m, nil
+ return nil
}
// resolveConflictsByTime resolves conflicts using the timestamps
@@ -482,11 +475,10 @@
res = 1
}
- m := lrecs[res].Value.Mutation
- m.Version = storage.NewVersion()
-
- // TODO(hpucha): handle continue and delete flags.
- st.resolvVal = &LogValue{Mutation: m}
+ // Instead of creating a new version that resolves the
+ // conflict, we are blessing an existing version as
+ // the conflict resolution.
+ st.resolvVal = &lrecs[res].Value
}
return nil
@@ -511,11 +503,38 @@
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(ctx context.T, m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) updateStoreAndSync(ctx context.T, local, minGens, remote GenVector, dID DeviceID) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
+ var m []raw.Mutation
+ for obj, st := range i.updObjects {
+ if !st.isConflict {
+ rec, err := i.getLogRec(obj, st.newHead)
+ if err != nil {
+ return err
+ }
+ st.resolvVal = &rec.Value
+ // Sanity check.
+ if st.resolvVal.Mutation.Version != st.newHead {
+ return fmt.Errorf("bad mutation %d %d",
+ st.resolvVal.Mutation.Version, st.newHead)
+ }
+ }
+
+ // If the local version is picked, no further updates
+ // to the store are needed. If the remote version is
+ // picked, we put it in the store.
+ if st.resolvVal.Mutation.Version != st.oldHead {
+ // Append to mutations.
+ st.resolvVal.Mutation.PriorVersion = st.oldHead
+ vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
+ st.resolvVal.Mutation, obj)
+ m = append(m, st.resolvVal.Mutation)
+ }
+ }
+
// TODO(hpucha): We will hold the lock across PutMutations rpc
// to prevent a race with watcher. The next iteration will
// clean up this coordination.
@@ -553,30 +572,68 @@
return nil
}
+// getLogRec returns the log record corresponding to a given object and its version.
+func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
+ logKey, err := i.syncd.dag.getLogrec(obj, vers)
+ if err != nil {
+ return nil, err
+ }
+ dev, gen, lsn, err := splitLogRecKey(logKey)
+ if err != nil {
+ return nil, err
+ }
+ rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
+ if err != nil {
+ return nil, err
+ }
+ return rec, nil
+}
+
// updateLogAndDag updates the log and dag data structures on a successful store put.
func (i *syncInitiator) updateLogAndDag() error {
for obj, st := range i.updObjects {
if st.isConflict {
// Object had a conflict, which was resolved successfully.
// Put is successful, create a log record.
- parents := []storage.Version{st.newHead, st.oldHead}
- rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+ var err error
+ var rec *LogRec
+
+ switch {
+ case st.resolvVal.Mutation.Version == st.oldHead:
+ // Local version was blessed as the conflict resolution.
+ rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.oldHead, st.newHead)
+ case st.resolvVal.Mutation.Version == st.newHead:
+ // Remote version was blessed as the conflict resolution.
+ rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.newHead, st.oldHead)
+ default:
+ // New version was created to resolve the conflict.
+ parents := []storage.Version{st.newHead, st.oldHead}
+ rec, err = i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+
+ }
if err != nil {
return err
}
-
logKey, err := i.syncd.log.putLogRec(rec)
if err != nil {
return err
}
-
- // Put is successful, add a new DAG node.
- if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
+ // Add a new DAG node.
+ switch rec.RecType {
+ case NodeRec:
+ err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+ case LinkRec:
+ err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
+ default:
+ return fmt.Errorf("unknown log record type")
+ }
+ if err != nil {
return err
}
}
- // Move the head.
+ // Move the head. This should be idempotent. We may
+ // move head to the local head in some cases.
if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
return err
}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 3c9a063..468c882 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -172,22 +172,27 @@
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
}
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
- t.Errorf("Unexpected mutations %v", m)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -270,15 +275,21 @@
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
- t.Errorf("Unexpected mutations %v", m)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -362,22 +373,27 @@
v = v + 1
}
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != 2 || m[0].Version != 5 {
- t.Errorf("Unexpected versions %v", m[0])
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -437,16 +453,14 @@
t.Errorf("Data mismatch for generation metadata: %v instead of %v",
curVal, expVal)
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
objid, err := strToObjID("12345")
@@ -466,6 +480,9 @@
if curRec.ObjID != objid {
t.Errorf("Data mismatch in log record %v", curRec)
}
+ if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
// Verify DAG state.
if _, err := s.dag.getNode(objid, v); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -473,19 +490,25 @@
v = v + 1
}
}
-
- if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
- t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- if m[0].PriorVersion != 2 {
- t.Errorf("Unexpected version %v", m[0])
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -555,12 +578,11 @@
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
objid, err := strToObjID("12345")
@@ -580,6 +602,9 @@
if curRec.ObjID != objid {
t.Errorf("Data mismatch in log record %v", curRec)
}
+ if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
// Verify DAG state.
if _, err := s.dag.getNode(objid, v); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -587,18 +612,357 @@
v = v + 1
}
}
- if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
- t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- if m[0].PriorVersion != 2 {
- t.Errorf("Unexpected version %v", m[0])
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 2 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf0 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// unchanged at the end of replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-00.log.sync>.
+func TestInitiatorBlessNoConf0(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-00.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf1 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-01.log.sync>.
+func TestInitiatorBlessNoConf1(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-01.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 3 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf2 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the first replay. In the second replay, a
+// conflict resolved locally is rediscovered since it was also
+// resolved remotely. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-02.log.sync,
+// remote-noconf-link-repeat.log.sync>.
+func TestInitiatorBlessNoConf2(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-02.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 4 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{"VeyronTab": 0}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 2 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+
+ // Test simultaneous conflict resolution.
+ stream, err = createReplayStream("remote-noconf-link-repeat.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ st = s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 4 || st.oldHead != 4 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronLaptop"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 4 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 3 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessConf tests that a local and a remote log record
+// stream can be correctly applied, when the conflict is resolved by a
+// blessing. Commands are in files
+// testdata/<local-init-00.sync,remote-conf-link.log.sync>.
+func TestInitiatorBlessConf(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-conf-link.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 3 || st.oldHead != 2 || st.ancestor != 1 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // New log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ curRec, err := s.log.getLogRec(s.id, GenID(1), LSN(3))
+ if err != nil || curRec == nil {
+ t.Fatalf("GetLogRec() can not find object %s:1:3 in log file err %v",
+ s.id, err)
+ }
+ if curRec.ObjID != objid || curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 783d23f..bd9356c 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -78,7 +78,7 @@
for _, cmd := range cmds {
switch cmd.cmd {
case addLocal:
- err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{})
+ err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{Mutation: raw.Mutation{Version: cmd.version}})
if err != nil {
return nil, fmt.Errorf("cannot replay local log records %d:%s err %v",
cmd.objID, cmd.version, err)
@@ -121,25 +121,31 @@
stream := newStream()
for _, cmd := range cmds {
+ id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
+ if err != nil {
+ return nil, err
+ }
+ rec := LogRec{
+ DevID: id,
+ GNum: gnum,
+ LSN: lsn,
+ ObjID: cmd.objID,
+ CurVers: cmd.version,
+ Parents: cmd.parents,
+ Value: LogValue{
+ Mutation: raw.Mutation{Version: cmd.version},
+ },
+ }
+
switch cmd.cmd {
case addRemote:
- id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
- if err != nil {
- return nil, err
- }
- rec := LogRec{
- DevID: id,
- GNum: gnum,
- LSN: lsn,
- ObjID: cmd.objID,
- CurVers: cmd.version,
- Parents: cmd.parents,
- Value: LogValue{
- Mutation: raw.Mutation{Version: cmd.version},
- },
- }
- stream.add(rec)
+ rec.RecType = NodeRec
+ case linkRemote:
+ rec.RecType = LinkRec
+ default:
+ return nil, err
}
+ stream.add(rec)
}
return stream, nil
diff --git a/runtimes/google/vsync/vsync.vdl b/runtimes/google/vsync/vsync.vdl
index 4de7c9c..aa06ec9 100644
--- a/runtimes/google/vsync/vsync.vdl
+++ b/runtimes/google/vsync/vsync.vdl
@@ -15,13 +15,21 @@
// GenVector is the generation vector.
type GenVector map[DeviceID]GenID
+const (
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
+ // LinkRec type log record adds a new link in the dag.
+ LinkRec = byte(1)
+)
+
// LogRec represents a single log record that is exchanged between two
// peers.
//
// It contains log related metadata: DevID is the id of the
// device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of log record.
//
// It also contains information relevant to the updates to an object
// in the store: ObjID is the id of the object that was
@@ -34,6 +42,7 @@
DevID DeviceID
GNum GenID
LSN LSN
+ RecType byte
// Object related information.
ObjID storage.ID
CurVers storage.Version
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index 2055734..e2120db 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -35,8 +35,9 @@
//
// It contains log related metadata: DevID is the id of the
// device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of log record.
//
// It also contains information relevant to the updates to an object
// in the store: ObjID is the id of the object that was
@@ -46,9 +47,10 @@
// the object mutation.
type LogRec struct {
// Log related information.
- DevID DeviceID
- GNum GenID
- LSN LSN
+ DevID DeviceID
+ GNum GenID
+ LSN LSN
+ RecType byte
// Object related information.
ObjID storage.ID
CurVers storage.Version
@@ -71,6 +73,14 @@
Continue bool
}
+const (
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
+
+ // LinkRec type log record adds a new link in the dag.
+ LinkRec = byte(1)
+)
+
// Sync allows a device to GetDeltas from another device.
// Sync is the interface the client binds and uses.
// Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -303,6 +313,7 @@
_gen_wiretype.FieldType{Type: 0x41, Name: "DevID"},
_gen_wiretype.FieldType{Type: 0x42, Name: "GNum"},
_gen_wiretype.FieldType{Type: 0x45, Name: "LSN"},
+ _gen_wiretype.FieldType{Type: 0x46, Name: "RecType"},
_gen_wiretype.FieldType{Type: 0x47, Name: "ObjID"},
_gen_wiretype.FieldType{Type: 0x48, Name: "CurVers"},
_gen_wiretype.FieldType{Type: 0x49, Name: "Parents"},