syncbase/vsync: Move the generation cutting from the initiator to the
syncgroup client-side responders and the sync watcher. This eliminates
the need for synchronization between these goroutines. To compensate,
the responder is modified to suppress the echo of the initiator's own
generations that this relaxation would otherwise cause.
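
In outline, the new division of labor is shown in the condensed sketch
below (simplified from the diff; error handling, logging, and the new
checkptSgLocalGen calls on the syncgroup operation paths are elided):

    // watcher.go: after committing a batch of local changes, the
    // watcher (not the initiator) cuts the generation, provided the
    // database is currently allowed to sync.
    if s.isDbSyncable(ctx, dbId) {
        if err := s.checkptLocalGen(ctx, dbId, nil); err != nil {
            return false
        }
    }

    // responder.go: skip generations that originated at the initiator
    // so that its own data is not echoed back to it.
    for dev, r := range rSt.diff {
        if syncbaseIdToName(dev) == rSt.initiator {
            continue
        }
        // ... push this device's log records onto the min heap ...
    }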

Change-Id: Id5f2096e4ec661fb6b639f180d04194bbd80fdb3
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index c1f3cd0..0441dd8 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -431,48 +431,15 @@
 //
 // TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
 func (iSt *initiationState) prepareDataDeltaReq(ctx *context.T) error {
-	iSt.config.sync.thLock.Lock()
-	defer iSt.config.sync.thLock.Unlock()
-
 	// isDbSyncable reads the in-memory syncState for this db to verify if
 	// it is allowed to sync or not. This state is mutated by watcher based
 	// on incoming pause/resume requests.
-	// The requirement for pause is that any write after the pause must not
-	// be synced until sync is resumed. The same for resume is that every write
-	// before resume must be seen and added to sync datastructure before sync
-	// resumes.
-	// thLock is used to achieve the above requirements. Watcher acquires this
-	// lock when it processes new entries in the queue. Before cutting a new
-	// generation the initiator acquires this lock and then checks if the
-	// database is syncable or not. If it is, it cuts a new generation and then
-	// releases the thLock. This makes sure that the generation cut by the
-	// initiator is either before sync is paused or no generation is cut and
-	// initiator aborts.
-	// Note: Responder does not need to acquire thLock since it uses the
-	// generation cut by initiator. See responder.go for more details.
 	if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
 		// The database is offline. Skip the db till it becomes syncable again.
 		vlog.VI(1).Infof("sync: prepareDataDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
 		return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
 	}
 
-	// Freeze the most recent batch of local changes before fetching
-	// remote changes from a peer. This frozen state is used by the
-	// responder when responding to GetDeltas RPC.
-	//
-	// We only allow an initiator to freeze local generations (not
-	// responders/watcher) in order to maintain a static baseline
-	// for the duration of a sync. This addresses the following race
-	// condition: If we allow responders to use newer local
-	// generations while the initiator is in progress, they may beat
-	// the initiator and send these new generations to remote
-	// devices.  These remote devices in turn can send these
-	// generations back to the initiator in progress which was
-	// started with older generation information.
-	if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.dbId, nil); err != nil {
-		return err
-	}
-
 	local, lgen, err := iSt.config.sync.copyDbGenInfo(ctx, iSt.config.dbId, nil)
 	if err != nil {
 		return err
@@ -543,8 +510,5 @@
 // knowledge for the initiator to send to the responder, and prepares the
 // request to start the syncgroup sync.
 func (iSt *initiationState) prepareSGDeltaReq(ctx *context.T) error {
-	iSt.config.sync.thLock.Lock()
-	defer iSt.config.sync.thLock.Unlock()
-
 	if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
 		// The database is offline. Skip the db till it becomes syncable again.
@@ -552,10 +516,6 @@
 		return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
 	}
 
-	if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.dbId, iSt.config.sgIds); err != nil {
-		return err
-	}
-
 	var err error
 	iSt.local, _, err = iSt.config.sync.copyDbGenInfo(ctx, iSt.config.dbId, iSt.config.sgIds)
 	if err != nil {
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 1e9add3..8b79979 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -91,10 +91,16 @@
 	return rSt, nil
 }
 
-// sendDeltasPerDatabase sends to an initiator all the missing generations
-// corresponding to the prefixes requested for this Database, and genvectors
-// summarizing the knowledge transferred from the responder to the
-// initiator. This happens in three phases:
+// sendDeltasPerDatabase sends to the initiator of the requesting Syncbase all
+// the missing generations corresponding to the prefixes requested for this
+// Database, and genvectors summarizing the knowledge transferred from the
+// responder to the initiator. Note that the responder does not send any
+// generations that originated at the requesting Syncbase, even if this
+// responder is ahead of the initiator in its knowledge of them. The responder
+// can be ahead because the requesting Syncbase's own responder may hand out
+// generations newer than the ones its initiator started with.
+//
+// The responder operates in three phases:
 //
 // In the first phase, the initiator is checked against the syncgroup ACLs of
 // all the syncgroups it is requesting, and only those prefixes that belong to
@@ -131,11 +137,6 @@
 	// logging level specified is enabled.
 	vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %v: sgids %v, genvecs %v, sg %v", rSt.dbId, rSt.sgIds, rSt.initVecs, rSt.sg)
 
-	// There is no need to acquire syncService.thLock since responder uses
-	// the generation cut by initiator. The initiator maintains the atomicity
-	// of operations performed within a pause-resume block when a generation is
-	// cut. Hence either none of these operations are present within the
-	// generation or all of them are present.
 	if !rSt.sync.isDbSyncable(ctx, rSt.dbId) {
 		// The database is offline. Skip the db till it becomes syncable again.
 		vlog.VI(1).Infof("sync: sendDeltasPerDatabase: database not allowed to sync, skipping sync on db %v", rSt.dbId)
@@ -361,6 +362,18 @@
 	// Init the min heap, one entry per device in the diff.
 	mh := make(minHeap, 0, len(rSt.diff))
 	for dev, r := range rSt.diff {
+		// Do not send generations belonging to the initiator.
+		//
+		// TODO(hpucha): This needs to be relaxed to handle the case
+		// when the initiator has locally destroyed its
+		// database/collection and has rejoined the syncgroup. In that
+		// case, it would like its data sent back as well. Perhaps the
+		// initiator's request will contain an indication of its
+		// status so the responder can distinguish between the two cases.
+		if syncbaseIdToName(dev) == rSt.initiator {
+			continue
+		}
+
 		r.cur = r.min
 		rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, r)
 		if err != nil {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index ccab758..776918e 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -55,22 +55,6 @@
 	rng     *rand.Rand
 	rngLock sync.Mutex
 
-	// High-level lock to serialize the watcher and the initiator. This lock is
-	// needed to handle the following cases: (a) When the initiator is
-	// cutting a local generation, it waits for the watcher to commit the
-	// latest local changes before including them in the checkpoint. (b)
-	// When the initiator is receiving updates, it reads the latest head of
-	// an object as per the DAG state in order to construct the in-memory
-	// graft map used for conflict detection. At the same time, if a watcher
-	// is processing local updates, it may move the object head. Hence the
-	// initiator and watcher contend on the DAG head of an object. Instead
-	// of retrying a transaction which causes the entire delta to be
-	// replayed, we use pessimistic locking to serialize the initiator and
-	// the watcher.
-	//
-	// TODO(hpucha): This is a temporary hack.
-	thLock sync.RWMutex
-
 	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
 	closed  chan struct{}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 92eb53f..fb251b4 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -711,6 +711,10 @@
 
 	ss.initSyncStateInMem(ctx, dbId, sgOID(gid))
 
+	if err := ss.checkptSgLocalGen(ctx, dbId, gid); err != nil {
+		return err
+	}
+
 	// Advertise the Syncbase at the chosen mount table and in the
 	// neighborhood.
 	if err := ss.advertiseSyncbase(ctx, call, sg); err != nil {
@@ -957,7 +961,8 @@
 	}
 
 	ss := sd.sync.(*syncService)
-	gid := SgIdToGid(sd.db.Id(), sgId)
+	dbId := sd.db.Id()
+	gid := SgIdToGid(dbId, sgId)
 
 	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
 		// Check permissions on Database.
@@ -1001,19 +1006,37 @@
 		// advertising to change.
 
 		// Reserve a log generation and position counts for the new syncgroup.
-		gen, pos := ss.reserveGenAndPosInDbLog(ctx, sd.db.Id(), sgOID(gid), 1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, dbId, sgOID(gid), 1)
 
 		newVersion := ss.newSyncgroupVersion()
 		sg.Spec = spec
 		sg.SpecVersion = newVersion
 		return ss.updateSyncgroupVersioning(ctx, tx, gid, newVersion, true, ss.id, gen, pos, sg)
 	})
-	return err
+
+	if err != nil {
+		return err
+	}
+
+	return ss.checkptSgLocalGen(ctx, dbId, gid)
 }
 
 //////////////////////////////
 // Helper functions
 
+// checkptSgLocalGen cuts a local generation for the specified syncgroup to
+// capture its updates.
+func (s *syncService) checkptSgLocalGen(ctx *context.T, dbId wire.Id, sgid interfaces.GroupId) error {
+	// Cut a new generation to capture the syncgroup updates. Unlike the
+	// interaction between the watcher and the pause/resume bits, there is
+	// no ordering guarantee between when these syncgroup updates are
+	// synced and when sync is paused/resumed, because in the current
+	// design syncgroup updates are not serialized through the watch
+	// queue.
+	sgs := sgSet{sgid: struct{}{}}
+	return s.checkptLocalGen(ctx, dbId, sgs)
+}
+
 // publishSyncgroup publishes the syncgroup at the remote peer and update its
 // status.  If the publish operation is either successful or rejected by the
 // peer, the status is updated to "running" or "rejected" respectively and the
@@ -1026,6 +1049,14 @@
 	ss := sd.sync.(*syncService)
 	dbId := sd.db.Id()
 
+	// If this admin is offline, it shouldn't attempt to publish the syncgroup
+	// since it would be unable to send out the new syncgroup updates. However, it
+	// is still possible that the admin goes offline right after processing the
+	// request.
+	if !ss.isDbSyncable(ctx, dbId) {
+		return interfaces.NewErrDbOffline(ctx, dbId)
+	}
+
 	gid := SgIdToGid(dbId, sgId)
 	version, err := getSyncgroupVersion(ctx, st, gid)
 	if err != nil {
@@ -1114,8 +1145,10 @@
 	if err != nil {
 		vlog.Errorf("sync: publishSyncgroup: cannot update syncgroup %v status to %s: %v",
 			sgId, status.String(), err)
+		return err
 	}
-	return err
+
+	return ss.checkptSgLocalGen(ctx, dbId, gid)
 }
 
 // bootstrapSyncgroup inserts into the transaction log a syncgroup operation and
@@ -1240,7 +1273,7 @@
 	for _, svc := range neighbors {
 		for _, addr := range svc.Addresses {
 			ctx, cancel := context.WithTimeout(ctxIn, peerConnectionTimeout)
-			// TODO(fredq) check that the service at addr has the expectedSyncbaseBlessings
+			// TODO(fredq): check that the service at addr has the expectedSyncbaseBlessings.
 			c := interfaces.SyncClient(naming.Join(addr, common.SyncbaseSuffix))
 			sg, vers, gv, err := c.JoinSyncgroupAtAdmin(ctx, dbId, sgId, localSyncbaseName, myInfo)
 			cancel()
@@ -1332,17 +1365,19 @@
 		return s.addSyncgroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
 	})
 
-	if err == nil {
-		s.initSyncStateInMem(ctx, sg.DbId, sgOID(gid))
-
-		// Advertise the Syncbase at the chosen mount table and in the
-		// neighborhood.
-		//
-		// TODO(hpucha): Implement failure handling. See note in
-		// CreateSyncgroup for more details.
-		err = s.advertiseSyncbase(ctx, call, &sg)
+	if err != nil {
+		return s.name, err
 	}
 
+	s.initSyncStateInMem(ctx, sg.DbId, sgOID(gid))
+
+	// Advertise the Syncbase at the chosen mount table and in the
+	// neighborhood.
+	//
+	// TODO(hpucha): Implement failure handling. See note in
+	// CreateSyncgroup for more details.
+	err = s.advertiseSyncbase(ctx, call, &sg)
+
 	return s.name, err
 }
 
@@ -1352,6 +1387,13 @@
 
 	nullSG, nullGV := interfaces.Syncgroup{}, interfaces.GenVector{}
 
+	// If this admin is offline, it shouldn't accept the join request since it
+	// would be unable to send out the new syncgroup updates. However, it is still
+	// possible that the admin goes offline right after processing the request.
+	if !s.isDbSyncable(ctx, dbId) {
+		return nullSG, "", nullGV, interfaces.NewErrDbOffline(ctx, dbId)
+	}
+
 	// Find the database store for this syncgroup.
 	dbSt, err := s.getDbStore(ctx, call, dbId)
 	if err != nil {
@@ -1409,6 +1451,10 @@
 		return nullSG, "", nullGV, err
 	}
 
+	if err := s.checkptSgLocalGen(ctx, dbId, gid); err != nil {
+		return nullSG, "", nullGV, err
+	}
+
 	sgs := sgSet{gid: struct{}{}}
 	gv, _, err := s.copyDbGenInfo(ctx, dbId, sgs)
 	if err != nil {
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 8dc34d3..b14fcbc 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -97,9 +97,6 @@
 // records ending with one record having the "continued" flag set to false.  The
 // call returns true if a new batch update was processed.
 func (s *syncService) processDatabase(ctx *context.T, dbId wire.Id, st store.Store) bool {
-	s.thLock.Lock()
-	defer s.thLock.Unlock()
-
 	vlog.VI(2).Infof("sync: processDatabase: begin: %v", dbId)
 	defer vlog.VI(2).Infof("sync: processDatabase: end: %v", dbId)
 
@@ -131,6 +128,26 @@
 		// TODO(rdaoud): quarantine this database.
 		return false
 	}
+
+	// The requirement for pause is that no write made after the pause may
+	// be synced until sync is resumed. The requirement for resume is that
+	// every write made before the resume must be seen and added to the
+	// sync data structures before sync resumes.
+	//
+	// Hence, at the end of processing a batch, the watcher checks whether
+	// sync is paused. If it is, it does not cut any more generations for
+	// future batches. If sync is resumed, it cuts a new generation. Cutting
+	// a generation freezes the most recent batch of local changes; this
+	// frozen state is used by the responder when serving the GetDeltas RPC.
+	if s.isDbSyncable(ctx, dbId) {
+		// The database is online. Cut a gen.
+		if err := s.checkptLocalGen(ctx, dbId, nil); err != nil {
+			vlog.Errorf("sync: processDatabase: %v: cannot cut a generation: %v", dbId, verror.DebugString(err))
+			return false
+		}
+	} else {
+		vlog.VI(1).Infof("sync: processDatabase: %v database not allowed to sync, skipping cutting a gen", dbId)
+	}
 	return true
 }