syncbase/vsync: DAG grafting outside transaction
Allow DAG grafting to read the head of an object from the store without
contaminating the read-set of the Initiator's transaction that stores
the newly received deltas.
Change-Id: If81d124c716ef4e4bb3cddbef9318ef9ebc507f6
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 2caa50d..1d0eacd 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -113,8 +113,14 @@
// batchSet holds information on a set of write batches.
type batchSet map[uint64]*batchInfo
-// graftMap holds the state of DAG node grafting (attaching) per object.
-type graftMap map[string]*graftInfo
+// graftMap holds the state of DAG node grafting (attaching) per object. It
+// holds a store handle to use when reading the object heads during grafting
+// operations. This avoids contaminating the transaction read-set of the
+// caller (typically the Initiator storing newly received deltas).
+type graftMap struct {
+ objGraft map[string]*graftInfo
+ st store.Store
+}
// graftInfo holds the state of an object's node grafting in the DAG.
// It is ephemeral (in-memory), used during a single sync operation to track
@@ -247,7 +253,7 @@
//
// The grafting structure is not needed when nodes are being added locally by
// the Watcher, passing a nil grafting structure.
-func (s *syncService) addNode(ctx *context.T, tx store.Transaction, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
+func (s *syncService) addNode(ctx *context.T, tx store.Transaction, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft *graftMap) error {
if parents != nil {
if len(parents) > 2 {
return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
@@ -347,7 +353,7 @@
// to track DAG attachements during a sync operation. It is not needed if the
// parent linkage is due to a local change (from conflict resolution selecting
// an existing version).
-func (s *syncService) addParent(ctx *context.T, tx store.Transaction, oid, version, parent string, graft graftMap) error {
+func (s *syncService) addParent(ctx *context.T, tx store.Transaction, oid, version, parent string, graft *graftMap) error {
if version == parent {
return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
}
@@ -435,7 +441,7 @@
// head is different from the current local head. If there is a single new-head
// and the snapshot head is the same as the current local head, the object
// changes were applied without triggering a conflict.
-func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
+func hasConflict(ctx *context.T, st store.StoreReader, oid string, graft *graftMap) (isConflict bool, newHead, oldHead, ancestor string, err error) {
isConflict = false
oldHead = NoVersion
newHead = NoVersion
@@ -447,7 +453,7 @@
return
}
- info := graft[oid]
+ info := graft.objGraft[oid]
if info == nil {
err = verror.New(verror.ErrInternal, ctx, "node", oid, "has no DAG graft info")
return
@@ -521,18 +527,22 @@
return
}
-// newGraft allocates a graftMap to track DAG node grafting during sync.
-func newGraft() graftMap {
- return make(graftMap)
+// newGraft allocates a graftMap to track DAG node grafting during sync. It is
+// given a handle to a store to use for its own reading of object head nodes.
+func newGraft(st store.Store) *graftMap {
+ return &graftMap{
+ objGraft: make(map[string]*graftInfo),
+ st: st,
+ }
}
// getObjectGraft returns the graftInfo for an object ID. If the graftMap is
// nil, a nil graftInfo is returned because grafting is not being tracked.
-func getObjectGraftInfo(ctx *context.T, sntx store.SnapshotOrTransaction, graft graftMap, oid string) *graftInfo {
+func getObjectGraftInfo(ctx *context.T, sntx store.SnapshotOrTransaction, graft *graftMap, oid string) *graftInfo {
if graft == nil {
return nil
}
- if info := graft[oid]; info != nil {
+ if info := graft.objGraft[oid]; info != nil {
return info
}
@@ -543,12 +553,20 @@
}
// If the object has a head node, include it in the set of new heads.
- if head, err := getHead(ctx, sntx, oid); err == nil {
+ // Note: use the store handle of the graftMap if available to avoid
+ // contaminating the read-set of the caller's transaction. Otherwise
+ // use the caller's transaction.
+ var st store.StoreReader
+ st = graft.st
+ if st == nil {
+ st = sntx
+ }
+ if head, err := getHead(ctx, st, oid); err == nil {
info.newHeads[head] = true
info.oldHeadSnap = head
}
- graft[oid] = info
+ graft.objGraft[oid] = info
return info
}
@@ -818,15 +836,15 @@
// getParentMap is a testing and debug helper function that returns for an
// object a map of its DAG (node-to-parents relations). If a graft structure
// is given, include its fragments in the map.
-func getParentMap(ctx *context.T, st store.StoreReader, oid string, graft graftMap) map[string][]string {
+func getParentMap(ctx *context.T, st store.StoreReader, oid string, graft *graftMap) map[string][]string {
parentMap := make(map[string][]string)
var start []string
if head, err := getHead(ctx, st, oid); err == nil {
start = append(start, head)
}
- if graft != nil && graft[oid] != nil {
- for v := range graft[oid].newHeads {
+ if graft != nil && graft.objGraft[oid] != nil {
+ for v := range graft.objGraft[oid].newHeads {
start = append(start, v)
}
}
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 57291a4..709df1c 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -128,7 +128,7 @@
}
tx.Commit()
- graft := newGraft()
+ graft := newGraft(st)
tx = st.NewTransaction()
if err := s.addParent(nil, tx, oid, version, version, graft); err == nil {
t.Errorf("addParent() did not fail on a self-parent for object %s:%s", oid, version)
@@ -154,7 +154,7 @@
}
tx.Commit()
- var g graftMap
+ var g *graftMap
if remote {
g = graft
}
@@ -311,7 +311,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true}
@@ -388,7 +388,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -471,7 +471,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -564,7 +564,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -651,7 +651,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "6": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -956,7 +956,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -981,7 +981,7 @@
t.Errorf("hasConflict() on %v did not fail with a nil graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, newGraft())
+ isConflict, newHead, oldHead, ancestor, errConflict = hasConflict(nil, st, oid, newGraft(st))
if errConflict == nil {
t.Errorf("hasConflict() on %v did not fail with an empty graft map: flag %t, newHead %s, oldHead %s, ancestor %s, err %v",
oid, isConflict, newHead, oldHead, ancestor, errConflict)
@@ -1026,7 +1026,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"3": true, "4": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -1086,7 +1086,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"4": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -1148,7 +1148,7 @@
}
// Verify the grafting of remote nodes.
- g := graft[oid]
+ g := graft.objGraft[oid]
expNewHeads := map[string]bool{"5": true}
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
@@ -1185,7 +1185,7 @@
t.Errorf("object %s has wrong head: %s", oid, head)
}
- g = graft[oid]
+ g = graft.objGraft[oid]
if !reflect.DeepEqual(g.newHeads, expNewHeads) {
t.Errorf("object %s has invalid newHeads: (%v) instead of (%v)", oid, g.newHeads, expNewHeads)
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 5875e21..94f79e1 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -180,7 +180,7 @@
remote interfaces.GenVector // generation vector from the remote peer.
updLocal interfaces.GenVector // updated local generation vector at the end of sync round.
updObjects map[string]*objConflictState // tracks updated objects during a log replay.
- dagGraft graftMap // DAG state that tracks conflicts and common ancestors.
+ dagGraft *graftMap // DAG state that tracks conflicts and common ancestors.
req interfaces.DeltaReq // GetDeltas RPC request.
stream interfaces.SyncGetDeltasClientCall // stream handle for the GetDeltas RPC.
@@ -243,7 +243,7 @@
iSt := &initiationState{}
iSt.config = c
iSt.updObjects = make(map[string]*objConflictState)
- iSt.dagGraft = newGraft()
+ iSt.dagGraft = newGraft(c.st)
iSt.sg = sg
return iSt
}
@@ -436,18 +436,6 @@
// resolution during replay. This avoids resolving conflicts that have already
// been resolved by other devices.
func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
- // This is to handle issues with graftMap in DAG. Ideally, the
- // transaction created to store all the deltas received over the network
- // should not contend with any other store changes since this is all
- // brand new information. However, as log records are received over the
- // network, they are also incrementally processed. To enable incremental
- // processing, the current head of each dirty object is read to populate
- // the graftMap. This read can potentially contend with the watcher
- // updating the head of an object. This lock prevents that contention in
- // order to avoid retrying the whole transaction.
- iSt.config.sync.thLock.Lock()
- defer iSt.config.sync.thLock.Unlock()
-
// TODO(hpucha): This works for now, but figure out a long term solution
// as this may be implementation dependent. It currently works because
// the RecvStream call is stateless, and grabbing a handle to it
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index e13eab6..272b945 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -179,14 +179,14 @@
// dagReplayCommands parses a sync test file and replays its commands, updating
// the DAG structures associated with the sync service.
-func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (graftMap, error) {
+func (s *syncService) dagReplayCommands(ctx *context.T, syncfile string) (*graftMap, error) {
cmds, err := parseSyncCommands(syncfile)
if err != nil {
return nil, err
}
st := s.sv.St()
- graft := newGraft()
+ graft := newGraft(st)
for _, cmd := range cmds {
tx := st.NewTransaction()