syncbase blobs: Fix bug in leaf handoff of blobs, and add test for same.
v.io/v23/services/syncbase/service.vdl
Add DevModeGetBlobShares() to the Service interface. This allows tests
to check the blob ownership shares in the syncbase instances they
start.
v.io/v23/syncbase/featuretests/blob_v23_test.go
Add checking of blob ownership shares to TestV23ServerBlobFetch().
Add new test TestV23LeafBlobFetch() that checks that a blob (and one of
its shares) is transferred off a leaf as soon as the leaf gets a chance
to ask another syncbase to take it.
The new routine getTestStructAndBlobFromRow() factors out code common
to various other routines. I pulled it out because I was adding the
getBlobOwnershipShares() call.
The checkShares() call uses DevModeGetBlobShares() to check the
ownership shares of the various blobs and syncbases.
v.io/x/ref/services/syncbase/server/interfaces/sync.vdl
Change RequestTakeBlob() to RequestTakeBlobs() (plural) to allow a
single call to request that the recipient take ownership of several
blobs.
Add GetBlobShares(), which is used by DevModeGetBlobShares().
v.io/x/ref/services/syncbase/server/service.go
Add DevModeGetBlobShares(), which allows the client to interrogate the
blob ownership share state on the server. It requires "--dev" mode,
and admin access.
v.io/x/ref/services/syncbase/vsync/blob.go
Implement RequestTakeBlobs(). The previous call RequestTakeBlob()
merely returned a "not implemented" error.
The new implementation iterates over all the blobs for which the
requester is asking for shares to be taken, and queues the blob to be
fetched iff the current server doesn't have any shares already.
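The vsync side of that change is in part 2 of this CL, so here is a minimal sketch of the described RequestTakeBlobs() logic, with illustrative stand-in types rather than the real vsync ones:

```go
package main

import "fmt"

// blobRef and server are simplified stand-ins for the real syncbase types.
type blobRef string

type server struct {
	// sharesHeld maps a blob to the number of ownership shares held locally.
	sharesHeld map[blobRef]int
	// fetchQueue holds blobs queued for the blob fetcher.
	fetchQueue []blobRef
}

// requestTakeBlobs mirrors the described behaviour: iterate over the blobs
// the requester wants taken, and queue a blob to be fetched iff this server
// does not already hold any shares for it.
func (s *server) requestTakeBlobs(blobs []blobRef) {
	for _, br := range blobs {
		if s.sharesHeld[br] == 0 {
			s.fetchQueue = append(s.fetchQueue, br)
		}
	}
}

func main() {
	s := &server{sharesHeld: map[blobRef]int{"b1": 2}}
	s.requestTakeBlobs([]blobRef{"b1", "b2"})
	fmt.Println(s.fetchQueue) // only "b2" is queued; shares for "b1" are already held
}
```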
Add syncService.GetBlobShares(), which is called by
DevModeGetBlobShares() to allow tests to check the ownership share
counts.
v.io/x/ref/services/syncbase/vsync/initiator.go
In syncService.getDeltas(), once the syncgroup priorities have been
updated following the GetDeltas() call, look for blob shares to hand
off, and if there are any, call RequestTakeBlobs() to tell the
responder to fetch the relevant blobs.
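A sketch of that initiator-side flow, with illustrative names standing in for the real vsync routines (the actual code is in part 2 of this CL):

```go
package main

import "fmt"

// sharesToHandOff stands in for the new side-effect return of
// updateAllSyncgroupPriorities(): the shares a leaf wants to hand off.
func sharesToHandOff() map[string]int {
	// e.g. one share of blob "b1" should move to the non-leaf peer.
	return map[string]int{"b1": 1}
}

// requestTakeBlobs stands in for the RequestTakeBlobs() RPC to the responder.
func requestTakeBlobs(blobs []string) {
	fmt.Println("asking responder to take:", blobs)
}

func main() {
	// After GetDeltas() and the priority update, hand off any shares found.
	handoff := sharesToHandOff()
	if len(handoff) > 0 {
		var blobs []string
		for br := range handoff {
			blobs = append(blobs, br)
		}
		requestTakeBlobs(blobs)
	}
}
```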
v.io/x/ref/services/syncbase/vsync/server_blob_fetcher.go
Add an "expiry" time to blob fetch requests.
This is useful both in the server (where it should probably stop
fetching eventually if the blobref goes away), and for fetching for
leaves (where it should stop fetching if the syncbase is no longer in
communication with the leaf).
Modify StartFetchingBlob() to take an expiry time for the fetch request,
and to set it in the blobFetchState.
Add SyncServiceBlobFetcher() to allow the rest of the syncService code
to get its hands on the blobFetcher created by the ServerBlobFetcher()
thread.
Modify ServerBlobFetcher() to advertise the blobFetcher in the
syncService struct.
v.io/x/ref/services/syncbase/vsync/server_blob_fetcher_test.go
Accommodate new parameter of StartFetchingBlob().
v.io/x/ref/services/syncbase/vsync/sync.go
Advertise the blobFetcher in the syncService struct.
v.io/x/ref/services/syncbase/vsync/syncgroup.go
Bug fix in sgPriorityLowerThan(). In a previous change,
I changed the order of the BlobDevType{Leaf,Normal,Server}
constants so that "normal" would be the default (zero) value.
At the time, I said that nothing depended on the order, but I was
wrong: this routine did.
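One way to remove that kind of hidden dependency is to rank device types explicitly instead of comparing the constants' numeric values; the following is a sketch with assumed constant values and an assumed leaf < normal < server priority order, not the real sgPriorityLowerThan():

```go
package main

import "fmt"

// Illustrative constants in the new order, where "normal" is the zero value.
const (
	blobDevTypeNormal = 0
	blobDevTypeLeaf   = 1
	blobDevTypeServer = 2
)

// devTypeRank maps a device type to an explicit priority rank, so the
// comparison no longer depends on the numeric order of the constants.
func devTypeRank(devType int) int {
	switch devType {
	case blobDevTypeServer:
		return 2
	case blobDevTypeNormal:
		return 1
	default: // leaf
		return 0
	}
}

// sgPriorityLowerThan (sketch): a leaf ranks below a normal node, which ranks
// below a server, regardless of the constants' numeric values.
func sgPriorityLowerThan(a, b int) bool {
	return devTypeRank(a) < devTypeRank(b)
}

func main() {
	fmt.Println(sgPriorityLowerThan(blobDevTypeLeaf, blobDevTypeServer))   // true
	fmt.Println(sgPriorityLowerThan(blobDevTypeServer, blobDevTypeNormal)) // false
}
```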
Modify updateAllSyncgroupPriorities() to return, as a side-effect, the
ownership shares that a leaf should hand off to a non-leaf after a
GetDeltas() call.
MultiPart: 1/2
Change-Id: Ide99a45b868093d9bfdf53e4d7cdc4f86a43941e
diff --git a/services/syncbase/.api b/services/syncbase/.api
index e69d89a..a4372bf 100644
--- a/services/syncbase/.api
+++ b/services/syncbase/.api
@@ -668,12 +668,15 @@
pkg syncbase, type SchemaMetadata struct
pkg syncbase, type SchemaMetadata struct, Policy CrPolicy
pkg syncbase, type SchemaMetadata struct, Version int32
+pkg syncbase, type ServiceClientMethods interface, DevModeGetBlobShares(*context.T, BlobRef, ...rpc.CallOpt) (map[string]int32, error)
pkg syncbase, type ServiceClientMethods interface, DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
pkg syncbase, type ServiceClientMethods interface, DevModeUpdateVClock(*context.T, DevModeUpdateVClockOpts, ...rpc.CallOpt) error
pkg syncbase, type ServiceClientMethods interface, unexported methods
+pkg syncbase, type ServiceClientStub interface, DevModeGetBlobShares(*context.T, BlobRef, ...rpc.CallOpt) (map[string]int32, error)
pkg syncbase, type ServiceClientStub interface, DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
pkg syncbase, type ServiceClientStub interface, DevModeUpdateVClock(*context.T, DevModeUpdateVClockOpts, ...rpc.CallOpt) error
pkg syncbase, type ServiceClientStub interface, unexported methods
+pkg syncbase, type ServiceServerMethods interface, DevModeGetBlobShares(*context.T, rpc.ServerCall, BlobRef) (map[string]int32, error)
pkg syncbase, type ServiceServerMethods interface, DevModeGetTime(*context.T, rpc.ServerCall) (time.Time, error)
pkg syncbase, type ServiceServerMethods interface, DevModeUpdateVClock(*context.T, rpc.ServerCall, DevModeUpdateVClockOpts) error
pkg syncbase, type ServiceServerMethods interface, unexported methods
diff --git a/services/syncbase/service.vdl b/services/syncbase/service.vdl
index 394cbac..6f71fdd 100644
--- a/services/syncbase/service.vdl
+++ b/services/syncbase/service.vdl
@@ -65,6 +65,13 @@
// Also requires --dev flag to be set.
DevModeGetTime() (time.Time | error) {access.Admin}
+ // DevModeGetBlobShares returns the number of ownership shares held by
+ // the server for the specified blob.
+ //
+ // Requires: Admin on Service.
+ // Also requires --dev flag to be set.
+ DevModeGetBlobShares(br BlobRef) (map[string]int32 | error) {access.Admin}
+
// SetPermissions and GetPermissions are included from the Object interface.
// Permissions must include at least one admin.
//
diff --git a/services/syncbase/syncbase.vdl.go b/services/syncbase/syncbase.vdl.go
index 735806f..2837096 100644
--- a/services/syncbase/syncbase.vdl.go
+++ b/services/syncbase/syncbase.vdl.go
@@ -3284,6 +3284,12 @@
// Requires: Admin on Service.
// Also requires --dev flag to be set.
DevModeGetTime(*context.T, ...rpc.CallOpt) (time.Time, error)
+ // DevModeGetBlobShares returns the number of ownership shares held by
+ // the server for the specified blob.
+ //
+ // Requires: Admin on Service.
+ // Also requires --dev flag to be set.
+ DevModeGetBlobShares(_ *context.T, br BlobRef, _ ...rpc.CallOpt) (map[string]int32, error)
}
// ServiceClientStub adds universal methods to ServiceClientMethods.
@@ -3313,6 +3319,11 @@
return
}
+func (c implServiceClientStub) DevModeGetBlobShares(ctx *context.T, i0 BlobRef, opts ...rpc.CallOpt) (o0 map[string]int32, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "DevModeGetBlobShares", []interface{}{i0}, []interface{}{&o0}, opts...)
+ return
+}
+
// ServiceServerMethods is the interface a server writer
// implements for Service.
//
@@ -3376,6 +3387,12 @@
// Requires: Admin on Service.
// Also requires --dev flag to be set.
DevModeGetTime(*context.T, rpc.ServerCall) (time.Time, error)
+ // DevModeGetBlobShares returns the number of ownership shares held by
+ // the server for the specified blob.
+ //
+ // Requires: Admin on Service.
+ // Also requires --dev flag to be set.
+ DevModeGetBlobShares(_ *context.T, _ rpc.ServerCall, br BlobRef) (map[string]int32, error)
}
// ServiceServerStubMethods is the server interface containing
@@ -3423,6 +3440,10 @@
return s.impl.DevModeGetTime(ctx, call)
}
+func (s implServiceServerStub) DevModeGetBlobShares(ctx *context.T, call rpc.ServerCall, i0 BlobRef) (map[string]int32, error) {
+ return s.impl.DevModeGetBlobShares(ctx, call, i0)
+}
+
func (s implServiceServerStub) Globber() *rpc.GlobState {
return s.gs
}
@@ -3459,6 +3480,17 @@
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
},
+ {
+ Name: "DevModeGetBlobShares",
+ Doc: "// DevModeGetBlobShares returns the number of ownership shares held by\n// the server for the specified blob.\n//\n// Requires: Admin on Service.\n// Also requires --dev flag to be set.",
+ InArgs: []rpc.ArgDesc{
+ {"br", ``}, // BlobRef
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // map[string]int32
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
+ },
},
}
diff --git a/syncbase/featuretests/blob_v23_test.go b/syncbase/featuretests/blob_v23_test.go
index 394840f..9d371fd 100644
--- a/syncbase/featuretests/blob_v23_test.go
+++ b/syncbase/featuretests/blob_v23_test.go
@@ -68,14 +68,15 @@
defer sh.Cleanup()
sh.StartRootMountTable()
- sbs := setupSyncbases(t, sh, 2, false, false)
+ sbs := setupSyncbases(t, sh, 2, true, false)
sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
// sbs[0] is not a server.
- ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "", wire.SyncgroupMemberInfo{SyncPriority: 8}))
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
+ wire.SyncgroupMemberInfo{SyncPriority: 8}))
// sbs[1] is a server, so will fetch blobs automatically; it joins the syncgroup.
ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
wire.SyncgroupMemberInfo{SyncPriority: 10, BlobDevType: byte(wire.BlobDevTypeServer)}))
@@ -85,10 +86,22 @@
ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
+ // Before the server fetch, each node should have just its own blob.
+ checkShares(t, "before server fetch", sbs, []int{
+ 2, 0,
+ 0, 2,
+ })
+
// Sleep until the fetch mechanism has a chance to run (its initial timeout is 10s,
// to make this test possible).
time.Sleep(15 * time.Second)
+ // Now the server should have all blob shares.
+ checkShares(t, "after server fetch", sbs, []int{
+ 0, 0,
+ 2, 2,
+ })
+
// The syncbases should have the blobs they themselves created.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
@@ -99,6 +112,12 @@
// sbs[0] should not already have the blob created by sbs[1], since sbs[0] is not a server.
ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, false /*not already present*/))
+ // The share distribution should not have changed.
+ checkShares(t, "at end", sbs, []int{
+ 0, 0,
+ 2, 2,
+ })
+
// Wrap up by checking the blob values.
ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
@@ -106,6 +125,62 @@
ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
}
+// TestV23LeafBlobFetch tests the mechanism for leaf syncbases to give blobs to
+// non-leaf syncbases with which they sync. It sets up two syncbases:
+// - 0 is a non-leaf syncgroup member; it creates a syncgroup.
+// - 1 is a leaf syncgroup member; it joins the syncgroup.
+// Both create blobs. The test checks that the non-leaf syncbase automatically
+// fetches the blob created on the leaf, but not vice versa.
+func TestV23LeafBlobFetch(t *testing.T) {
+ v23test.SkipUnlessRunningIntegrationTests(t)
+ sh := v23test.NewShell(t, nil)
+ defer sh.Cleanup()
+ sh.StartRootMountTable()
+
+ sbs := setupSyncbases(t, sh, 2, true, false)
+
+ sgId := wire.Id{Name: "SG1", Blessing: testCx.Blessing}
+
+ ok(t, createCollection(sbs[0].clientCtx, sbs[0].sbName, testCx.Name))
+ ok(t, populateData(sbs[0].clientCtx, sbs[0].sbName, testCx.Name, "foo", 0, 10))
+ // sbs[0] is a non-leaf member that creates the syncgroup, and creates a blob.
+ ok(t, createSyncgroup(sbs[0].clientCtx, sbs[0].sbName, sgId, testCx.Name, "", nil, clBlessings(sbs), "",
+ wire.SyncgroupMemberInfo{SyncPriority: 8}))
+ // sbs[1] is a leaf member; it joins the syncgroup, and creates a blob.
+ // It will tell the non-leaf member to fetch its blob.
+ ok(t, joinSyncgroup(sbs[1].clientCtx, sbs[1].sbName, sbs[0].sbName, sgId,
+ wire.SyncgroupMemberInfo{SyncPriority: 8, BlobDevType: byte(wire.BlobDevTypeLeaf)}))
+ ok(t, verifySyncgroupData(sbs[1].clientCtx, sbs[1].sbName, testCx.Name, "foo", "", 0, 10))
+
+ // Generate a blob on each member.
+ ok(t, generateBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz")))
+ ok(t, generateBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat")))
+
+ time.Sleep(5 * time.Second)
+
+ // One ownership share of the leaf's blob should have been transferred to the non-leaf.
+ checkShares(t, "leaf initial", sbs, []int{
+ 2, 1,
+ 0, 1,
+ })
+
+ // The syncbases should have the blobs they themselves created.
+ ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, 9, true /*already present*/))
+ ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, 12, true /*already present*/))
+
+ // sbs[0] should have the leaf's blob too.
+ ok(t, fetchBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, 12, true /*already present*/))
+
+ // sbs[1] should not have the blob created by sbs[0].
+ ok(t, fetchBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, 9, false /*not already present*/))
+
+ // Wrap up by checking the blob values.
+ ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 0, []byte("foobarbaz"), 0))
+ ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 0, []byte("foobarbaz"), 0))
+ ok(t, getBlob(sbs[0].clientCtx, sbs[0].sbName, "foo", 1, []byte("wombatnumbat"), 0))
+ ok(t, getBlob(sbs[1].clientCtx, sbs[1].sbName, "foo", 1, []byte("wombatnumbat"), 0))
+}
+
////////////////////////////////////
// Helpers.
@@ -155,19 +230,20 @@
return nil
}
-// fetchBlob ensures that the blob named by key (keyPrefix, pos) can be fetched
-// by syncbaseName, and has size wantSize. If alreadyPresent is set, it checks
-// that the blob is already present, and other checks that the blob is fetched
-// from some remote syncbase.
-func fetchBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantSize int64, alreadyPresent bool) error {
+// getTestStructAndBlobFromRow waits until the row with the key keyPrefix + pos
+// is accessible on the specified syncbase, then reads and returns the value
+// from that row, which is expected to be a testStruct. A blob handle from the
+// blobref is also returned.
+// This is a common subroutine used in several of the routines below.
+func getTestStructAndBlobFromRow(ctx *context.T, syncbaseName, keyPrefix string, pos int) (testStruct, syncbase.Blob, error) {
d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
c := d.CollectionForId(testCx)
key := fmt.Sprintf("%s%d", keyPrefix, pos)
r := c.Row(key)
- var s testStruct
// Try for 10 seconds to get the new value.
+ var s testStruct
var err error
for i := 0; i < 10; i++ {
// Note: the error is a decode error since the old value is a
@@ -179,10 +255,23 @@
}
if err != nil {
- return fmt.Errorf("r.Get() failed: %v", err)
+ return testStruct{}, nil, fmt.Errorf("r.Get() failed: %v", err)
+ }
+ return s, d.Blob(s.Blob), nil
+}
+
+// fetchBlob ensures that the blob named by key (keyPrefix, pos) can be fetched
+// by syncbaseName, and has size wantSize. If alreadyPresent is set, it checks
+// that the blob is already present, and otherwise checks that the blob is fetched
+// from some remote syncbase.
+func fetchBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantSize int64, alreadyPresent bool) error {
+ var b syncbase.Blob
+ var err error
+ _, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
+ if err != nil {
+ return err
}
- b := d.Blob(s.Blob)
bs, err := b.Fetch(ctx, 100)
if err != nil {
return fmt.Errorf("Fetch RPC failed, err %v", err)
@@ -230,29 +319,13 @@
}
func getBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int, wantVal []byte, offset int64) error {
- d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
- c := d.CollectionForId(testCx)
-
- key := fmt.Sprintf("%s%d", keyPrefix, pos)
- r := c.Row(key)
- var s testStruct
-
- // Try for 10 seconds to get the new value.
+ var b syncbase.Blob
var err error
- for i := 0; i < 10; i++ {
- // Note: the error is a decode error since the old value is a
- // string, and the new value is testStruct.
- if err = r.Get(ctx, &s); err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- }
-
+ _, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err != nil {
- return fmt.Errorf("r.Get() failed: %v", err)
+ return err
}
- b := d.Blob(s.Blob)
br, err := b.Get(ctx, offset)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v", err)
@@ -322,29 +395,14 @@
}
func getBigBlob(ctx *context.T, syncbaseName, keyPrefix string, pos int) error {
- d := syncbase.NewService(syncbaseName).DatabaseForId(testDb, nil)
- c := d.CollectionForId(testCx)
-
- key := fmt.Sprintf("%s%d", keyPrefix, pos)
- r := c.Row(key)
var s testStruct
-
- // Try for 10 seconds to get the new value.
+ var b syncbase.Blob
var err error
- for i := 0; i < 10; i++ {
- // Note: the error is a decode error since the old value is a
- // string, and the new value is testStruct.
- if err = r.Get(ctx, &s); err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- }
-
+ s, b, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
if err != nil {
- return fmt.Errorf("r.Get() failed: %v", err)
+ return err
}
- b := d.Blob(s.Blob)
br, err := b.Get(ctx, 0)
if err != nil {
return fmt.Errorf("GetBlob RPC failed, err %v", err)
@@ -366,6 +424,43 @@
return nil
}
+// getBlobOwnershipShares returns the ownership shares (summed across all syncgroups) for blob br.
+func getBlobOwnershipShares(ctx *context.T, syncbaseName string, keyPrefix string, pos int) (shares int) {
+ var s testStruct
+ var err error
+ s, _, err = getTestStructAndBlobFromRow(ctx, syncbaseName, keyPrefix, pos)
+ if err == nil {
+ var shareMap map[string]int32
+ shareMap, err = sc(syncbaseName).DevModeGetBlobShares(ctx, s.Blob)
+ if err == nil {
+ // Sum the shares across all syncgroups.
+ for _, shareCount := range shareMap {
+ shares += int(shareCount)
+ }
+ }
+ }
+ return shares
+}
+
+// checkShares checks that the shares of the blobs mentioned in rows fooN (for N in 0..len(sbs)-1)
+// are at the expected values for the various syncbases. The elements of expectedShares[]
+// are in the order sbs[0] for foo0, foo1, ..., followed by sbs[1] for foo0, foo1, ... etc.
+// An expected value of -1 means "don't care".
+func checkShares(t *testing.T, msg string, sbs []*testSyncbase, expectedShares []int) {
+ for sbIndex := 0; sbIndex != len(sbs); sbIndex++ {
+ for blobIndex := 0; blobIndex != len(sbs); blobIndex++ {
+ var expected int = expectedShares[blobIndex+len(sbs)*sbIndex]
+ if expected != -1 {
+ var shares int = getBlobOwnershipShares(sbs[sbIndex].clientCtx, sbs[sbIndex].sbName, "foo", blobIndex)
+ if expected != shares {
+ t.Errorf("%s: sb %d blob %d got %d shares, expected %d\n",
+ msg, sbIndex, blobIndex, shares, expected)
+ }
+ }
+ }
+ }
+}
+
// Copied from localblobstore/fs_cablobstore/fs_cablobstore.go.
//
// hashToString() returns a string representation of the hash.