syncbase/vsync:
Bug fixes for handling cases with conflicting updates.
Added an integration test covering these cases.
Issues:
* Contention between initiator and watcher when receiving deltas.
* hasConflict, when retried, did not correctly capture the change in the DAG head (see the sketch after this list).
* Link log records were not created where needed.
* When the sync state was stale compared to the store, the retry error was not handled correctly.
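
A minimal, self-contained sketch of the idea behind the hasConflict fix: the graft state snapshots the head known when the remote deltas were grafted, and a retried conflict check compares that snapshot against the current head. The graftInfo struct below is a simplified stand-in for the real one, and detectConflict is a hypothetical helper, not the actual hasConflict signature; the real code additionally walks the graft nodes to pick the deepest common ancestor.

    package main

    import "fmt"

    // graftInfo mirrors (in simplified form) the per-object graft state: the
    // set of candidate heads after grafting remote deltas, plus a snapshot of
    // the local head taken at graft time.
    type graftInfo struct {
    	newHeads    map[string]bool
    	oldHeadSnap string
    }

    // detectConflict sketches the retried-detection logic: a single new head
    // is conflict-free only if the local head has not moved since the
    // snapshot; otherwise the snapshot is the common ancestor.
    func detectConflict(g graftInfo, curHead string) (conflict bool, newHead, ancestor string) {
    	for h := range g.newHeads {
    		if h != g.oldHeadSnap {
    			newHead = h
    		}
    	}
    	if len(g.newHeads) == 1 {
    		if newHead == "" {
    			// Only the snapshotted head was seen: nothing but link
    			// records arrived, so keep the current head.
    			return false, curHead, ""
    		}
    		if curHead == g.oldHeadSnap {
    			// Head unchanged since the snapshot: plain fast-forward.
    			return false, newHead, ""
    		}
    	}
    	// The local head moved after the snapshot (or there were two
    	// candidate heads): report a conflict, with the snapshot as ancestor.
    	return true, newHead, g.oldHeadSnap
    }

    func main() {
    	g := graftInfo{newHeads: map[string]bool{"vRemote": true}, oldHeadSnap: "v1"}
    	fmt.Println(detectConflict(g, "v1"))      // false vRemote: no local change since snapshot
    	fmt.Println(detectConflict(g, "vLocal2")) // true vRemote v1: watcher moved the head, retry sees a conflict
    }
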
Change-Id: I6fa1f13cd5aabb8290e6e2631ea8458b55bbfe7f
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 0633079..77e5d27 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "math/rand"
"strconv"
"strings"
"time"
@@ -160,6 +161,72 @@
tu.RunClient(t, client0Creds, runVerifyLocalAndRemoteData, "sync0")
}
+// V23TestSyncbasedExchangeDeltasWithConflicts tests the exchange of deltas
+// between two Syncbase instances and their clients in the presence of
+// conflicting updates. The 1st client creates a SyncGroup and puts some
+// database entries in it. The 2nd client joins that SyncGroup and reads the
+// database entries. Both clients then update a subset of existing keys
+// concurrently, and sync with each other. During sync, the following
+// possibilities arise: (1) Both clients make their local updates first, then
+// sync with each other and detect a conflict. Resolution causes one of the
+// clients to see a new value chosen based on the timestamp. (2) One client's
+// update is synced before the other client commits its own update. The other
+// client's update is then not a conflict but a valid update building on the
+// first one's change.
+//
+// Note that the verification done from the client side can have false positives
+// regarding the test's success. Since we cannot accurately predict which
+// client's updates win, the test passes if we find either outcome. However,
+// this could also mean that the sync failed and each client is merely reading
+// its own local value. The verification step mainly checks that syncbased is
+// still responsive and that the data is not corrupt.
+//
+// TODO(hpucha): We could diff the states of the two clients and ensure they are
+// identical. Optionally we could expose inner state of syncbased via some
+// debug methods.
+func V23TestSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
+ // Run it multiple times to exercise different interactions between sync
+ // and local updates that change every run due to timing.
+ for i := 0; i < 10; i++ {
+ testSyncbasedExchangeDeltasWithConflicts(t)
+ }
+}
+
+func testSyncbasedExchangeDeltasWithConflicts(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ server0Creds, _ := t.Shell().NewChildCredentials("s0")
+ client0Creds, _ := t.Shell().NewChildCredentials("c0")
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+ `{"Read": {"In":["root/c0"]}, "Write": {"In":["root/c0"]}}`)
+ defer cleanSync0()
+
+ server1Creds, _ := t.Shell().NewChildCredentials("s1")
+ client1Creds, _ := t.Shell().NewChildCredentials("c1")
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+ `{"Read": {"In":["root/c1"]}, "Write": {"In":["root/c1"]}}`)
+ defer cleanSync1()
+
+ sgName := naming.Join("sync0", constants.SyncbaseSuffix, "SG1")
+
+ tu.RunClient(t, client0Creds, runSetupAppA, "sync0")
+ tu.RunClient(t, client0Creds, runCreateSyncGroup, "sync0", sgName, "tb:foo", "root/s0", "root/s1")
+ tu.RunClient(t, client0Creds, runPopulateData, "sync0", "foo", "0")
+
+ tu.RunClient(t, client1Creds, runSetupAppA, "sync1")
+ tu.RunClient(t, client1Creds, runJoinSyncGroup, "sync1", sgName)
+ tu.RunClient(t, client1Creds, runVerifySyncGroupData, "sync1", "foo", "0", "10")
+
+ go tu.RunClient(t, client0Creds, runUpdateData, "sync0", "5")
+ d := time.Duration(rand.Int63n(50)) * time.Millisecond
+ time.Sleep(d)
+ tu.RunClient(t, client1Creds, runUpdateData, "sync1", "5")
+
+ time.Sleep(10 * time.Second)
+
+ tu.RunClient(t, client0Creds, runVerifyConflictResolution, "sync0")
+ tu.RunClient(t, client1Creds, runVerifyConflictResolution, "sync1")
+}
+
// V23TestNestedSyncGroups tests the exchange of deltas between two Syncbase
// instances and their clients with nested SyncGroups. The 1st client creates
// two SyncGroups at prefixes "f" and "foo" and puts some database entries in
@@ -405,7 +472,7 @@
for i := start; i < start+5; i++ {
key := fmt.Sprintf("foo%d", i)
r := tb.Row(key)
- if err := r.Put(ctx, "testkey1"+key); err != nil {
+ if err := r.Put(ctx, "testkey"+args[0]+key); err != nil {
return fmt.Errorf("r.Put() failed: %v\n", err)
}
}
@@ -544,6 +611,47 @@
return nil
}, "runVerifyDeletedData")
+var runVerifyConflictResolution = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ a := syncbase.NewService(args[0]).App("a")
+ d := a.NoSQLDatabase("d", nil)
+ tb := d.Table("tb")
+
+ wantData := []struct {
+ start uint64
+ count uint64
+ valPfx []string
+ }{
+ {0, 5, []string{"testkey"}},
+ {5, 5, []string{"testkeysync0", "testkeysync1"}},
+ }
+
+ // Verify that all keys and values made it correctly.
+ for _, d := range wantData {
+ for i := d.start; i < d.start+d.count; i++ {
+ key := fmt.Sprintf("foo%d", i)
+ r := tb.Row(key)
+ var got string
+ if err := r.Get(ctx, &got); err != nil {
+ return fmt.Errorf("r.Get() failed: %v\n", err)
+ }
+ match := 0
+ for _, p := range d.valPfx {
+ want := p + key
+ if got == want {
+ match++
+ }
+ }
+ if match != 1 {
+ return fmt.Errorf("unexpected value: got %q, match %v, want %v\n", got, match, d.valPfx)
+ }
+ }
+ }
+ return nil
+}, "runVerifyConflictResolution")
+
var runVerifyNonSyncGroupData = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -593,7 +701,7 @@
valPfx string
}{
{0, 5, "testkey"},
- {5, 5, "testkey1"},
+ {5, 5, "testkeysync1"},
{10, 10, "testkey"},
}
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index 88e046d..d235b1b 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -41,6 +41,10 @@
v23tests.RunTest(t, V23TestSyncbasedExchangeDeltas)
}
+func TestV23SyncbasedExchangeDeltasWithConflicts(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedExchangeDeltasWithConflicts)
+}
+
func TestV23NestedSyncGroups(t *testing.T) {
v23tests.RunTest(t, V23TestNestedSyncGroups)
}
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 728cb0f..3850d5b 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -100,9 +100,9 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
-
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
const (
@@ -141,11 +141,15 @@
// graftInfo holds the state of an object's node grafting in the DAG.
// It is ephemeral (in-memory), used during a single sync operation to track
// where the new DAG fragments are attached to the existing DAG for the object:
-// - newNodes: the set of newly added nodes; used to detect the type of edges
-// between nodes (new-node to old-node or vice versa).
-// - newHeads: the set of new candidate head nodes; used to detect conflicts.
-// - graftNodes: the set of old nodes on which new nodes were added, and their
-// level in the DAG; used to find common ancestors for conflicts.
+// - newNodes: the set of newly added nodes; used to detect the type of edges
+// between nodes (new-node to old-node or vice versa).
+// - newHeads: the set of new candidate head nodes; used to detect conflicts.
+// - graftNodes: the set of old nodes on which new nodes were added, and their
+// level in the DAG; used to find common ancestors for conflicts.
+// - oldHeadSnap: snapshot of the local head known to sync, used in conflict
+//                detection, particularly when conflict detection must be
+//                retried because the sync DAG state is stale compared to the
+//                local store.
//
// After the received mutations are applied, if there are two heads in the
// newHeads set, there is a conflict to be resolved for the object. Otherwise,
@@ -155,9 +159,10 @@
// TODO(rdaoud): support open DAGs to handle delayed conflict resolution by
// tracking multiple dangling remote heads in addition to the local head node.
type graftInfo struct {
- newNodes map[string]bool
- newHeads map[string]bool
- graftNodes map[string]uint64
+ newNodes map[string]bool
+ newHeads map[string]bool
+ graftNodes map[string]uint64
+ oldHeadSnap string
}
// newBatchInfo allocates and initializes a batch info entry.
@@ -460,9 +465,12 @@
// - Yes: return (true, newHead, oldHead, ancestor) -- from a common past
// - Yes: return (true, newHead, oldHead, NoVersion) -- from disjoint pasts
// - No: return (false, newHead, oldHead, NoVersion) -- no conflict
+//
// A conflict exists when there are two new-head nodes in the graft structure.
// It means the newly added object versions are not derived in part from this
-// device's current knowledge. If there is a single new-head, the object
+// device's current knowledge. A conflict also exists if the snapshotted local
+// head is different from the current local head. If there is a single new-head
+// and the snapshot head is the same as the current local head, the object
// changes were applied without triggering a conflict.
func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
isConflict = false
@@ -493,23 +501,39 @@
// on this device and will not trigger a conflict.
oldHead, _ = getHead(ctx, st, oid)
- // If there is only one new head node there is no conflict. The new
- // head is that single one, even if it might also be the same old node.
+ // If there is only one new head node and the snapshotted old head is
+ // still unchanged, there is no conflict. The new head is that single
+ // one, even if it might also be the same old node.
if numHeads == 1 {
for head := range info.newHeads {
newHead = head
}
- return
+ if newHead == info.oldHeadSnap {
+ // Only link log records could've been received.
+ newHead = oldHead
+ return
+ } else if oldHead == info.oldHeadSnap {
+ return
+ }
}
- // With two candidate head nodes, the new head is the non-old one.
+ // The new head is the non-old one.
for head := range info.newHeads {
- if head != oldHead {
+ if head != info.oldHeadSnap {
newHead = head
break
}
}
+ // There wasn't a conflict at the old snapshot, but now there is. The
+ // snapshotted head is the common ancestor.
+ isConflict = true
+ if numHeads == 1 {
+ vlog.VI(4).Infof("sync: hasConflict: old graft snapshot %v, head %s", graft, oldHead)
+ ancestor = info.oldHeadSnap
+ return
+ }
+
// There is a conflict: the best choice ancestor is the graft node with
// the largest level (farthest from the root). It is possible in some
// corner cases to have multiple graft nodes at the same level. This
@@ -524,7 +548,6 @@
// graft nodes (empty set) and thus no common ancestor because the two
// DAG fragments were created from distinct root nodes. The "NoVersion"
// value is returned as the ancestor.
- isConflict = true
var maxLevel uint64
for node, level := range info.graftNodes {
if maxLevel < level || (maxLevel == level && ancestor < node) {
@@ -559,6 +582,7 @@
// If the object has a head node, include it in the set of new heads.
if head, err := getHead(ctx, st, oid); err == nil {
info.newHeads[head] = true
+ info.oldHeadSnap = head
}
graft[oid] = info
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 18df27b..e237277 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -314,6 +314,9 @@
//
// TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
+ iSt.sync.thLock.Lock()
+ defer iSt.sync.thLock.Unlock()
+
// Freeze the most recent batch of local changes before fetching
// remote changes from a peer. This frozen state is used by the
// responder when responding to GetDeltas RPC.
@@ -386,6 +389,9 @@
// resolution during replay. This avoids resolving conflicts that have already
// been resolved by other devices.
func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
+ iSt.sync.thLock.Lock()
+ defer iSt.sync.thLock.Unlock()
+
// TODO(hpucha): This works for now, but figure out a long term solution
// as this may be implementation dependent. It currently works because
// the RecvStream call is stateless, and grabbing a handle to it
@@ -585,11 +591,10 @@
return err
}
- if err := iSt.updateDbAndSyncSt(ctx); err != nil {
- return err
+ err := iSt.updateDbAndSyncSt(ctx)
+ if err == nil {
+ err = iSt.tx.Commit()
}
-
- err := iSt.tx.Commit()
if err == nil {
committed = true
// Update in-memory genvector since commit is successful.
@@ -599,11 +604,13 @@
vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
return nil
}
-
if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
return err
}
+ // Either updateDbAndSyncSt() or tx.Commit() detected a
+ // concurrent transaction. Retry processing the remote updates.
+ //
// TODO(hpucha): Sleeping and retrying is a temporary
// solution. Next iteration will have coordination with watch
// thread to intelligently retry. Hence this value is not a
@@ -626,7 +633,11 @@
}
if !confSt.isConflict {
- confSt.res = &conflictResolution{ty: pickRemote}
+ if confSt.newHead == confSt.oldHead {
+ confSt.res = &conflictResolution{ty: pickLocal}
+ } else {
+ confSt.res = &conflictResolution{ty: pickRemote}
+ }
} else {
count++
}
@@ -639,76 +650,78 @@
func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
for objid, confSt := range iSt.updObjects {
// If the local version is picked, no further updates to the
- // Database are needed.
- if confSt.res.ty == pickLocal {
- continue
- }
+ // Database are needed. If the remote version is picked or if a
+ // new version is created, we put it in the Database.
+ if confSt.res.ty != pickLocal {
- // If the remote version is picked or if a new version is
- // created, we put it in the Database.
-
- // TODO(hpucha): Hack right now. Need to change Database's
- // handling of deleted objects.
- oldVersDeleted := true
- if confSt.oldHead != NoVersion {
- oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
- if err != nil {
- return err
+ // TODO(hpucha): Hack right now. Need to change Database's
+ // handling of deleted objects.
+ oldVersDeleted := true
+ if confSt.oldHead != NoVersion {
+ oldDagNode, err := getNode(ctx, iSt.tx, objid, confSt.oldHead)
+ if err != nil {
+ return err
+ }
+ oldVersDeleted = oldDagNode.Deleted
}
- oldVersDeleted = oldDagNode.Deleted
- }
- var newVersion string
- var newVersDeleted bool
- switch confSt.res.ty {
- case pickRemote:
- newVersion = confSt.newHead
- newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
- if err != nil {
- return err
+ var newVersion string
+ var newVersDeleted bool
+ switch confSt.res.ty {
+ case pickRemote:
+ newVersion = confSt.newHead
+ newDagNode, err := getNode(ctx, iSt.tx, objid, newVersion)
+ if err != nil {
+ return err
+ }
+ newVersDeleted = newDagNode.Deleted
+ case createNew:
+ newVersion = confSt.res.rec.Metadata.CurVers
+ newVersDeleted = confSt.res.rec.Metadata.Delete
}
- newVersDeleted = newDagNode.Deleted
- case createNew:
- newVersion = confSt.res.rec.Metadata.CurVers
- newVersDeleted = confSt.res.rec.Metadata.Delete
- }
- // Skip delete followed by a delete.
- if oldVersDeleted && newVersDeleted {
- continue
- }
+ // Skip delete followed by a delete.
+ if oldVersDeleted && newVersDeleted {
+ continue
+ }
- if !oldVersDeleted {
- // Read current version to enter it in the readset of the transaction.
- version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
- if err != nil {
- return err
+ if !oldVersDeleted {
+ // Read current version to enter it in the readset of the transaction.
+ version, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid))
+ if err != nil {
+ return err
+ }
+ if string(version) != confSt.oldHead {
+ vlog.VI(4).Infof("sync: updateDbAndSyncSt: concurrent updates %s %s", version, confSt.oldHead)
+ return store.NewErrConcurrentTransaction(ctx)
+ }
+ } else {
+ // Ensure key doesn't exist.
+ if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return store.NewErrConcurrentTransaction(ctx)
+ }
}
- if string(version) != confSt.oldHead {
- return store.NewErrConcurrentTransaction(ctx)
- }
- } else {
- // Ensure key doesn't exist.
- if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(objid)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
- return store.NewErrConcurrentTransaction(ctx)
- }
- }
- if !newVersDeleted {
- if confSt.res.ty == createNew {
- if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
+ if !newVersDeleted {
+ if confSt.res.ty == createNew {
+ vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutAtVersion %s %s", objid, newVersion)
+ if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
+ return err
+ }
+ }
+ vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutVersion %s %s", objid, newVersion)
+ if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
+ return err
+ }
+ } else {
+ vlog.VI(4).Infof("sync: updateDbAndSyncSt: Deleting obj %s", objid)
+ if err := iSt.tx.Delete([]byte(objid)); err != nil {
return err
}
}
- if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
- return err
- }
- } else {
- if err := iSt.tx.Delete([]byte(objid)); err != nil {
- return err
- }
}
-
+ // Always update sync state irrespective of local/remote/new
+ // versions being picked.
if err := iSt.updateLogAndDag(ctx, objid); err != nil {
return err
}
@@ -774,6 +787,8 @@
func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+ vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", obj, vers, par)
+
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
Id: iSt.sync.id,
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index d0322d4..6b017ee 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -33,6 +33,22 @@
sv interfaces.Service
server rpc.Server
+ // High-level lock to serialize the watcher and the initiator. This lock is
+ // needed to handle the following cases: (a) When the initiator is
+ // cutting a local generation, it waits for the watcher to commit the
+ // latest local changes before including them in the checkpoint. (b)
+ // When the initiator is receiving updates, it reads the latest head of
+ // an object as per the DAG state in order to construct the in-memory
+ // graft map used for conflict detection. At the same time, if a watcher
+ // is processing local updates, it may move the object head. Hence the
+ // initiator and watcher contend on the DAG head of an object. Instead
+ // of retrying a transaction which causes the entire delta to be
+ // replayed, we use pessimistic locking to serialize the initiator and
+ // the watcher.
+ //
+ // TODO(hpucha): This is a temporary hack.
+ thLock sync.RWMutex
+
// State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
closed chan struct{}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 3b745a8..e70d0d2 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -84,6 +84,9 @@
// from starving others. A batch is stored as a contiguous set of log records
// ending with one record having the "continued" flag set to false.
func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+ s.thLock.Lock()
+ defer s.thLock.Unlock()
+
vlog.VI(2).Infof("sync: processDatabase: begin: %s, %s", appName, dbName)
defer vlog.VI(2).Infof("sync: processDatabase: end: %s, %s", appName, dbName)