syncbase blobs: Fix bug in leaf handoff of blobs, and add test for same.

v.io/v23/services/syncbase/service.vdl
        Add DevModeGetBlobShares() to the Service interface.  This allows tests
        to check the blob ownership shares in the syncbase instances they
        start.

v.io/v23/syncbase/featuretests/blob_v23_test.go
	Add checking of blob ownership shares to TestV23ServerBlobFetch().

        Add new test TestV23LeafBlobFetch() that checks that a blob (and one of
        its shares) is transferred off a leaf as soon as the leaf gets a chance
        to ask another syncbase to take it.

        The new routine getTestStructAndBlobFromRow() factors out a piece
        common to several other routines.  I pulled it out while adding the
        getBlobOwnershipShares() call.

        The checkShares() call uses DevModeGetBlobShares() to check the
        ownership shares of the various blobs and syncbases.

v.io/x/ref/services/syncbase/server/interfaces/sync.vdl
        Change RequestTakeBlob() to RequestTakeBlobs() (plural) to allow a
        single call to request that the recipient take ownership of several
        blobs.

	Add GetBlobShares(), which is used by DevModeGetBlobShares().

v.io/x/ref/services/syncbase/server/service.go
        Add DevModeGetBlobShares(), which allows the client to interrogate the
        blob ownership share state on the server.  It requires the --dev flag
        and admin access.

v.io/x/ref/services/syncbase/vsync/blob.go
	Implement RequestTakeBlobs().  The previous call RequestTakeBlob()
	merely returned a "not implemented" error.
        The new implementation iterates over all the blobs for which the
        requester is asking for shares to be taken, and queues the blob to be
        fetched iff the current server doesn't have any shares already.

        Add syncService.GetBlobShares(), which is called by
        DevModeGetBlobShares() to allow tests to check the ownership share
        counts.
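
        In outline, the new implementation looks like this (a sketch; the
        share-count helper, fetcher field, and expiry constant are
        illustrative names, not the actual identifiers):

            func (s *syncService) RequestTakeBlobs(ctx *context.T, call rpc.ServerCall, blobRefs []wire.BlobRef) error {
                    expiry := time.Now().Add(serverBlobFetchExpiry)
                    for _, br := range blobRefs {
                            // Take a blob only if we hold no shares for it yet.
                            if s.ownershipShareCount(ctx, br) > 0 {
                                    continue
                            }
                            // Queue the blob for fetching; the ownership
                            // shares are taken as part of the fetch.
                            s.blobFetcher.StartFetchingBlob(ctx, br, expiry)
                    }
                    return nil
            }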

v.io/x/ref/services/syncbase/vsync/initiator.go
        In syncService.getDeltas(), after the GetDeltas() call has updated the
        syncgroup priorities, look for blob shares to hand off; if there are
        any, call RequestTakeBlobs() to tell the responder to fetch the
        relevant blobs.
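
        Schematically (a sketch of the control flow; names are abbreviated,
        and blobRefsOf() is a hypothetical helper):

            // After GetDeltas() returns, updateAllSyncgroupPriorities()
            // reports which blob shares a leaf should hand off.
            sharesToHandoff := s.updateAllSyncgroupPriorities(ctx, remotePriorities)
            if len(sharesToHandoff) != 0 {
                    // Tell the non-leaf responder to take ownership of (and
                    // therefore fetch) the blobs whose shares we hold.
                    if err := c.RequestTakeBlobs(ctx, blobRefsOf(sharesToHandoff)); err != nil {
                            vlog.VI(1).Infof("RequestTakeBlobs failed: %v", err)
                    }
            }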

v.io/x/ref/services/syncbase/vsync/server_blob_fetcher.go
	Add an "expiry" time to blob fetch requests.
        This is useful both on a server (which should probably stop fetching
        eventually if the blobref goes away), and when fetching on behalf of a
        leaf (where fetching should stop if the syncbase is no longer in
        communication with the leaf).

	Modify StartFetchingBlob() to take an expiry time for the fetch request,
	and to set it in the blobFetchState.

        Add SyncServiceBlobFetcher() to allow the rest of the syncService code
        to get its hands on the blobFetcher created by the ServerBlobFetcher()
        thread.

        Modify ServerBlobFetcher() to advertise the blobFetcher in the
        syncService struct.
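
        The expiry check itself is simple: a queued request is dropped once
        its deadline passes.  Schematically (field and helper names are
        illustrative):

            // In the fetcher's retry loop, before (re)attempting a fetch:
            if !bfs.expiry.IsZero() && time.Now().After(bfs.expiry) {
                    // The request has expired: the blobref may have gone away,
                    // or the leaf that asked us to take the blob may no longer
                    // be in contact.  Forget the request instead of retrying.
                    bf.deleteBlobFetchState(bfs)
                    continue
            }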

v.io/x/ref/services/syncbase/vsync/server_blob_fetcher_test.go
	Accommodate new parameter of StartFetchingBlob().

v.io/x/ref/services/syncbase/vsync/sync.go
	Add the blobFetcher field to the syncService struct, in which
	ServerBlobFetcher() advertises its blobFetcher.

v.io/x/ref/services/syncbase/vsync/syncgroup.go
	Bug fix in sgPriorityLowerThan().  In a previous change,
	I changed the order of the BlobDevType{Leaf,Normal,Server}
	constants so that "normal" would be the default (zero) value.
	At the time I said that nothing depended on the order, but I
	was wrong: this routine did.

        Modify updateAllSyncgroupPriorities() to return, as a side-effect, the
        ownership shares that a leaf should hand off to a non-leaf after a
        GetDeltas() call.
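
        One order-independent way to write the device-type comparison (a
        sketch of the idea, not the exact fix):

            // devTypeRank ranks device types by their willingness to own
            // blobs: servers highest, leaves lowest.  An explicit ranking
            // keeps sgPriorityLowerThan() correct even if the numeric values
            // of the BlobDevType* constants change again.
            func devTypeRank(devType int32) int {
                    switch devType {
                    case wire.BlobDevTypeServer:
                            return 2
                    case wire.BlobDevTypeNormal:
                            return 1
                    default: // wire.BlobDevTypeLeaf
                            return 0
                    }
            }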

MultiPart: 1/2
Change-Id: Ide99a45b868093d9bfdf53e4d7cdc4f86a43941e
diff --git a/services/syncbase/.api b/services/syncbase/.api
index e69d89a..a4372bf 100644
--- a/services/syncbase/.api
+++ b/services/syncbase/.api
@@ -668,12 +668,15 @@
 pkg syncbase, type SchemaMetadata struct
 pkg syncbase, type SchemaMetadata struct, Policy CrPolicy
 pkg syncbase, type SchemaMetadata struct, Version int32
+pkg syncbase, type ServiceClientMethods interface, DevModeGetBlobShares(*context.T, BlobRef, ...rpc.CallOpt) (map[string]int32, error)
 pkg syncbase, type ServiceClientMethods interface, DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
 pkg syncbase, type ServiceClientMethods interface, DevModeUpdateVClock(*context.T, DevModeUpdateVClockOpts, ...rpc.CallOpt) error
 pkg syncbase, type ServiceClientMethods interface, unexported methods
+pkg syncbase, type ServiceClientStub interface, DevModeGetBlobShares(*context.T, BlobRef, ...rpc.CallOpt) (map[string]int32, error)
 pkg syncbase, type ServiceClientStub interface, DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
 pkg syncbase, type ServiceClientStub interface, DevModeUpdateVClock(*context.T, DevModeUpdateVClockOpts, ...rpc.CallOpt) error
 pkg syncbase, type ServiceClientStub interface, unexported methods
+pkg syncbase, type ServiceServerMethods interface, DevModeGetBlobShares(*context.T, rpc.ServerCall, BlobRef) (map[string]int32, error)
 pkg syncbase, type ServiceServerMethods interface, DevModeGetTime(*context.T, rpc.ServerCall) (time.Time, error)
 pkg syncbase, type ServiceServerMethods interface, DevModeUpdateVClock(*context.T, rpc.ServerCall, DevModeUpdateVClockOpts) error
 pkg syncbase, type ServiceServerMethods interface, unexported methods
diff --git a/services/syncbase/service.vdl b/services/syncbase/service.vdl
index 394cbac..6f71fdd 100644
--- a/services/syncbase/service.vdl
+++ b/services/syncbase/service.vdl
@@ -65,6 +65,13 @@
 	// Also requires --dev flag to be set.
 	DevModeGetTime() (time.Time | error) {access.Admin}
 
+	// DevModeGetBlobShares returns the number of ownership shares held by
+	// the server for the specified blob.
+	//
+	// Requires: Admin on Service.
+	// Also requires --dev flag to be set.
+	DevModeGetBlobShares(br BlobRef) (map[string]int32 | error) {access.Admin}
+
 	// SetPermissions and GetPermissions are included from the Object interface.
 	// Permissions must include at least one admin.
 	//
diff --git a/services/syncbase/syncbase.vdl.go b/services/syncbase/syncbase.vdl.go
index 735806f..2837096 100644
--- a/services/syncbase/syncbase.vdl.go
+++ b/services/syncbase/syncbase.vdl.go
@@ -3284,6 +3284,12 @@
 	// Requires: Admin on Service.
 	// Also requires --dev flag to be set.
 	DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
+	// DevModeGetBlobShares returns the number of ownership shares held by
+	// the server for the specified blob.
+	//
+	// Requires: Admin on Service.
+	// Also requires --dev flag to be set.
+	DevModeGetBlobShares(_ *context.T, br BlobRef, _ ...rpc.CallOpt) (map[string]int32, error)
 }
 
 // ServiceClientStub adds universal methods to ServiceClientMethods.
@@ -3313,6 +3319,11 @@
 	return
 }
 
+func (c implServiceClientStub) DevModeGetBlobShares(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (o0 map[string]int32, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "DevModeGetBlobShares", []interface{}{i0}, []interface{}{&o0}, opts...)
+	return
+}
+
 // ServiceServerMethods is the interface a server writer
 // implements for Service.
 //
@@ -3376,6 +3387,12 @@
 	// Requires: Admin on Service.
 	// Also requires --dev flag to be set.
 	DevModeGetTime(*context.T, rpc.ServerCall) (time.Time, error)
+	// DevModeGetBlobShares returns the number of ownership shares held by
+	// the server for the specified blob.
+	//
+	// Requires: Admin on Service.
+	// Also requires --dev flag to be set.
+	DevModeGetBlobShares(_ *context.T, _ rpc.ServerCall, br BlobRef) (map[string]int32, error)
 }
 
 // ServiceServerStubMethods is the server interface containing
@@ -3423,6 +3440,10 @@
 	return s.impl.DevModeGetTime(ctx, call)
 }
 
+func (s implServiceServerStub) DevModeGetBlobShares(ctx *context.T, call rpc.ServerCall, i0 BlobRef) (map[string]int32, error) {
+	return s.impl.DevModeGetBlobShares(ctx, call, i0)
+}
+
 func (s implServiceServerStub) Globber() *rpc.GlobState {
 	return s.gs
 }
@@ -3459,6 +3480,17 @@
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
 		},
+		{
+			Name: "DevModeGetBlobShares",
+			Doc:  "// DevModeGetBlobShares returns the number of ownership shares held by\n// the server for the specified blob.\n//\n// Requires: Admin on Service.\n// Also requires --dev flag to be set.",
+			InArgs: []rpc.ArgDesc{
+				{"br", ``}, // BlobRef
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // map[string]int32
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
+		},
 	},
 }
 
diff --git a/syncbase/featuretests/blob_v23_test.go b/syncbase/featuretests/blob_v23_test.go
index 394840f..9d371fd 100644
--- a/syncbase/featuretests/blob_v23_test.go
+++ b/syncbase/featuretests/blob_v23_test.go
@@ -68,14 +68,15 @@
 	defer sh.Cleanup()
 	sh.StartRootMountTable()
 
-	sbs := setupSyncbases(t, sh, 2, false, false)
+	sbs := setupSyncbases(t, sh, 2, true, false)
 
 	sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
 
 	ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
 	ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
 	// sbs[0] is not a server.
-	ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "", wire.SyncgroupMemberInfo{SyncPriority: 8}))
+	ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
+		wire.SyncgroupMemberInfo{SyncPriority: 8}))
 	// sbs[1] is a server, so will fetch blobs automatically; it joins the syncgroup.
 	ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
 		wire.SyncgroupMemberInfo{SyncPriority: 10, BlobDevType: byte(wire.BlobDevTypeServer)}))
@@ -85,10 +86,22 @@
 	ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
 	ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
 
+	// Before the server fetch, each node should have just its own blob.
+	checkShares(t, "before server fetch", sbs, []int{
+		2, 0,
+		0, 2,
+	})
+
 	// Sleep until the fetch mechanism has a chance to run  (its initial timeout is 10s,
 	// to make this test possible).
 	time.Sleep(15 * time.Second)
 
+	// Now the server should have all blob shares.
+	checkShares(t, "after server fetch", sbs, []int{
+		0, 0,
+		2, 2,
+	})
+
 	// The syncbases should have the blobs they themselves created.
 	ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
 	ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
@@ -99,6 +112,12 @@
 	// sbs[0] should not already have the blob created by sbs[1], since sbs[0] is not a server.
 	ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, false /*not already present*/))
 
+	// The share distribution should not have changed.
+	checkShares(t, "at end", sbs, []int{
+		0, 0,
+		2, 2,
+	})
+
 	// Wrap up by checking the blob values.
 	ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
 	ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
@@ -106,6 +125,62 @@
 	ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
 }
 
+// TestV23LeafBlobFetch tests the mechanism for leaf syncbases to give blobs to
+// non-leaf syncbases with which they sync.  It sets up two syncbases:
+// - 0 is a non-leaf syncgroup member; it creates a syncgroup.
+// - 1 is a leaf syncgroup member; it joins the syncgroup.
+// Both create blobs.  The test checks that the non-leaf syncbase automatically
+// fetches the blob created on the leaf, but not vice versa.
+func TestV23LeafBlobFetch(t *testing.T) {
+	v23test.SkipUnlessRunningIntegrationTests(t)
+	sh := v23test.NewShell(t, nil)
+	defer sh.Cleanup()
+	sh.StartRootMountTable()
+
+	sbs := setupSyncbases(t, sh, 2, true, false)
+
+	sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
+
+	ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
+	ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
+	// sbs[0] is a non-leaf member that creates the syncgroup, and creates a blob.
+	ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
+		wire.SyncgroupMemberInfo{SyncPriority: 8}))
+	// sbs[1] is a leaf member; it joins the syncgroup, and creates a blob.
+	// It will tell the non-leaf member to fetch its blob.
+	ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
+		wire.SyncgroupMemberInfo{SyncPriority: 8, BlobDevType: byte(wire.BlobDevTypeLeaf)}))
+	ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
+
+	// Generate a blob on each member.
+	ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
+	ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
+
+	time.Sleep(5 * time.Second)
+
+	// One ownership share of the leaf's blob should have been transferred to the non-leaf.
+	checkShares(t, "leaf initial", sbs, []int{
+		2, 1,
+		0, 1,
+	})
+
+	// The syncbases should have the blobs they themselves created.
+	ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
+	ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
+
+	// sbs[0] should have the leaf's blob too.
+	ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, true /*already present*/))
+
+	// sbs[1] should not have the blob created by sbs[0].
+	ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, false /*not already present*/))
+
+	// Wrap up by checking the blob values.
+	ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz"), 0))
+	ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
+	ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
+	ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
+}
+
 ////////////////////////////////////
 // Helpers.
 
@@ -155,19 +230,20 @@
 	return nil
 }
 
-// fetchBlob ensures that the blob named by key (keyPrefix, pos) can be fetched
-// by syncbaseName, and has size wantSize.  If alreadyPresent is set, it checks
-// that the blob is already present, and other checks that the blob is fetched
-// from some remote syncbase.
-func fetchBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantSize int64, alreadyPresent bool) error {
+// getTestStructAndBlobFromRow waits until the row with the key keyPrefix + pos
+// is accessible on the specified syncbase, then reads and returns the value
+// from that row, which is expected to be a testStruct.  A blob handle from the
+// blobref is also returned.
+// This is a common subroutine used in several of the routines below.
+func getTestStructAndBlobFromRow(ctx *context.T, syncbaseName, keyPrefix string, pos int) (testStruct, syncbase.Blob, error) {
 	d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
 	c := d.CollectionForId(testCx)
 
 	key := fmt.Sprintf("%s%d", keyPrefix, pos)
 	r := c.Row(key)
-	var s testStruct
 
 	// Try for 10 seconds to get the new value.
+	var s testStruct
 	var err error
 	for i := 0; i < 10; i++ {
 		// Note: the error is a decode error since the old value is a
@@ -179,10 +255,23 @@
 	}
 
 	if err != nil {
-		return fmt.Errorf("r.Get() failed: %v", err)
+		return testStruct{}, nil, fmt.Errorf("r.Get() failed: %v", err)
+	}
+	return s, d.Blob(s.Blob), nil
+}
+
+// fetchBlob ensures that the blob named by key (keyPrefix, pos) can be fetched
+// by syncbaseName, and has size wantSize.  If alreadyPresent is set, it checks
+// that the blob is already present, and otherwise checks that the blob is fetched
+// from some remote syncbase.
+func fetchBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantSize int64, alreadyPresent bool) error {
+	var b syncbase.Blob
+	var err error
+	_, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
+	if err != nil {
+		return err
 	}
 
-	b := d.Blob(s.Blob)
 	bs, err := b.Fetch(ctx, 100)
 	if err != nil {
 		return fmt.Errorf("Fetch RPC failed, err %v", err)
@@ -230,29 +319,13 @@
 }
 
 func getBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantVal []byte, offset int64) error {
-	d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
-	c := d.CollectionForId(testCx)
-
-	key := fmt.Sprintf("%s%d", keyPrefix, pos)
-	r := c.Row(key)
-	var s testStruct
-
-	// Try for 10 seconds to get the new value.
+	var b syncbase.Blob
 	var err error
-	for i := 0; i < 10; i++ {
-		// Note: the error is a decode error since the old value is a
-		// string, and the new value is testStruct.
-		if err = r.Get(ctx, &s); err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-	}
-
+	_, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
 	if err != nil {
-		return fmt.Errorf("r.Get() failed: %v", err)
+		return err
 	}
 
-	b := d.Blob(s.Blob)
 	br, err := b.Get(ctx, offset)
 	if err != nil {
 		return fmt.Errorf("GetBlob RPC failed, err %v", err)
@@ -322,29 +395,14 @@
 }
 
 func getBigBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int) error {
-	d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
-	c := d.CollectionForId(testCx)
-
-	key := fmt.Sprintf("%s%d", keyPrefix, pos)
-	r := c.Row(key)
 	var s testStruct
-
-	// Try for 10 seconds to get the new value.
+	var b syncbase.Blob
 	var err error
-	for i := 0; i < 10; i++ {
-		// Note: the error is a decode error since the old value is a
-		// string, and the new value is testStruct.
-		if err = r.Get(ctx, &s); err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-	}
-
+	s, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
 	if err != nil {
-		return fmt.Errorf("r.Get() failed: %v", err)
+		return err
 	}
 
-	b := d.Blob(s.Blob)
 	br, err := b.Get(ctx, 0)
 	if err != nil {
 		return fmt.Errorf("GetBlob RPC failed, err %v", err)
@@ -366,6 +424,43 @@
 	return nil
 }
 
+// getBlobOwnershipShares returns the ownership shares (summed across all syncgroups)
+// for the blob in the row named by (keyPrefix, pos).
+func getBlobOwnershipShares(ctx *context.T, syncbaseName string, keyPrefix string, pos int) (shares int) {
+	var s testStruct
+	var err error
+	s, _, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
+	if err == nil {
+		var shareMap map[string]int32
+		shareMap, err = sc(syncbaseName).DevModeGetBlobShares(ctx, s.Blob)
+		if err == nil {
+			// Sum the shares across all syncgroups.
+			for _, shareCount := range shareMap {
+				shares += int(shareCount)
+			}
+		}
+	}
+	return shares
+}
+
+// checkShares checks that the shares of the blobs mentioned in rows fooN (for N in 0..len(sbs)-1)
+// are at the expected values for the various syncbases.   The elements of expectedShares[]
+// are in the order sbs[0] for foo0, foo1, ...,  followed by sbs[1] for foo0, foo1, ... etc.
+// An expected value of -1 means "don't care".
+func checkShares(t *testing.T, msg string, sbs []*testSyncbase, expectedShares []int) {
+	for sbIndex := 0; sbIndex != len(sbs); sbIndex++ {
+		for blobIndex := 0; blobIndex != len(sbs); blobIndex++ {
+			var expected int = expectedShares[blobIndex+len(sbs)*sbIndex]
+			if expected != -1 {
+				var shares int = getBlobOwnershipShares(sbs[sbIndex].clientCtx, sbs[sbIndex].sbName, "foo", blobIndex)
+				if expected != shares {
+					t.Errorf("%s: sb %d blob %d got %d shares, expected %d\n",
+						msg, sbIndex, blobIndex, shares, expected)
+				}
+			}
+		}
+	}
+}
+
 // Copied from localblobstore/fs_cablobstore/fs_cablobstore.go.
 //
 // hashToString() returns a string representation of the hash.