syncbase blobs: Fix bug in leaf handoff of blobs, and add test for same.

v.io/v23/services/syncbase/service.vdl
        Add DevModeGetBlobShares() to the Service interface.  This allows tests
        to check the blob ownership shares in the syncbase instances they
        start.

v.io/v23/syncbase/featuretests/blob_v23_test.go
	Add checking of blob ownership shares to TestV23ServerBlobFetch().

        Add new test TestV23LeafBlobFetch() that checks that a blob (and one of
        its shares) is transferred off a leaf as soon as the leaf gets a chance
        to ask another syncbase to take it.

        The new routine getTestStructAndBlobFromRow() is a common piece from
        various other routines.  I pulled it out because I was adding the
        getBlobOwnershipShares() call.

        The checkShares() call uses DevModeGetBlobShares() to check the
        ownership shares of the various blobs and syncbases.

v.io/x/ref/services/syncbase/server/interfaces/sync.vdl
        Change RequestTakeBlob() to RequestTakeBlobs() (plural) to allow a
        single call to request that the recipient take ownership of several
        blobs.

	Add GetBlobShares(), which is used by DevModeGetBlobShares().

v.io/x/ref/services/syncbase/server/service.go
        Add DevModeGetBlobShares() which allows the client to interrogate the
        blob ownership share state on the server.  It requires "-dev" mode,
        and admin access.

v.io/x/ref/services/syncbase/vsync/blob.go
	Implement RequestTakeBlobs().  The previous call RequestTakeBlob()
	merely returned a "not implemented" error.
        The new implementation iterates over all the blobs for which the
        requester is asking for shares to be taken, and queues the blob to be
        fetched iff the current server doesn't have any shares already.
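
        The per-blob decision described above can be sketched as follows.
        This is a simplified illustration, not the code in blob.go: GroupID,
        BlobRef, SharesByGroup and shouldFetch() are hypothetical stand-ins
        for the real syncbase types, and the blob store lookup is reduced to
        a map plus a "have metadata" flag.

```go
package main

import "fmt"

// GroupID, BlobRef and SharesByGroup are hypothetical stand-ins for the
// syncbase types; they exist only to make this sketch self-contained.
type GroupID string
type BlobRef string
type SharesByGroup map[GroupID]int32

// shouldFetch reports whether a blob should be queued for fetching.
// requested holds the shares the caller wants taken, held holds the shares
// this syncbase already owns, and haveMetadata is false when nothing is
// known locally about the blob.
func shouldFetch(requested, held SharesByGroup, haveMetadata bool) bool {
	if !haveMetadata {
		return true // no local metadata: fetch the blob
	}
	for gid, n := range requested {
		if n > 0 && held[gid] == 0 {
			return true // asked to take shares in a group where none are held
		}
	}
	return false
}

func main() {
	requested := SharesByGroup{"sg1": 1}
	fmt.Println(shouldFetch(requested, SharesByGroup{"sg1": 2}, true)) // shares already held
	fmt.Println(shouldFetch(requested, SharesByGroup{}, true))         // no shares held
	fmt.Println(shouldFetch(requested, nil, false))                    // no metadata at all
}
```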

        Add syncService.GetBlobShares(), which is called by
        DevModeGetBlobShares() to allow tests to check the ownership share
        counts.

v.io/x/ref/services/syncbase/vsync/initiator.go
        In syncService.getDeltas(), once the syncgroup priorities have been
        updated following the GetDeltas() call, look for blob shares to hand
        off; if there are any, call RequestTakeBlobs() to tell the responder
        to fetch the relevant blobs.

v.io/x/ref/services/syncbase/vsync/server_blob_fetcher.go
	Add an "expiry" time to blob fetch requests.
        This is useful both in the server (where it should probably stop
        fetching eventually if the blobref goes away), and for fetching for
        leaves (where it should stop fetching if the syncbase is no longer in
        communication with the leaf).

	Modify StartFetchingBlob() to take an expiry time for the fetch request,
	and to set it in the blobFetchState.

        Add SyncServiceBlobFetcher() to allow the rest of the syncService code
        to get its hands on the blobFetcher created by the ServerBlobFetcher()
        thread.

        Modify ServerBlobFetcher() to advertise the blobFetcher in the
        syncService struct.
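
        "Advertising" the fetcher amounts to publishing a pointer under a
        mutex so that other goroutines can pick it up, and clearing it on
        shutdown.  A small sketch of that pattern follows; the type and
        method names are hypothetical and much simpler than the real
        syncService.

```go
package main

import (
	"fmt"
	"sync"
)

// blobFetcher is a placeholder for the real fetcher type.
type blobFetcher struct{}

// service is a hypothetical, pared-down stand-in for syncService.
type service struct {
	mu sync.Mutex   // protects bf
	bf *blobFetcher // nil when no fetcher thread is running
}

// setFetcher publishes (or, with nil, withdraws) the fetcher.
func (s *service) setFetcher(bf *blobFetcher) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.bf = bf
}

// fetcher returns the published fetcher; callers must handle nil.
func (s *service) fetcher() *blobFetcher {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.bf
}

func main() {
	s := &service{}
	fmt.Println(s.fetcher() == nil) // nothing advertised yet
	s.setFetcher(&blobFetcher{})
	fmt.Println(s.fetcher() != nil) // fetcher is now visible
	s.setFetcher(nil)               // withdrawn on shutdown
	fmt.Println(s.fetcher() == nil)
}
```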

v.io/x/ref/services/syncbase/vsync/server_blob_fetcher_test.go
	Accommodate new parameter of StartFetchingBlob().

v.io/x/ref/services/syncbase/vsync/sync.go
	Advertise the blobFetcher in the syncService struct.

v.io/x/ref/services/syncbase/vsync/syncgroup.go
	Bug fix in sgPriorityLowerThan().  In a previous change,
	I changed the order of the BlobDevType{Leaf,Normal,Server}
	constants so that "normal" would be the default (zero) value.
	At the time I said that nothing depends on the order, but I was wrong.
	This routine did.
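
        To illustrate the kind of bug this is: comparing enum values
        numerically bakes in the declaration order, while comparing against
        named constants does not.  The sketch below assumes a particular
        numeric order (Normal, Leaf, Server) purely for demonstration; the
        real BlobDevType values may differ, and devTypeLower() covers only
        the device-type part of the comparison.

```go
package main

import "fmt"

// DevType is a hypothetical stand-in for the BlobDevType constants.  The
// numeric order used here is an assumption made for this sketch.
type DevType int

const (
	DevTypeNormal DevType = iota // zero value, hence the default
	DevTypeLeaf
	DevTypeServer
)

// lowerByOrder is the order-dependent comparison: it silently changes
// meaning whenever the constants are reordered.
func lowerByOrder(a, b DevType) bool {
	return b < a
}

// devTypeLower reports whether a has lower priority than b using only named
// constants, so it is unaffected by the constants' numeric values.
func devTypeLower(a, b DevType) bool {
	if a == DevTypeServer || a == b {
		return false // nothing outranks a server; equal types tie here
	}
	return b == DevTypeServer || b == DevTypeNormal
}

func main() {
	// A normal device should rank below a server.
	fmt.Println(lowerByOrder(DevTypeNormal, DevTypeServer)) // wrong under this order
	fmt.Println(devTypeLower(DevTypeNormal, DevTypeServer)) // independent of order
	fmt.Println(devTypeLower(DevTypeLeaf, DevTypeNormal))
	fmt.Println(devTypeLower(DevTypeNormal, DevTypeLeaf))
}
```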

        Modify updateAllSyncgroupPriorities() to return, as a side-effect, the
        ownership shares that a leaf should hand off to a non-leaf after a
        GetDeltas() call.
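
        The handoff computation can be sketched roughly as below: for every
        syncgroup in which the local device is a leaf and the peer is not,
        offer one share of each blob in which a share is held.  The types
        and the handoffShares() helper are hypothetical simplifications; the
        real code walks the blob store's metadata stream rather than a map.

```go
package main

import "fmt"

// GroupID, BlobRef and SharesByGroup are hypothetical stand-ins for the
// syncbase types; they exist only to make this sketch self-contained.
type GroupID string
type BlobRef string
type SharesByGroup map[GroupID]int32

// handoffShares returns, per blob, one share for each syncgroup in
// leafToNonLeaf in which the local device holds at least one share.  The
// result is nil when there is nothing to hand off.
func handoffShares(owned map[BlobRef]SharesByGroup, leafToNonLeaf map[GroupID]struct{}) map[BlobRef]SharesByGroup {
	var out map[BlobRef]SharesByGroup
	for br, shares := range owned {
		for gid := range leafToNonLeaf {
			if shares[gid] <= 0 {
				continue // no share held for this group
			}
			if out == nil {
				out = make(map[BlobRef]SharesByGroup)
			}
			if out[br] == nil {
				out[br] = make(SharesByGroup)
			}
			out[br][gid] = 1 // donate a single share per group
		}
	}
	return out
}

func main() {
	owned := map[BlobRef]SharesByGroup{
		"blobA": {"sg1": 3, "sg2": 1},
		"blobB": {"sg2": 2},
	}
	groups := map[GroupID]struct{}{"sg1": {}}
	fmt.Println(handoffShares(owned, groups))
}
```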

MultiPart: 2/2
Change-Id: If38e9f5bd124af057035b01a060d7d3be4f44901
diff --git a/services/syncbase/server/interfaces/interfaces.vdl.go b/services/syncbase/server/interfaces/interfaces.vdl.go
index ded5dab..6ae24f9 100644
--- a/services/syncbase/server/interfaces/interfaces.vdl.go
+++ b/services/syncbase/server/interfaces/interfaces.vdl.go
@@ -2590,12 +2590,12 @@
 	// AcceptedBlobOwnership() call.
 	FetchBlobRecipe(_ *context.T, br syncbase.BlobRef, callerName string, mySgPriorities SgPriorities, _ ...rpc.CallOpt) (SyncFetchBlobRecipeClientCall, error)
 	FetchChunks(*context.T, ...rpc.CallOpt) (SyncFetchChunksClientCall, error)
-	// RequestTakeBlob indicates that the caller wishes the server to take
-	// some blob ownership shares for various syncgroups for the specified blob.
+	// RequestTakeBlobs indicates that the caller wishes the server to take
+	// some blob ownership shares for various syncgroups for the specified blobs.
 	// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,
 	// and ultimately AcceptedBlobOwnership().
 	// callerName is the syncbase Id of the caller, expressed as a string.
-	RequestTakeBlob(_ *context.T, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup, _ ...rpc.CallOpt) error
+	RequestTakeBlobs(_ *context.T, callerName string, blobRefToShares map[syncbase.BlobRef]BlobSharesBySyncgroup, _ ...rpc.CallOpt) error
 	// AcceptedBlobOwnership tells the server that the client callerName (a
 	// syncbase Id expressed as a string) has accepted blob ownership of a
 	// specified number of shares for blob br.  The server may decrement
@@ -2606,6 +2606,12 @@
 	// server is likely to keep the blob itself, plus its syncbase Id
 	// expressed as a string.
 	AcceptedBlobOwnership(_ *context.T, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup, _ ...rpc.CallOpt) (serverName string, keepingBlob bool, _ error)
+	// GetBlobShares returns the number of ownership shares for the specified blob
+	// held by the server.  It is used by the DevModeGetBlobShares() call in the
+	// service.  It uses the "map[string]int32" type, rather than
+	// interfaces.BlobSharesBySyncgroup, so that the client of this call code doesn't
+	// need to reach into the sync service's type definitions.
+	GetBlobShares(_ *context.T, br syncbase.BlobRef, _ ...rpc.CallOpt) (map[string]int32, error)
 }
 
 // SyncClientStub adds universal methods to SyncClientMethods.
@@ -2679,8 +2685,8 @@
 	return
 }
 
-func (c implSyncClientStub) RequestTakeBlob(ctx *context.T, i0 syncbase.BlobRef, i1 string, i2 BlobSharesBySyncgroup, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "RequestTakeBlob", []interface{}{i0, i1, i2}, nil, opts...)
+func (c implSyncClientStub) RequestTakeBlobs(ctx *context.T, i0 string, i1 map[syncbase.BlobRef]BlobSharesBySyncgroup, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "RequestTakeBlobs", []interface{}{i0, i1}, nil, opts...)
 	return
 }
 
@@ -2689,6 +2695,11 @@
 	return
 }
 
+func (c implSyncClientStub) GetBlobShares(ctx *context.T, i0 syncbase.BlobRef, opts ...rpc.CallOpt) (o0 map[string]int32, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetBlobShares", []interface{}{i0}, []interface{}{&o0}, opts...)
+	return
+}
+
 // SyncGetDeltasClientStream is the client stream for Sync.GetDeltas.
 type SyncGetDeltasClientStream interface {
 	// RecvStream returns the receiver side of the Sync.GetDeltas client stream.
@@ -3094,12 +3105,12 @@
 	// AcceptedBlobOwnership() call.
 	FetchBlobRecipe(_ *context.T, _ SyncFetchBlobRecipeServerCall, br syncbase.BlobRef, callerName string, mySgPriorities SgPriorities) (shares BlobSharesBySyncgroup, _ error)
 	FetchChunks(*context.T, SyncFetchChunksServerCall) error
-	// RequestTakeBlob indicates that the caller wishes the server to take
-	// some blob ownership shares for various syncgroups for the specified blob.
+	// RequestTakeBlobs indicates that the caller wishes the server to take
+	// some blob ownership shares for various syncgroups for the specified blobs.
 	// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,
 	// and ultimately AcceptedBlobOwnership().
 	// callerName is the syncbase Id of the caller, expressed as a string.
-	RequestTakeBlob(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup) error
+	RequestTakeBlobs(_ *context.T, _ rpc.ServerCall, callerName string, blobRefToShares map[syncbase.BlobRef]BlobSharesBySyncgroup) error
 	// AcceptedBlobOwnership tells the server that the client callerName (a
 	// syncbase Id expressed as a string) has accepted blob ownership of a
 	// specified number of shares for blob br.  The server may decrement
@@ -3110,6 +3121,12 @@
 	// server is likely to keep the blob itself, plus its syncbase Id
 	// expressed as a string.
 	AcceptedBlobOwnership(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup) (serverName string, keepingBlob bool, _ error)
+	// GetBlobShares returns the number of ownership shares for the specified blob
+	// held by the server.  It is used by the DevModeGetBlobShares() call in the
+	// service.  It uses the "map[string]int32" type, rather than
+	// interfaces.BlobSharesBySyncgroup, so that the client of this call code doesn't
+	// need to reach into the sync service's type definitions.
+	GetBlobShares(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef) (map[string]int32, error)
 }
 
 // SyncServerStubMethods is the server interface containing
@@ -3208,12 +3225,12 @@
 	// AcceptedBlobOwnership() call.
 	FetchBlobRecipe(_ *context.T, _ *SyncFetchBlobRecipeServerCallStub, br syncbase.BlobRef, callerName string, mySgPriorities SgPriorities) (shares BlobSharesBySyncgroup, _ error)
 	FetchChunks(*context.T, *SyncFetchChunksServerCallStub) error
-	// RequestTakeBlob indicates that the caller wishes the server to take
-	// some blob ownership shares for various syncgroups for the specified blob.
+	// RequestTakeBlobs indicates that the caller wishes the server to take
+	// some blob ownership shares for various syncgroups for the specified blobs.
 	// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,
 	// and ultimately AcceptedBlobOwnership().
 	// callerName is the syncbase Id of the caller, expressed as a string.
-	RequestTakeBlob(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup) error
+	RequestTakeBlobs(_ *context.T, _ rpc.ServerCall, callerName string, blobRefToShares map[syncbase.BlobRef]BlobSharesBySyncgroup) error
 	// AcceptedBlobOwnership tells the server that the client callerName (a
 	// syncbase Id expressed as a string) has accepted blob ownership of a
 	// specified number of shares for blob br.  The server may decrement
@@ -3224,6 +3241,12 @@
 	// server is likely to keep the blob itself, plus its syncbase Id
 	// expressed as a string.
 	AcceptedBlobOwnership(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef, callerName string, shares BlobSharesBySyncgroup) (serverName string, keepingBlob bool, _ error)
+	// GetBlobShares returns the number of ownership shares for the specified blob
+	// held by the server.  It is used by the DevModeGetBlobShares() call in the
+	// service.  It uses the "map[string]int32" type, rather than
+	// interfaces.BlobSharesBySyncgroup, so that the client of this call code doesn't
+	// need to reach into the sync service's type definitions.
+	GetBlobShares(_ *context.T, _ rpc.ServerCall, br syncbase.BlobRef) (map[string]int32, error)
 }
 
 // SyncServerStub adds universal methods to SyncServerStubMethods.
@@ -3287,14 +3310,18 @@
 	return s.impl.FetchChunks(ctx, call)
 }
 
-func (s implSyncServerStub) RequestTakeBlob(ctx *context.T, call rpc.ServerCall, i0 syncbase.BlobRef, i1 string, i2 BlobSharesBySyncgroup) error {
-	return s.impl.RequestTakeBlob(ctx, call, i0, i1, i2)
+func (s implSyncServerStub) RequestTakeBlobs(ctx *context.T, call rpc.ServerCall, i0 string, i1 map[syncbase.BlobRef]BlobSharesBySyncgroup) error {
+	return s.impl.RequestTakeBlobs(ctx, call, i0, i1)
 }
 
 func (s implSyncServerStub) AcceptedBlobOwnership(ctx *context.T, call rpc.ServerCall, i0 syncbase.BlobRef, i1 string, i2 BlobSharesBySyncgroup) (string, bool, error) {
 	return s.impl.AcceptedBlobOwnership(ctx, call, i0, i1, i2)
 }
 
+func (s implSyncServerStub) GetBlobShares(ctx *context.T, call rpc.ServerCall, i0 syncbase.BlobRef) (map[string]int32, error) {
+	return s.impl.GetBlobShares(ctx, call, i0)
+}
+
 func (s implSyncServerStub) Globber() *rpc.GlobState {
 	return s.gs
 }
@@ -3400,12 +3427,11 @@
 			Name: "FetchChunks",
 		},
 		{
-			Name: "RequestTakeBlob",
-			Doc:  "// RequestTakeBlob indicates that the caller wishes the server to take\n// some blob ownership shares for various syncgroups for the specified blob.\n// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,\n// and ultimately AcceptedBlobOwnership().\n// callerName is the syncbase Id of the caller, expressed as a string.",
+			Name: "RequestTakeBlobs",
+			Doc:  "// RequestTakeBlobs indicates that the caller wishes the server to take\n// some blob ownership shares for various syncgroups for the specified blobs.\n// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,\n// and ultimately AcceptedBlobOwnership().\n// callerName is the syncbase Id of the caller, expressed as a string.",
 			InArgs: []rpc.ArgDesc{
-				{"br", ``},         // syncbase.BlobRef
-				{"callerName", ``}, // string
-				{"shares", ``},     // BlobSharesBySyncgroup
+				{"callerName", ``},      // string
+				{"blobRefToShares", ``}, // map[syncbase.BlobRef]BlobSharesBySyncgroup
 			},
 		},
 		{
@@ -3421,6 +3447,16 @@
 				{"keepingBlob", ``}, // bool
 			},
 		},
+		{
+			Name: "GetBlobShares",
+			Doc:  "// GetBlobShares returns the number of ownership shares for the specified blob\n// held by the server.  It is used by the DevModeGetBlobShares() call in the\n// service.  It uses the \"map[string]int32\" type, rather than\n// interfaces.BlobSharesBySyncgroup, so that the client of this call code doesn't\n// need to reach into the sync service's type definitions.",
+			InArgs: []rpc.ArgDesc{
+				{"br", ``}, // syncbase.BlobRef
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // map[string]int32
+			},
+		},
 	},
 }
 
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 830f7c3..bc82136 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -114,12 +114,12 @@
 	FetchBlobRecipe(br wire.BlobRef, callerName string, mySgPriorities SgPriorities) stream<_, ChunkHash> (shares BlobSharesBySyncgroup | error)
 	FetchChunks() stream<ChunkHash, ChunkData> error
 
-	// RequestTakeBlob indicates that the caller wishes the server to take
-	// some blob ownership shares for various syncgroups for the specified blob.
+	// RequestTakeBlobs indicates that the caller wishes the server to take
+	// some blob ownership shares for various syncgroups for the specified blobs.
 	// If the server chooses to act on the request, it may call FetchBlob/FetchBlobRecipe,
 	// and ultimately AcceptedBlobOwnership().
 	// callerName is the syncbase Id of the caller, expressed as a string.
-	RequestTakeBlob(br wire.BlobRef, callerName string, shares BlobSharesBySyncgroup) error
+	RequestTakeBlobs(callerName string, blobRefToShares map[wire.BlobRef]BlobSharesBySyncgroup) error
 
 	// AcceptedBlobOwnership tells the server that the client callerName (a
 	// syncbase Id expressed as a string) has accepted blob ownership of a
@@ -131,6 +131,13 @@
 	// server is likely to keep the blob itself, plus its syncbase Id
 	// expressed as a string.
 	AcceptedBlobOwnership(br wire.BlobRef, callerName string, shares BlobSharesBySyncgroup) (serverName string, keepingBlob bool | error)
+
+	// GetBlobShares returns the number of ownership shares for the specified blob
+	// held by the server.  It is used by the DevModeGetBlobShares() call in the
+	// service.  It uses the "map[string]int32" type, rather than
+	// interfaces.BlobSharesBySyncgroup, so that the client of this call code doesn't
+	// need to reach into the sync service's type definitions.
+	GetBlobShares(br wire.BlobRef) (map[string]int32 | error)
 }
 
 // TODO(ivanpi): Some methods are missing additional parameter handling ({:_}).
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 7c3a42a..a315037 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -358,6 +358,22 @@
 	return s.vclock.Now()
 }
 
+// DevModeGetBlobShares returns the number of ownership shares for the
+// specified blob held by the server.  It is available only when the --dev
+// flag has been passed to the server.
+func (s *service) DevModeGetBlobShares(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (
+	shares map[string]int32, err error) {
+
+	allowGetBlobShares := []access.Tag{access.Admin}
+	if !s.opts.DevMode {
+		return shares, wire.NewErrNotInDevMode(ctx)
+	}
+	if _, err := common.GetPermsWithAuth(ctx, call, s, allowGetBlobShares, s.st); err != nil {
+		return shares, err
+	}
+	return s.sync.GetBlobShares(ctx, call, br)
+}
+
 func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
 	allowSetPermissions := []access.Tag{access.Admin}
 
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index 99cd782..91300b4 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -515,12 +515,38 @@
 	return verror.NewErrNotImplemented(ctx)
 }
 
-// RequestTakeBlob tells the server that client wishes the server to take some
-// ownership shares for the blob br.
-func (s *syncService) RequestTakeBlob(ctx *context.T, call rpc.ServerCall,
-	br wire.BlobRef, callerName string, shares interfaces.BlobSharesBySyncgroup) error {
+// RequestTakeBlobs tells the server that client wishes the server to take some
+// ownership shares for the blobs in the map blobRefToShares.
+func (s *syncService) RequestTakeBlobs(ctx *context.T, call rpc.ServerCall,
+	callerName string, blobRefToShares map[wire.BlobRef]interfaces.BlobSharesBySyncgroup) error {
 
-	return verror.NewErrNotImplemented(ctx)
+	var blobRef wire.BlobRef
+	var sgToShares interfaces.BlobSharesBySyncgroup
+	var bf *blobFetcher = s.SyncServiceBlobFetcher()
+	if bf != nil {
+		for blobRef, sgToShares = range blobRefToShares {
+			var blobMetadata blob.BlobMetadata
+			err := s.bst.GetBlobMetadata(ctx, blobRef, &blobMetadata)
+			var shouldFetch bool
+			if err == nil {
+				var gid interfaces.GroupId
+				var sharesToTake int32
+				for gid, sharesToTake = range sgToShares {
+					if sharesToTake > 0 && blobMetadata.OwnerShares[gid] == 0 {
+						shouldFetch = true
+						break
+					}
+				}
+			} else { // if there's no metadata, should fetch the blob
+				shouldFetch = true
+			}
+			if shouldFetch {
+				bf.StartFetchingBlob(s.bst, blobRef, s, time.Now().Add(5*time.Minute),
+					DefaultBlobFetcherFunc)
+			}
+		}
+	}
+	return nil
 }
 
 // AcceptedBlobOwnership tells the server that the caller has accepted
@@ -588,6 +614,23 @@
 	return s.name, err == nil && totalShares > 0, err
 }
 
+// GetBlobShares returns the number of ownership shares for the specified blob
+// held by the server.  It is used by the DevModeGetBlobShares() call in the
+// service.
+func (s *syncService) GetBlobShares(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (
+	shares map[string]int32, err error) {
+
+	var blobMetadata blob.BlobMetadata
+	err = s.bst.GetBlobMetadata(ctx, br, &blobMetadata)
+	if err == nil {
+		shares = make(map[string]int32)
+		for sg, shareCount := range blobMetadata.OwnerShares {
+			shares[string(sg)] = shareCount
+		}
+	}
+	return shares, err
+}
+
 ////////////////////////////////////////////////////////////
 // Helpers.
 
@@ -763,7 +806,7 @@
 					// b) somehow (in signposts?) communicate to peers when the blob has
 					//    reached a "server" so that they may unilaterally drop their shares, or
 					// c) (most likely) sometimes accept shares when we have none even for
-					//    blobs we already have, triggered perhaps via the RequestTakeBlob() call.
+					//    blobs we already have, triggered perhaps via the RequestTakeBlobs() call.
 					var peerName string
 					var peerKeepingBlob bool
 					peerName, peerKeepingBlob, _ = c.AcceptedBlobOwnership(ctx, br, s.name, takingOwnership)
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index ea7af2e..b8411af 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -310,8 +310,22 @@
 	}
 
 	if !iSt.sg {
+		var blobsToHandoff map[wire.BlobRef]interfaces.BlobSharesBySyncgroup
 		// TODO(m3b): It is unclear what to do if this call returns an error.  We would not wish the GetDeltas call to fail.
-		updateAllSyncgroupPriorities(ctx, s.bst, deltaFinalResp.SgPriorities)
+		blobsToHandoff, _ = updateAllSyncgroupPriorities(ctx, s.bst, deltaFinalResp.SgPriorities)
+		if len(blobsToHandoff) > 0 {
+			op := func(ctx *context.T, peer string) (interface{}, error) {
+				c := interfaces.SyncClient(peer)
+				return nil, c.RequestTakeBlobs(ctx, iSt.config.sync.name, blobsToHandoff,
+					options.ServerAuthorizer{iSt.config.auth},
+					options.ChannelTimeout(channelTimeout),
+					options.ConnectionTimeout(syncConnectionTimeout))
+			}
+			// Don't worry if the following call fails; it's a hint.
+			var runAtPeerCancel context.CancelFunc
+			c.peer, _, runAtPeerCancel, _ = runAtPeer(ctx, c.peer, op)
+			defer runAtPeerCancel()
+		}
 	}
 
 	vlog.VI(4).Infof("sync: getDeltas: got reply: %v", iSt.remote)
diff --git a/services/syncbase/vsync/server_blob_fetcher.go b/services/syncbase/vsync/server_blob_fetcher.go
index 76f8719..d52a0ce 100644
--- a/services/syncbase/vsync/server_blob_fetcher.go
+++ b/services/syncbase/vsync/server_blob_fetcher.go
@@ -43,6 +43,7 @@
 	nextAttempt   time.Time // time of next attempted fetch.
 	err           error     // error recorded on the last fetch.
 	heapIndex     int       // index, if in a heap; -1 if being fetched.
+	expiry        time.Time // additional fetch attempts are not made after this time
 }
 
 // ---------------------------------
@@ -169,10 +170,12 @@
 		sp.FetchAttempts = toFetch.fetchAttempts
 		toFetch.bst.SetSignpost(bf.ctx, toFetch.blobRef, &sp)
 	}
-	if err == nil || bf.ctx.Err() != nil || toFetch.stopFetching { // fetched blob, or we're told not to retry.
+	var nextFetchTime time.Time = time.Now().Add(bf.fetchDelay(toFetch.fetchAttempts))
+	if err == nil || bf.ctx.Err() != nil || toFetch.stopFetching || nextFetchTime.After(toFetch.expiry) {
+		// fetched blob, or we're told not to retry.
 		delete(bf.blobMap, toFetch.blobRef)
 	} else { // failed to fetch blob; try again
-		toFetch.nextAttempt = time.Now().Add(bf.fetchDelay(toFetch.fetchAttempts))
+		toFetch.nextAttempt = nextFetchTime
 		heap.Push(&bf.blobQueue, toFetch)
 	}
 	if toFetch.heapIndex == 0 || bf.curFetcherThreads == bf.maxFetcherThreads {
@@ -201,32 +204,39 @@
 
 // StartFetchingBlob() adds a blobRef to blobFetcher *bf, if it's not already
 // known to it, and not shutting down.  The client-provided function
-// fetchFunc() will be used to fetch the blob.
+// fetchFunc() will be used to fetch the blob.  Attempts to fetch the blob
+// will continue until expiry, though at least one new attempt will be made,
+// even if after expiry.
 func (bf *blobFetcher) StartFetchingBlob(bst blob.BlobStore, blobRef wire.BlobRef,
-	clientData interface{}, fetchFunc BlobFetcherFunc) {
+	clientData interface{}, expiry time.Time, fetchFunc BlobFetcherFunc) {
 
 	bf.mu.Lock()
-	var bfs *blobFetchState
-	var found bool
-	bfs, found = bf.blobMap[blobRef]
-	if bf.ctx.Err() == nil && !found {
-		bfs = &blobFetchState{
-			bf:         bf,
-			bst:        bst,
-			blobRef:    blobRef,
-			clientData: clientData,
-			fetchFunc:  fetchFunc,
-			heapIndex:  -1,
-		}
-		var sp interfaces.Signpost
-		if err := bst.GetSignpost(bf.ctx, blobRef, &sp); err == nil {
-			bfs.fetchAttempts = sp.FetchAttempts
-			bfs.nextAttempt = time.Now().Add(bf.fetchDelay(bfs.fetchAttempts))
-		}
-		bf.blobMap[blobRef] = bfs
-		heap.Push(&bf.blobQueue, bfs)
-		if bfs.heapIndex == 0 { // a new lowest fetch time
-			bf.startFetchThreadCV.Broadcast()
+	if bf.ctx.Err() == nil {
+		var bfs *blobFetchState
+		var found bool
+		bfs, found = bf.blobMap[blobRef]
+		if !found {
+			bfs = &blobFetchState{
+				bf:         bf,
+				bst:        bst,
+				blobRef:    blobRef,
+				clientData: clientData,
+				fetchFunc:  fetchFunc,
+				expiry:     expiry,
+				heapIndex:  -1,
+			}
+			var sp interfaces.Signpost
+			if err := bst.GetSignpost(bf.ctx, blobRef, &sp); err == nil {
+				bfs.fetchAttempts = sp.FetchAttempts
+				bfs.nextAttempt = time.Now().Add(bf.fetchDelay(bfs.fetchAttempts))
+			}
+			bf.blobMap[blobRef] = bfs
+			heap.Push(&bf.blobQueue, bfs)
+			if bfs.heapIndex == 0 { // a new lowest fetch time
+				bf.startFetchThreadCV.Broadcast()
+			}
+		} else if expiry.After(bfs.expiry) {
+			bfs.expiry = expiry
 		}
 	}
 	bf.mu.Unlock()
@@ -301,7 +311,8 @@
 				// We do not have the blob locally, and this
 				// syncbase is a server in some syncgroup in
 				// which the blob has been seen.
-				bf.StartFetchingBlob(s.bst, blobRef, clientData, fetchFunc)
+				bf.StartFetchingBlob(s.bst, blobRef, clientData,
+					time.Now().Add(720*time.Hour /*a month*/), fetchFunc)
 			}
 		}
 		err = sps.Err()
@@ -310,8 +321,17 @@
 	return err
 }
 
-// defaultBlobFetcherFunc() is a BlobFetcherFunc that fetches blob blobRef in the normal way.
-func defaultBlobFetcherFunc(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error {
+// SyncServiceBlobFetcher() returns a pointer to the blobFetcher associated with syncService *s,
+// created by a call to ServerBlobFetcher().  It may return nil if no such pointer exists.
+func (s *syncService) SyncServiceBlobFetcher() (bf *blobFetcher) {
+	s.blobFetcherMu.Lock()
+	bf = s.blobFetcher
+	s.blobFetcherMu.Unlock()
+	return bf
+}
+
+// DefaultBlobFetcherFunc() is a BlobFetcherFunc that fetches blob blobRef in the normal way.
+func DefaultBlobFetcherFunc(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error {
 	s := clientData.(*syncService)
 	return s.fetchBlobRemote(ctx, blobRef, nil, nil, 0)
 }
@@ -324,6 +344,9 @@
 func ServerBlobFetcher(ctx *context.T, ssm interfaces.SyncServerMethods, done *sync.WaitGroup) {
 	bf := NewBlobFetcher(ctx, serverBlobFetchConcurrency)
 	ss := ssm.(*syncService)
+	ss.blobFetcherMu.Lock()
+	ss.blobFetcher = bf
+	ss.blobFetcherMu.Unlock()
 	var delay time.Duration = serverBlobFetchInitialScanDelay
 	errCount := 0 // state for limiting log records
 	for ctx.Err() == nil {
@@ -333,7 +356,7 @@
 		}
 		startTime := time.Now()
 		if ctx.Err() == nil {
-			if err := ss.serverBlobScan(ctx, bf, ss, defaultBlobFetcherFunc); err != nil {
+			if err := ss.serverBlobScan(ctx, bf, ss, DefaultBlobFetcherFunc); err != nil {
 				if (errCount & (errCount - 1)) == 0 { // errCount is 0 or a power of 2.
 					vlog.Errorf("ServerBlobFetcher:%d: %v", errCount, err)
 				}
@@ -342,6 +365,9 @@
 		}
 		delay = serverBlobFetchExtraScanDelay + serverBlobFetchScanDelayMultiplier*time.Since(startTime)
 	}
+	ss.blobFetcherMu.Lock()
+	ss.blobFetcher = nil
+	ss.blobFetcherMu.Unlock()
 	bf.WaitForExit()
 	if done != nil {
 		done.Done()
diff --git a/services/syncbase/vsync/server_blob_fetcher_test.go b/services/syncbase/vsync/server_blob_fetcher_test.go
index 21029a8..f0b7582 100644
--- a/services/syncbase/vsync/server_blob_fetcher_test.go
+++ b/services/syncbase/vsync/server_blob_fetcher_test.go
@@ -162,7 +162,7 @@
 		ffd.mu.Lock()
 		ffd.fetchesRemaining++
 		ffd.mu.Unlock()
-		bf.StartFetchingBlob(svc.sync.bst, blobRef, &ffd, fakeBlobFetchFunc)
+		bf.StartFetchingBlob(svc.sync.bst, blobRef, &ffd, time.Now().Add(10*time.Minute), fakeBlobFetchFunc)
 	}
 
 	// Wait until all fetching is done.  The test would deadlock here if
@@ -202,7 +202,7 @@
 		ffd.mu.Unlock()
 		for j := 0; j != 3; j++ {
 			// Issue duplicate requests; the duplicates will be ignored.
-			bf.StartFetchingBlob(svc.sync.bst, blobRef, &ffd, fakeBlobFetchFunc)
+			bf.StartFetchingBlob(svc.sync.bst, blobRef, &ffd, time.Now().Add(10*time.Minute), fakeBlobFetchFunc)
 		}
 	}
 	// Wait for fetches to complete.  We would deadlock here if our fetch
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 20eabc5..0448d00 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -128,6 +128,9 @@
 
 	// Naming prefix at which debugging information is exported.
 	statPrefix string
+
+	blobFetcherMu sync.Mutex   // protects blobFetcher
+	blobFetcher   *blobFetcher // a pointer to this syncService's blobFetcher
 }
 
 // syncDatabase contains the metadata for syncing a database. This struct is
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 132a58d..c15e7d7 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -484,8 +484,7 @@
 		// Nothing has higher priority than a server.
 		return false
 	} else if a.DevType != b.DevType {
-		// Different device types have priority defined by the type.
-		return b.DevType < a.DevType
+		return b.DevType == wire.BlobDevTypeServer || b.DevType == wire.BlobDevTypeNormal
 	} else if b.ServerTime.After(a.ServerTime.Add(blobRecencyTimeSlop)) {
 		// Devices with substantially fresher data from a server have higher priority.
 		return true
@@ -517,8 +516,13 @@
 }
 
 // updateAllSyncgroupPriorities updates local syncgroup blob-ownership
-// priorities, based on priority data from a peer.
-func updateAllSyncgroupPriorities(ctx *context.T, bst blob.BlobStore, remoteSgPriorities interfaces.SgPriorities) (anyErr error) {
+// priorities, based on priority data from a peer.  It also returns in
+// blobsToHandoff a map from BlobRef to the ownership shares that the local
+// host should hand off to the remote one, if it can.
+func updateAllSyncgroupPriorities(ctx *context.T, bst blob.BlobStore, remoteSgPriorities interfaces.SgPriorities) (
+	blobsToHandoff map[wire.BlobRef]interfaces.BlobSharesBySyncgroup, anyErr error) {
+
+	var leafToNonLeafGroups sgSet
 	for sgId, remoteSgPriority := range remoteSgPriorities {
 		var perSyncgroup blob.PerSyncgroup
 		err := bst.GetPerSyncgroup(ctx, sgId, &perSyncgroup)
@@ -533,8 +537,37 @@
 		if err != nil && anyErr == nil {
 			anyErr = err
 		}
+		if perSyncgroup.Priority.DevType == wire.BlobDevTypeLeaf &&
+			remoteSgPriority.DevType != wire.BlobDevTypeLeaf {
+			if leafToNonLeafGroups == nil {
+				leafToNonLeafGroups = make(sgSet)
+			}
+			leafToNonLeafGroups[sgId] = struct{}{}
+		}
 	}
-	return anyErr
+	if leafToNonLeafGroups != nil {
+		var bms blob.BlobMetadataStream = bst.NewBlobMetadataStream(ctx)
+		for bms.Advance() {
+			var ownerShares interfaces.BlobSharesBySyncgroup = bms.BlobMetadata().OwnerShares
+			var donateShares interfaces.BlobSharesBySyncgroup
+			for gid := range leafToNonLeafGroups {
+				if ownerShares[gid] > 0 {
+					if donateShares == nil {
+						donateShares = make(interfaces.BlobSharesBySyncgroup)
+					}
+					donateShares[gid] = 1
+				}
+			}
+			if donateShares != nil {
+				if blobsToHandoff == nil {
+					blobsToHandoff = make(map[wire.BlobRef]interfaces.BlobSharesBySyncgroup)
+				}
+				blobsToHandoff[bms.BlobId()] = donateShares
+			}
+		}
+	}
+
+	return blobsToHandoff, anyErr
 }
 
 // addSyncgroupPriorities inserts into map sgPriMap the syncgroups in sgIds,