syncbase/vsync: eager sync watcher

Make the sync watcher more eager to consume updates from databases by
continuing to process updates as long at it finds some.  Remain fair by
doing a round-robin across databases, taking one batch update from each
database at a time.  Continue doing passes across the databases until
all of them have no further updates pending.

Change-Id: Iaf49be67d5400bd7d57cd0ec954c4da29d3d2cab
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index e70d0d2..8f5f60c 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -71,10 +71,29 @@
 			return
 
 		case <-ticker.C:
-			s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
-				s.processDatabase(ctx, appName, dbName, st)
-				return false
-			})
+			s.processStoreUpdates(ctx)
+		}
+	}
+}
+
+// processStoreUpdates fetches updates from all databases and processes them.
+// To maintain fairness among databases, it processes one batch update from
+// each database, in a round-robin manner, until there are no further updates
+// from any database.
+func (s *syncService) processStoreUpdates(ctx *context.T) {
+	for {
+		total, active := 0, 0
+		s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+			if s.processDatabase(ctx, appName, dbName, st) {
+				active++
+			}
+			total++
+			return false
+		})
+
+		vlog.VI(2).Infof("sync: processStoreUpdates: %d/%d databases had updates", active, total)
+		if active == 0 {
+			break
 		}
 	}
 }
@@ -82,8 +101,9 @@
 // processDatabase fetches from the given database at most one new batch update
 // (transaction) and processes it.  The one-batch limit prevents one database
 // from starving others.  A batch is stored as a contiguous set of log records
-// ending with one record having the "continued" flag set to false.
-func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+// ending with one record having the "continued" flag set to false.  The call
+// returns true if a new batch update was processed.
+func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) bool {
 	s.thLock.Lock()
 	defer s.thLock.Unlock()
 
@@ -94,7 +114,7 @@
 	if err != nil {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
 			vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
-			return
+			return false
 		}
 		resMark = ""
 	}
@@ -105,7 +125,9 @@
 	// Get a batch of watch log entries, if any, after this resume marker.
 	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
 		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
+		return true
 	}
+	return false
 }
 
 // processWatchLogBatch parses the given batch of watch log records, updates the