v.io/x/ref/services/syncbase: start ServerBlobFetcher() in server,NewService()

This change moves the starting of the vsync.ServerBlobFetcher()
back into server,NewService(), and wait for it to exit on service.Close().
I've checked (with printfs, now removed) that this is indeed being invoked by some
existing tests.

Some of the prerequisites for the blob fetcher are not set up in tests of
vsync, which was causing some error messages to be generated.

Change-Id: I18e969924bf3a085af602fc8bf71603ec6b22b1c
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index c51d939..87b1e09 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -43,11 +43,13 @@
 
 // service is a singleton (i.e. not per-request) that handles Service RPCs.
 type service struct {
-	st      store.Store // keeps track of which databases exist, etc.
-	sync    interfaces.SyncServerMethods
-	vclock  *vclock.VClock
-	vclockD *vclock.VClockD
-	opts    ServiceOptions
+	st                      store.Store // keeps track of which databases exist, etc.
+	sync                    interfaces.SyncServerMethods
+	vclock                  *vclock.VClock
+	vclockD                 *vclock.VClockD
+	opts                    ServiceOptions
+	cancelServerBlobFetcher context.CancelFunc
+	serverBlobFetcherDone   sync.WaitGroup
 	// Guards the fields below. Held during database Create, Delete, and
 	// SetPermissions.
 	mu  sync.Mutex
@@ -221,6 +223,12 @@
 		vsync.NewSyncDatabase(d).StartStoreWatcher(ctx)
 	}
 
+	// Start a blob fetcher, setting up for stopping it in Close().
+	var serverBlobFetcherCtx *context.T
+	serverBlobFetcherCtx, s.cancelServerBlobFetcher = context.WithCancel(ctx)
+	s.serverBlobFetcherDone.Add(1)
+	go vsync.ServerBlobFetcher(serverBlobFetcherCtx, s.sync, &s.serverBlobFetcherDone)
+
 	// Start the vclock daemon. For now, we disable NTP when running in dev mode.
 	// If we decide to change this behavior, we'll need to update the tests in
 	// v.io/v23/syncbase/featuretests/vclock_v23_test.go to disable NTP some other
@@ -300,9 +308,11 @@
 //
 // TODO(hpucha): Close or cleanup Syncbase database data structures.
 func (s *service) Close() {
+	s.cancelServerBlobFetcher()
 	s.vclockD.Close()
 	vsync.Close(s.sync)
 	s.st.Close()
+	s.serverBlobFetcherDone.Wait()
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/vsync/server_blob_fetcher.go b/services/syncbase/vsync/server_blob_fetcher.go
index ec38aea..76f8719 100644
--- a/services/syncbase/vsync/server_blob_fetcher.go
+++ b/services/syncbase/vsync/server_blob_fetcher.go
@@ -6,6 +6,7 @@
 
 import "container/heap"
 import "math/rand"
+import "sync"
 import "time"
 
 import "v.io/v23/context"
@@ -319,7 +320,8 @@
 // with gaps specified by parameters in parameters.go before scanning them all again.  It
 // stops only if the context *ctx is cancelled.  The function fetchFunc() will
 // be used to fetch blobs, and passed the argument clientData on each call.
-func ServerBlobFetcher(ctx *context.T, ssm interfaces.SyncServerMethods) {
+// If done!=nil, done.Done() is called just before the routine returns.
+func ServerBlobFetcher(ctx *context.T, ssm interfaces.SyncServerMethods, done *sync.WaitGroup) {
 	bf := NewBlobFetcher(ctx, serverBlobFetchConcurrency)
 	ss := ssm.(*syncService)
 	var delay time.Duration = serverBlobFetchInitialScanDelay
@@ -340,4 +342,8 @@
 		}
 		delay = serverBlobFetchExtraScanDelay + serverBlobFetchScanDelayMultiplier*time.Since(startTime)
 	}
+	bf.WaitForExit()
+	if done != nil {
+		done.Done()
+	}
 }
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 3d7e4fe..20eabc5 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -234,9 +234,6 @@
 	// Start the discovery service thread to listen to neighborhood updates.
 	go s.discoverNeighborhood(ctx)
 
-	// Start a blob fetcher.
-	go ServerBlobFetcher(ctx, s)
-
 	return s, nil
 }