syncbase/vsync: Move the generation cutting from the initiator to the
syncgroup client-side responders and the sync watcher. This eliminates
the need for synchronization between these goroutines. However, the
responder is modified to suppress the echo that results from this
relaxation.
Change-Id: Id5f2096e4ec661fb6b639f180d04194bbd80fdb3
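
To make the new division of labor concrete, here is a minimal Go sketch of the pattern this patch introduces: syncgroup write paths (and the watcher, once it finishes a batch) cut their own generation through a small helper, and skip the cut while the database is paused. Everything below is an illustrative stand-in rather than the actual Syncbase API; only the checkptLocalGen/checkptSgLocalGen names come from the patch.

package main

import (
	"errors"
	"fmt"
)

type groupID uint64
type sgSet map[groupID]struct{}

// syncService is a toy stand-in for the real syncService in sync.go.
type syncService struct {
	paused map[string]bool // per-database pause state, maintained by the watcher
	gen    map[string]int  // last generation cut per database
}

// checkptLocalGen freezes the most recent batch of local changes for the
// given database, optionally restricted to a set of syncgroups.
func (s *syncService) checkptLocalGen(db string, sgs sgSet) error {
	if s.paused[db] {
		return errors.New("db offline: not allowed to sync")
	}
	s.gen[db]++
	return nil
}

// checkptSgLocalGen cuts a generation scoped to a single syncgroup, as the
// syncgroup create/join/set-spec/publish paths now do after they commit.
func (s *syncService) checkptSgLocalGen(db string, gid groupID) error {
	return s.checkptLocalGen(db, sgSet{gid: struct{}{}})
}

func main() {
	s := &syncService{paused: map[string]bool{}, gen: map[string]int{}}

	// A syncgroup operation commits its transaction, then cuts its own gen.
	if err := s.checkptSgLocalGen("app,db", 1); err != nil {
		fmt.Println("skip:", err)
		return
	}

	// The watcher does the same after processing a batch of local updates.
	if err := s.checkptLocalGen("app,db", nil); err != nil {
		fmt.Println("skip:", err)
		return
	}
	fmt.Println("generations cut:", s.gen["app,db"]) // 2
}
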
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index c1f3cd0..0441dd8 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -431,48 +431,15 @@
//
// TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
func (iSt *initiationState) prepareDataDeltaReq(ctx *context.T) error {
- iSt.config.sync.thLock.Lock()
- defer iSt.config.sync.thLock.Unlock()
-
// isDbSyncable reads the in-memory syncState for this db to verify if
// it is allowed to sync or not. This state is mutated by watcher based
// on incoming pause/resume requests.
- // The requirement for pause is that any write after the pause must not
- // be synced until sync is resumed. The same for resume is that every write
- // before resume must be seen and added to sync datastructure before sync
- // resumes.
- // thLock is used to achieve the above requirements. Watcher acquires this
- // lock when it processes new entries in the queue. Before cutting a new
- // generation the initiator acquires this lock and then checks if the
- // database is syncable or not. If it is, it cuts a new generation and then
- // releases the thLock. This makes sure that the generation cut by the
- // initiator is either before sync is paused or no generation is cut and
- // initiator aborts.
- // Note: Responder does not need to acquire thLock since it uses the
- // generation cut by initiator. See responder.go for more details.
if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
vlog.VI(1).Infof("sync: prepareDataDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
}
- // Freeze the most recent batch of local changes before fetching
- // remote changes from a peer. This frozen state is used by the
- // responder when responding to GetDeltas RPC.
- //
- // We only allow an initiator to freeze local generations (not
- // responders/watcher) in order to maintain a static baseline
- // for the duration of a sync. This addresses the following race
- // condition: If we allow responders to use newer local
- // generations while the initiator is in progress, they may beat
- // the initiator and send these new generations to remote
- // devices. These remote devices in turn can send these
- // generations back to the initiator in progress which was
- // started with older generation information.
- if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.dbId, nil); err != nil {
- return err
- }
-
local, lgen, err := iSt.config.sync.copyDbGenInfo(ctx, iSt.config.dbId, nil)
if err != nil {
return err
@@ -543,8 +510,6 @@
// knowledge for the initiator to send to the responder, and prepares the
// request to start the syncgroup sync.
func (iSt *initiationState) prepareSGDeltaReq(ctx *context.T) error {
- iSt.config.sync.thLock.Lock()
- defer iSt.config.sync.thLock.Unlock()
if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
@@ -552,10 +517,6 @@
return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
}
- if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.dbId, iSt.config.sgIds); err != nil {
- return err
- }
-
var err error
iSt.local, _, err = iSt.config.sync.copyDbGenInfo(ctx, iSt.config.dbId, iSt.config.sgIds)
if err != nil {
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 1e9add3..8b79979 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -91,10 +91,16 @@
return rSt, nil
}
-// sendDeltasPerDatabase sends to an initiator all the missing generations
-// corresponding to the prefixes requested for this Database, and genvectors
-// summarizing the knowledge transferred from the responder to the
-// initiator. This happens in three phases:
+// sendDeltasPerDatabase sends to the initiator on the requesting Syncbase all
+// the missing generations corresponding to the prefixes requested for this
+// Database, and genvectors summarizing the knowledge transferred from the
+// responder to the initiator. Note that the responder does not send any
+// generations that originated at the requesting Syncbase, even if this
+// responder is ahead of the initiator in its knowledge of them. The responder
+// can be ahead because the responder on the requesting Syncbase keeps handing
+// out generations newer than the ones its initiator started with.
+//
+// The responder operates in three phases:
//
// In the first phase, the initiator is checked against the syncgroup ACLs of
// all the syncgroups it is requesting, and only those prefixes that belong to
@@ -131,11 +137,6 @@
// logging level specified is enabled.
vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %v: sgids %v, genvecs %v, sg %v", rSt.dbId, rSt.sgIds, rSt.initVecs, rSt.sg)
- // There is no need to acquire syncService.thLock since responder uses
- // the generation cut by initiator. The initiator maintains the atomicity
- // of operations performed within a pause-resume block when a generation is
- // cut. Hence either none of these operations are present within the
- // generation or all of them are present.
if !rSt.sync.isDbSyncable(ctx, rSt.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
vlog.VI(1).Infof("sync: sendDeltasPerDatabase: database not allowed to sync, skipping sync on db %v", rSt.dbId)
@@ -361,6 +362,18 @@
// Init the min heap, one entry per device in the diff.
mh := make(minHeap, 0, len(rSt.diff))
for dev, r := range rSt.diff {
+ // Do not send generations belonging to the initiator.
+ //
+ // TODO(hpucha): This needs to be relaxed to handle the case
+ // when the initiator has locally destroyed its
+ // database/collection, and has rejoined the syncgroup. In this
+ // case, it would like its data sent back as well. Perhaps the
+ // initiator's request will contain an indication about its
+		// status for the responder to distinguish between the two cases.
+ if syncbaseIdToName(dev) == rSt.initiator {
+ continue
+ }
+
r.cur = r.min
rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, r)
if err != nil {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index ccab758..776918e 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -55,22 +55,6 @@
rng *rand.Rand
rngLock sync.Mutex
- // High-level lock to serialize the watcher and the initiator. This lock is
- // needed to handle the following cases: (a) When the initiator is
- // cutting a local generation, it waits for the watcher to commit the
- // latest local changes before including them in the checkpoint. (b)
- // When the initiator is receiving updates, it reads the latest head of
- // an object as per the DAG state in order to construct the in-memory
- // graft map used for conflict detection. At the same time, if a watcher
- // is processing local updates, it may move the object head. Hence the
- // initiator and watcher contend on the DAG head of an object. Instead
- // of retrying a transaction which causes the entire delta to be
- // replayed, we use pessimistic locking to serialize the initiator and
- // the watcher.
- //
- // TODO(hpucha): This is a temporary hack.
- thLock sync.RWMutex
-
// State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
closed chan struct{}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 92eb53f..fb251b4 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -711,6 +711,10 @@
ss.initSyncStateInMem(ctx, dbId, sgOID(gid))
+ if err := ss.checkptSgLocalGen(ctx, dbId, gid); err != nil {
+ return err
+ }
+
// Advertise the Syncbase at the chosen mount table and in the
// neighborhood.
if err := ss.advertiseSyncbase(ctx, call, sg); err != nil {
@@ -957,7 +961,8 @@
}
ss := sd.sync.(*syncService)
- gid := SgIdToGid(sd.db.Id(), sgId)
+ dbId := sd.db.Id()
+ gid := SgIdToGid(dbId, sgId)
err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
// Check permissions on Database.
@@ -1001,19 +1006,37 @@
// advertising to change.
// Reserve a log generation and position counts for the new syncgroup.
- gen, pos := ss.reserveGenAndPosInDbLog(ctx, sd.db.Id(), sgOID(gid), 1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, dbId, sgOID(gid), 1)
newVersion := ss.newSyncgroupVersion()
sg.Spec = spec
sg.SpecVersion = newVersion
return ss.updateSyncgroupVersioning(ctx, tx, gid, newVersion, true, ss.id, gen, pos, sg)
})
- return err
+
+ if err != nil {
+ return err
+ }
+
+ return ss.checkptSgLocalGen(ctx, dbId, gid)
}
//////////////////////////////
// Helper functions
+// checkptSgLocalGen cuts a local generation for the specified syncgroup to
+// capture its updates.
+func (s *syncService) checkptSgLocalGen(ctx *context.T, dbId wire.Id, sgid interfaces.GroupId) error {
+ // Cut a new generation to capture the syncgroup updates. Unlike the
+ // interaction between the watcher and the pause/resume bits, there is
+ // no guarantee of ordering between the syncgroup updates that are
+ // synced and when sync is paused/resumed. This is because in the
+ // current design syncgroup updates are not serialized through the watch
+ // queue.
+ sgs := sgSet{sgid: struct{}{}}
+ return s.checkptLocalGen(ctx, dbId, sgs)
+}
+
// publishSyncgroup publishes the syncgroup at the remote peer and updates its
// status. If the publish operation is either successful or rejected by the
// peer, the status is updated to "running" or "rejected" respectively and the
@@ -1026,6 +1049,14 @@
ss := sd.sync.(*syncService)
dbId := sd.db.Id()
+ // If this admin is offline, it shouldn't attempt to publish the syncgroup
+ // since it would be unable to send out the new syncgroup updates. However, it
+ // is still possible that the admin goes offline right after processing the
+ // request.
+ if !ss.isDbSyncable(ctx, dbId) {
+ return interfaces.NewErrDbOffline(ctx, dbId)
+ }
+
gid := SgIdToGid(dbId, sgId)
version, err := getSyncgroupVersion(ctx, st, gid)
if err != nil {
@@ -1114,8 +1145,10 @@
if err != nil {
vlog.Errorf("sync: publishSyncgroup: cannot update syncgroup %v status to %s: %v",
sgId, status.String(), err)
+ return err
}
- return err
+
+ return ss.checkptSgLocalGen(ctx, dbId, gid)
}
// bootstrapSyncgroup inserts into the transaction log a syncgroup operation and
@@ -1240,7 +1273,7 @@
for _, svc := range neighbors {
for _, addr := range svc.Addresses {
ctx, cancel := context.WithTimeout(ctxIn, peerConnectionTimeout)
- // TODO(fredq) check that the service at addr has the expectedSyncbaseBlessings
+ // TODO(fredq): check that the service at addr has the expectedSyncbaseBlessings.
c := interfaces.SyncClient(naming.Join(addr, common.SyncbaseSuffix))
sg, vers, gv, err := c.JoinSyncgroupAtAdmin(ctx, dbId, sgId, localSyncbaseName, myInfo)
cancel()
@@ -1332,17 +1365,19 @@
return s.addSyncgroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
})
- if err == nil {
- s.initSyncStateInMem(ctx, sg.DbId, sgOID(gid))
-
- // Advertise the Syncbase at the chosen mount table and in the
- // neighborhood.
- //
- // TODO(hpucha): Implement failure handling. See note in
- // CreateSyncgroup for more details.
- err = s.advertiseSyncbase(ctx, call, &sg)
+ if err != nil {
+ return s.name, err
}
+ s.initSyncStateInMem(ctx, sg.DbId, sgOID(gid))
+
+ // Advertise the Syncbase at the chosen mount table and in the
+ // neighborhood.
+ //
+ // TODO(hpucha): Implement failure handling. See note in
+ // CreateSyncgroup for more details.
+ err = s.advertiseSyncbase(ctx, call, &sg)
+
return s.name, err
}
@@ -1352,6 +1387,13 @@
nullSG, nullGV := interfaces.Syncgroup{}, interfaces.GenVector{}
+ // If this admin is offline, it shouldn't accept the join request since it
+ // would be unable to send out the new syncgroup updates. However, it is still
+ // possible that the admin goes offline right after processing the request.
+ if !s.isDbSyncable(ctx, dbId) {
+ return nullSG, "", nullGV, interfaces.NewErrDbOffline(ctx, dbId)
+ }
+
// Find the database store for this syncgroup.
dbSt, err := s.getDbStore(ctx, call, dbId)
if err != nil {
@@ -1409,6 +1451,10 @@
return nullSG, "", nullGV, err
}
+ if err := s.checkptSgLocalGen(ctx, dbId, gid); err != nil {
+ return nullSG, "", nullGV, err
+ }
+
sgs := sgSet{gid: struct{}{}}
gv, _, err := s.copyDbGenInfo(ctx, dbId, sgs)
if err != nil {
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 8dc34d3..b14fcbc 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -97,9 +97,6 @@
// records ending with one record having the "continued" flag set to false. The
// call returns true if a new batch update was processed.
func (s *syncService) processDatabase(ctx *context.T, dbId wire.Id, st store.Store) bool {
- s.thLock.Lock()
- defer s.thLock.Unlock()
-
vlog.VI(2).Infof("sync: processDatabase: begin: %v", dbId)
defer vlog.VI(2).Infof("sync: processDatabase: end: %v", dbId)
@@ -131,6 +128,26 @@
// TODO(rdaoud): quarantine this database.
return false
}
+
+	// The requirement for pause is that any write made after the pause must
+	// not be synced until sync is resumed. Similarly, the requirement for
+	// resume is that every write made before the resume must be seen and
+	// added to the sync data structures before syncing resumes.
+ //
+ // Hence, at the end of processing a batch, the watcher checks if sync is
+ // paused/resumed. If it is paused, it does not cut any more generations for
+ // future batches. If sync is resumed, it cuts a new generation. Cutting a
+ // generation freezes the most recent batch of local changes. This frozen
+ // state is used by the responder when responding to GetDeltas RPC.
+ if s.isDbSyncable(ctx, dbId) {
+ // The database is online. Cut a gen.
+ if err := s.checkptLocalGen(ctx, dbId, nil); err != nil {
+ vlog.Errorf("sync: processDatabase: %v: cannot cut a generation: %v", dbId, verror.DebugString(err))
+ return false
+ }
+ } else {
+ vlog.VI(1).Infof("sync: processDatabase: %v database not allowed to sync, skipping cutting a gen", dbId)
+ }
return true
}
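
For completeness, the echo suppression added to the responder amounts to skipping the initiator's own device while walking the per-device generation diff. The following self-contained sketch shows that filter in isolation; genRange and the map shape are simplified stand-ins for the real diff entries, and syncbaseIdToName is mocked here.

package main

import "fmt"

// genRange is a toy stand-in for the per-device generation range in rSt.diff.
type genRange struct{ min, max uint64 }

// syncbaseIdToName is a simplified stand-in for the real helper that maps a
// device id in the diff to a Syncbase name.
func syncbaseIdToName(dev uint64) string {
	return fmt.Sprintf("syncbase-%d", dev)
}

// filterDiff drops the initiator's own generations from the diff so they are
// not echoed back to it, mirroring the skip added in sendDeltasPerDatabase.
func filterDiff(diff map[uint64]genRange, initiator string) map[uint64]genRange {
	out := make(map[uint64]genRange, len(diff))
	for dev, r := range diff {
		if syncbaseIdToName(dev) == initiator {
			continue // do not send generations belonging to the initiator
		}
		out[dev] = r
	}
	return out
}

func main() {
	diff := map[uint64]genRange{1: {1, 5}, 2: {3, 8}}
	// Only device 2's generations are sent back to "syncbase-1".
	fmt.Println(filterDiff(diff, "syncbase-1"))
}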