syncbase blobs: Make servers fetch blobs automatically.
This change adds a syncbase mechanism that fetches blobs
automatically when the syncbase is a server within a
syncgroup that mentions the blob's id.
The main code is in the new file
v.io/x/ref/services/syncbase/vsync/server_blob_fetcher.go
It uses the Signpost database to discover blobrefs and whether it's a server,
and the blob database to determine whether the blobs are already present.
(Blobrefs are added to the Signpost database by processBlobRefs() routine in
services/syncbase/vsync/blob.go.)
Details:
v.io/x/ref/services/syncbase/server/interfaces/sync_types.vdl
Add "FetchAttempts" count to the Signpost data so that a server can
track how many times it has attempted to fetch a blob across reboots.
The value from a remote syncbase is always ignored, because
mergeSignposts() in blob.go does not transfer it.
v.io/x/ref/services/syncbase/server/service.go
Start the new ServerBlobFetcher() process when a syncbase starts.
v.io/x/ref/services/syncbase/vsync/blob.go
Fix a bug in syncService.FetchBlob(). I had added an element to a Go map
without first using make() to create the map.
Make these methods on syncDatabase become methods on syncService.
None need to know the datbase, and we need to call them from contexts where no database is known.
fetchBlobRemote()
locateBlob()
getMountTables()
v.io/x/ref/services/syncbase/vsync/parameters.go
Parameters to control the automatic blob fetching on servers.
v.io/x/ref/services/syncbase/vsync/server_blob_fetcher.go
v.io/x/ref/services/syncbase/vsync/server_blob_fetcher_test.go
The main blob fetcher code and unittest.
The integration test is in syncbase/featuretests/blob_v23_test.go
v.io/v23/syncbase/types.vdl
Change the order of the BlobDevType* constants so that "BlobDevTypeNormal"
has the zero value. This causes devices to be "normal" by default.
The ordering isn't used anywhere.
v.io/v23/syncbase/featuretests/blob_v23_test.go
Add an integration test for the automatic server blob fetching code.
Modify the fetchBlob() helper function so that the "skipIncStatus" is
used to indicate instead whether the blob is expected to be present,
and perform checks in either case. This doesn't change the existing
calls, because they were using this parameter to skip the checks
exactly when the blob was expected to be already present.
Pass an explicit SyncgroupMemberInfo struct to createSyncgroup() and
joinSyncgroup() helpers, so the new test can specify that some syncbases
are servers.
v.io/v23/syncbase/featuretests/test_util_test.go
Make createSyncgroup() and joinSyncgroup() helpers take an explicit
SyncgroupMemberInfo struct so some tests can specify that some
syncbases are servers.
v.io/v23/syncbase/featuretests/cr_v23_test.go
v.io/v23/syncbase/featuretests/ping_pong_test.go
v.io/v23/syncbase/featuretests/sync_v23_test.go
v.io/v23/syncbase/featuretests/syncgroup_v23_test.go
v.io/v23/syncbase/featuretests/vclock_v23_test.go
Pass an explicit SyncgroupMemberInfo struct to createSyncgroup() and
joinSyncgroup() helpers.
MultiPart: 2/2
Change-Id: I25e90d63a0b454265522e15a548cc33208815bb8
diff --git a/services/syncbase/server/interfaces/interfaces.vdl.go b/services/syncbase/server/interfaces/interfaces.vdl.go
index 6cc6dda..359db0f 100644
--- a/services/syncbase/server/interfaces/interfaces.vdl.go
+++ b/services/syncbase/server/interfaces/interfaces.vdl.go
@@ -2337,8 +2337,9 @@
// It represents the data known about a blob even when the blob itself is not
// present on the device.
type Signpost struct {
- Locations PeerToLocationDataMap // Maps name of syncbase that probably has the blob to a LocationData
- SgIds map[GroupId]struct{} // SyncGroups through which the BlobRef was learned.
+ Locations PeerToLocationDataMap // Maps name of syncbase that probably has the blob to a LocationData
+ SgIds map[GroupId]struct{} // SyncGroups through which the BlobRef was learned.
+ FetchAttempts uint32 // Number of attempts made to fetch the blob.
}
func (Signpost) __VDLReflect(struct {
@@ -2353,6 +2354,9 @@
if len(x.SgIds) != 0 {
return false
}
+ if x.FetchAttempts != 0 {
+ return false
+ }
return true
}
@@ -2376,6 +2380,11 @@
return err
}
}
+ if x.FetchAttempts != 0 {
+ if err := enc.NextFieldValueUint(2, vdl.Uint32Type, uint64(x.FetchAttempts)); err != nil {
+ return err
+ }
+ }
if err := enc.NextField(-1); err != nil {
return err
}
@@ -2414,6 +2423,13 @@
if err := __VDLReadAnon_set_3(dec, &x.SgIds); err != nil {
return err
}
+ case 2:
+ switch value, err := dec.ReadValueUint(32); {
+ case err != nil:
+ return err
+ default:
+ x.FetchAttempts = uint32(value)
+ }
}
}
}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 20dab50..418efae 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -228,6 +228,7 @@
// It represents the data known about a blob even when the blob itself is not
// present on the device.
type Signpost struct {
- Locations PeerToLocationDataMap // Maps name of syncbase that probably has the blob to a LocationData
- SgIds set[GroupId] // SyncGroups through which the BlobRef was learned.
+ Locations PeerToLocationDataMap // Maps name of syncbase that probably has the blob to a LocationData
+ SgIds set[GroupId] // SyncGroups through which the BlobRef was learned.
+ FetchAttempts uint32 // Number of attempts made to fetch the blob.
}
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index cf33a9b..ab50975 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -281,7 +281,7 @@
return err
}
- return sd.fetchBlobRemote(ctx, br, nil, call, offset)
+ return ss.fetchBlobRemote(ctx, br, nil, call, offset)
}
func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
@@ -310,7 +310,7 @@
// TODO(hpucha): Implement a blob queue.
clientStream.Send(wire.BlobFetchStatus{State: wire.BlobFetchStatePending})
- return sd.fetchBlobRemote(ctx, br, call, nil, 0)
+ return ss.fetchBlobRemote(ctx, br, call, nil, 0)
}
func (sd *syncDatabase) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
@@ -362,6 +362,9 @@
localShares := blobMetadata.OwnerShares[sgId]
localSgPriority, gotLocalSgPriority := localSgPriorities[sgId]
if gotLocalSgPriority && localShares > 0 && sgPriorityLowerThan(&localSgPriority, &remoteSgPriority) {
+ if sharesToTransfer == nil {
+ sharesToTransfer = make(interfaces.BlobSharesBySyncgroup)
+ }
if remoteSgPriority.DevType == wire.BlobDevTypeServer {
// Caller is a server in this syncgroup----give it all the shares.
sharesToTransfer[sgId] = localShares
@@ -528,7 +531,7 @@
return nil
}
-func (sd *syncDatabase) fetchBlobRemote(ctx *context.T, br wire.BlobRef, statusCall wire.BlobManagerFetchBlobServerCall, dataCall wire.BlobManagerGetBlobServerCall, offset int64) error {
+func (s *syncService) fetchBlobRemote(ctx *context.T, br wire.BlobRef, statusCall wire.BlobManagerFetchBlobServerCall, dataCall wire.BlobManagerGetBlobServerCall, offset int64) error {
vlog.VI(4).Infof("sync: fetchBlobRemote: begin br %v, offset %v", br, offset)
defer vlog.VI(4).Infof("sync: fetchBlobRemote: end br %v, offset %v", br, offset)
@@ -558,7 +561,7 @@
}
// Locate blob.
- peer, size, err := sd.locateBlob(ctx, br)
+ peer, size, err := s.locateBlob(ctx, br)
if err != nil {
return err
}
@@ -569,8 +572,7 @@
statusStream.Send(status)
}
- ss := sd.sync.(*syncService)
- bst := ss.bst
+ bst := s.bst
bWriter, err := bst.NewBlobWriter(ctx, string(br))
if err != nil {
@@ -582,11 +584,11 @@
sgPriorities := make(interfaces.SgPriorities)
var signpost interfaces.Signpost
var blessingNames []string
- if ss.bst.GetSignpost(ctx, br, &signpost) == nil {
+ if s.bst.GetSignpost(ctx, br, &signpost) == nil {
blessingNames, err = getPeerBlessingsForFetchBlob(ctx, peer)
if err == nil {
- filterSignpost(ctx, blessingNames, ss, &signpost)
- addSyncgroupPriorities(ctx, ss.bst, signpost.SgIds, sgPriorities)
+ filterSignpost(ctx, blessingNames, s, &signpost)
+ addSyncgroupPriorities(ctx, s.bst, signpost.SgIds, sgPriorities)
}
}
@@ -632,7 +634,6 @@
if err == nil {
// We successfully fetched the blob. Maybe
// take ownership in one or more syncgroups.
- ss := sd.sync.(*syncService)
takingOwnership := make(interfaces.BlobSharesBySyncgroup)
for sgId, shares := range remoteSharesBySgId {
myPriority, havePriority := sgPriorities[sgId]
@@ -665,22 +666,22 @@
// blobs we already have, triggered perhaps via the RequestTakeBlob() call.
var peerName string
var peerKeepingBlob bool
- peerName, peerKeepingBlob, _ = c.AcceptedBlobOwnership(ctx, br, ss.name, takingOwnership)
+ peerName, peerKeepingBlob, _ = c.AcceptedBlobOwnership(ctx, br, s.name, takingOwnership)
var blobMetadata blob.BlobMetadata
- ss.bst.GetBlobMetadata(ctx, br, &blobMetadata)
+ s.bst.GetBlobMetadata(ctx, br, &blobMetadata)
for sgId, shares := range takingOwnership {
blobMetadata.OwnerShares[sgId] += shares
}
- ss.bst.SetBlobMetadata(ctx, br, &blobMetadata)
+ s.bst.SetBlobMetadata(ctx, br, &blobMetadata)
// Remove peer from local signpost if it's not keeping blob.
if !peerKeepingBlob {
var sp interfaces.Signpost
- if ss.bst.GetSignpost(ctx, br, &sp) == nil {
+ if s.bst.GetSignpost(ctx, br, &sp) == nil {
delete(sp.Locations, peerName)
- ss.bst.SetSignpost(ctx, br, &sp)
+ s.bst.SetSignpost(ctx, br, &sp)
}
}
}
@@ -837,13 +838,12 @@
}
// TODO(hpucha): Add syncgroup driven blob discovery.
-func (sd *syncDatabase) locateBlob(ctx *context.T, br wire.BlobRef) (string, int64, error) {
+func (s *syncService) locateBlob(ctx *context.T, br wire.BlobRef) (string, int64, error) {
vlog.VI(4).Infof("sync: locateBlob: begin br %v", br)
defer vlog.VI(4).Infof("sync: locateBlob: end br %v", br)
- ss := sd.sync.(*syncService)
var sp interfaces.Signpost
- err := ss.bst.GetSignpost(ctx, br, &sp)
+ err := s.bst.GetSignpost(ctx, br, &sp)
if err != nil {
return "", 0, err
}
@@ -863,7 +863,7 @@
var p string = locationList[i].peer
vlog.VI(4).Infof("sync: locateBlob: attempting %s", p)
// Get the mount tables for this peer.
- mtTables, err := sd.getMountTables(ctx, p)
+ mtTables, err := s.getMountTables(ctx, p)
if err != nil {
continue
}
@@ -874,7 +874,7 @@
size, remoteSp, err := c.HaveBlob(ctx, br)
if size >= 0 {
if updatedSp {
- ss.bst.SetSignpost(ctx, br, &sp)
+ s.bst.SetSignpost(ctx, br, &sp)
}
vlog.VI(4).Infof("sync: locateBlob: found blob on %s", absName)
return absName, size, nil
@@ -894,15 +894,14 @@
}
}
if updatedSp {
- ss.bst.SetSignpost(ctx, br, &sp)
+ s.bst.SetSignpost(ctx, br, &sp)
}
return "", 0, verror.New(verror.ErrInternal, ctx, "blob not found")
}
-func (sd *syncDatabase) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
- ss := sd.sync.(*syncService)
- mInfo := ss.copyMemberInfo(ctx, peer)
+func (s *syncService) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
+ mInfo := s.copyMemberInfo(ctx, peer)
return mInfo.mtTables, nil
}
diff --git a/services/syncbase/vsync/parameters.go b/services/syncbase/vsync/parameters.go
index 4b18430..5dbe2b4 100644
--- a/services/syncbase/vsync/parameters.go
+++ b/services/syncbase/vsync/parameters.go
@@ -75,4 +75,26 @@
// initialBlobOwnerShares is the initial number of ownership shares that
// a device gives itself when it introduces a BlobRef to a syncgroup.
initialBlobOwnerShares = int32(2)
+
+ // serverBlobFetchConcurrency is the maximum number of concurrent blob fetches
+ // that a server will initiate as background activity to pull blobs to it.
+ serverBlobFetchConcurrency = 4
+
+ // serverBlobFetchInitialScanDelay is the time before the first scan to
+ // find blobs for servers to fetch. This parameter exists primarily
+ // for testing. An alternative would be to find some means for a test
+ // to adjust the serverBlobFetchExtraScanDelay parameter.
+ serverBlobFetchInitialScanDelay = 10 * time.Second
+
+ // serverBlobFetchExtraScanDelay is the additional time between scans
+ // of the database to find blobs for servers to fetch, so the total time
+ // between scans is
+ // serverBlobFetchScanDelayMultiplier*time_for_last_scan+serverBlobFetchExtraScanDelay
+ serverBlobFetchExtraScanDelay = 300 * time.Second
+
+ // serverBlobFetchScanDelayMultiplier is a multiplier applied to the
+ // scan time to ensure that a server doesn't spent too much of its time scanning
+ // for blobs. The total time between scans is
+ // serverBlobFetchScanDelayMultiplier*time_for_last_scan+serverBlobFetchExtraScanDelay
+ serverBlobFetchScanDelayMultiplier time.Duration = 4
)
diff --git a/services/syncbase/vsync/server_blob_fetcher.go b/services/syncbase/vsync/server_blob_fetcher.go
new file mode 100644
index 0000000..ec38aea
--- /dev/null
+++ b/services/syncbase/vsync/server_blob_fetcher.go
@@ -0,0 +1,343 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import "container/heap"
+import "math/rand"
+import "time"
+
+import "v.io/v23/context"
+import wire "v.io/v23/services/syncbase"
+import "v.io/x/lib/vlog"
+import "v.io/x/lib/nsync"
+import blob "v.io/x/ref/services/syncbase/localblobstore"
+import "v.io/x/ref/services/syncbase/server/interfaces"
+
+// This file contains the machinery that runs in syncgroup "servers" that try
+// to fetch blobs within the syncgroup so that they may be stored on the server
+// for reliability and availability.
+
+// ---------------------------------
+
+// A BlobFetcherFunc can be passed into the public calls of this abstraction
+// to customize how to fetch a blob.
+type BlobFetcherFunc func(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error
+
+// ---------------------------------
+
+// A blobFetchState records the state of a blob that this server is attempting to fetch.
+type blobFetchState struct {
+ bf *blobFetcher // The associated blobFetcher.
+ bst blob.BlobStore // The associated blob store.
+
+ blobRef wire.BlobRef // the blob's Id.
+ clientData interface{} // provided by client, and passed to fetchFunc
+ fetchFunc BlobFetcherFunc // function to be used to fetch blob
+
+ // The fields below are protected by bf.mu.
+ fetchAttempts uint32 // incremented on start of fetch attempt
+ stopFetching bool // Whether to abandon in-progress fetches, and not restart. Monotonic: false to true.
+ nextAttempt time.Time // time of next attempted fetch.
+ err error // error recorded on the last fetch.
+ heapIndex int // index, if in a heap; -1 if being fetched.
+}
+
+// ---------------------------------
+
+// A blobFetchStateHeap is a heap of blobFetchState pointers ordered by nextAttempt, earliest first.
+type blobFetchStateHeap []*blobFetchState
+
+// The following routines conform to "container/heap".Interface
+func (h blobFetchStateHeap) Len() int { return len(h) }
+func (h blobFetchStateHeap) Less(i int, j int) bool { return h[i].nextAttempt.Before(h[j].nextAttempt) }
+func (h blobFetchStateHeap) Swap(i int, j int) {
+ h[i], h[j] = h[j], h[i]
+ h[i].heapIndex = i
+ h[j].heapIndex = j
+}
+func (h *blobFetchStateHeap) Push(x interface{}) {
+ bfs := x.(*blobFetchState)
+ bfs.heapIndex = len(*h)
+ *h = append(*h, bfs)
+}
+func (h *blobFetchStateHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ bfs := old[n-1]
+ bfs.heapIndex = -1
+ *h = old[0 : n-1]
+ return bfs
+}
+
+// ---------------------------------
+
+// A blobFetcher records the state of all blobs that this server is attempting to fetch.
+type blobFetcher struct {
+ ctx *context.T // Context passed to NewBlobFetcher().
+
+ mu nsync.Mu // protects fields below, plus most fields in blobFetchState.
+
+ blobMap map[wire.BlobRef]*blobFetchState // blobs that should be fetched
+ blobQueue blobFetchStateHeap // same elements as blobMap; heap prioritized by nextAttempt time
+
+ maxFetcherThreads int // number of threads allowed to fetch blobs.
+ curFetcherThreads int // number of fetcher threads that exist (<= maxFetcherThreads)
+
+ fetchStarterThreadShutdown bool // whether the fetchStarterThread thread has shut down. Monotonic: false to true.
+
+ startFetchThreadCV nsync.CV // a fetch thread can be started (canStartAFetchThread())
+ shutdownCV nsync.CV // shutdown is complete (isShutdown() returns true)
+}
+
+// canStartAFetchThread() returns whether a new fetcher thread should be
+// started by *bf's fetchStarterThread. The condition is that the number of
+// outstanding fetcher threads is below the maximum, and the next blob to fetch
+// has reached its "nextAttempt" deadline.
+// Places in *firstBlobDeadline the nextAttempt time of the blob to fetch,
+// or nsync.NoDeadline if none.
+// Called with bf.mu held.
+func (bf *blobFetcher) canStartAFetchThread(firstBlobDeadline *time.Time) (result bool) {
+ *firstBlobDeadline = nsync.NoDeadline
+ if bf.curFetcherThreads < bf.maxFetcherThreads && len(bf.blobQueue) != 0 { // a chance we could start a thread
+ result = bf.blobQueue[0].nextAttempt.Before(time.Now())
+ if !result { // failed only because it's not yet time; set the deadline to when it will be time
+ *firstBlobDeadline = bf.blobQueue[0].nextAttempt
+ }
+ }
+ return result
+}
+
+// isShutdown() returns whether *bf has been shut down. That is, the fetchStarterThread
+// and all fetcher threads have finished.
+func (bf *blobFetcher) isShutdown() bool {
+ return bf.fetchStarterThreadShutdown && bf.curFetcherThreads == 0
+}
+
+// fetchStarterThread() creates threads to fetch blobs (using fetchABlob()).
+// It runs until the ctx passed to NewBlobFetcher() becomes cancelled or
+// expires. On exit, it sets bf.fetchStarterThreadShutdown.
+func (bf *blobFetcher) fetchStarterThread() {
+ bf.mu.Lock()
+ for bf.ctx.Err() == nil {
+ var deadline time.Time
+ // Wait until either we're shut down (bf.ctx.Done() is closed),
+ // or we can start a new fetch thread on some blob.
+ // canStartAFetchThread() computes the deadline for the CV wait.
+ for !bf.canStartAFetchThread(&deadline) &&
+ bf.startFetchThreadCV.WaitWithDeadline(&bf.mu, deadline, bf.ctx.Done()) == nsync.OK {
+ }
+ if bf.ctx.Err() == nil && bf.canStartAFetchThread(&deadline) {
+ // Remove the first blob from the priority queue, and start a thread to fetch it.
+ var toFetch *blobFetchState = heap.Pop(&bf.blobQueue).(*blobFetchState)
+ if toFetch.heapIndex != -1 {
+ panic("blobFetchState unexpectedly in heap")
+ }
+ bf.curFetcherThreads++
+ go bf.fetchABlob(toFetch)
+ }
+ }
+ bf.fetchStarterThreadShutdown = true
+ if bf.isShutdown() {
+ bf.shutdownCV.Broadcast()
+ }
+ bf.mu.Unlock()
+}
+
+// fetchABlob() attempts to fetch the blob identified by *toFetch. If the
+// fetch was unsuccessful and if the client has not requested that fetching be
+// abandoned, the fetch is queued for retrying. Otherwise, the *toFetch
+// is removed from *bf.
+func (bf *blobFetcher) fetchABlob(toFetch *blobFetchState) {
+ bf.mu.Lock()
+ if toFetch.heapIndex != -1 {
+ panic("blobFetchState unexpectedly on heap")
+ }
+ bf.mu.Unlock()
+ var err error = toFetch.fetchFunc(bf.ctx, toFetch.blobRef, toFetch.clientData)
+ bf.mu.Lock()
+ toFetch.err = err
+ toFetch.fetchAttempts++
+ // Maintain fetchAttempts in the on-disc Signpost data structure.
+ var sp interfaces.Signpost
+ toFetch.bst.GetSignpost(bf.ctx, toFetch.blobRef, &sp)
+ // We may write the Signpost back, even if the GetSignpost() call failed.
+ // On failure, sp will be a canonical empty Signpost.
+ if toFetch.fetchAttempts > sp.FetchAttempts {
+ sp.FetchAttempts = toFetch.fetchAttempts
+ toFetch.bst.SetSignpost(bf.ctx, toFetch.blobRef, &sp)
+ }
+ if err == nil || bf.ctx.Err() != nil || toFetch.stopFetching { // fetched blob, or we're told not to retry.
+ delete(bf.blobMap, toFetch.blobRef)
+ } else { // failed to fetch blob; try again
+ toFetch.nextAttempt = time.Now().Add(bf.fetchDelay(toFetch.fetchAttempts))
+ heap.Push(&bf.blobQueue, toFetch)
+ }
+ if toFetch.heapIndex == 0 || bf.curFetcherThreads == bf.maxFetcherThreads {
+ // new lowest fetch time, or there were no free threads.
+ bf.startFetchThreadCV.Broadcast()
+ }
+ bf.curFetcherThreads--
+ if bf.isShutdown() {
+ bf.shutdownCV.Broadcast()
+ }
+ bf.mu.Unlock()
+}
+
+// fetchDelay() returns how long the blobFetcher should wait before attempting to fetch a blob
+// for which there have already been "fetchAttempts" failed fetch attempts.
+func (bf *blobFetcher) fetchDelay(fetchAttempts uint32) time.Duration {
+ // Delay has a random component, and is exponential in failures,
+ // between about 1.5s and about 11 days. (fetchAttempts will be 1 on
+ // the first call for a blob, since this is not invoked until there's
+ // been a failure.)
+ if fetchAttempts > 20 { // Limit delay to around a million seconds---11 days.
+ fetchAttempts = 20
+ }
+ return ((500 + time.Duration(rand.Int31n(500))) * time.Millisecond) << fetchAttempts
+}
+
+// StartFetchingBlob() adds a blobRef to blobFetcher *bf, if it's not already
+// known to it, and not shutting down. The client-provided function
+// fetchFunc() will be used to fetch the blob.
+func (bf *blobFetcher) StartFetchingBlob(bst blob.BlobStore, blobRef wire.BlobRef,
+ clientData interface{}, fetchFunc BlobFetcherFunc) {
+
+ bf.mu.Lock()
+ var bfs *blobFetchState
+ var found bool
+ bfs, found = bf.blobMap[blobRef]
+ if bf.ctx.Err() == nil && !found {
+ bfs = &blobFetchState{
+ bf: bf,
+ bst: bst,
+ blobRef: blobRef,
+ clientData: clientData,
+ fetchFunc: fetchFunc,
+ heapIndex: -1,
+ }
+ var sp interfaces.Signpost
+ if err := bst.GetSignpost(bf.ctx, blobRef, &sp); err == nil {
+ bfs.fetchAttempts = sp.FetchAttempts
+ bfs.nextAttempt = time.Now().Add(bf.fetchDelay(bfs.fetchAttempts))
+ }
+ bf.blobMap[blobRef] = bfs
+ heap.Push(&bf.blobQueue, bfs)
+ if bfs.heapIndex == 0 { // a new lowest fetch time
+ bf.startFetchThreadCV.Broadcast()
+ }
+ }
+ bf.mu.Unlock()
+}
+
+// StopFetchingBlob() removes blobRef from blobFetcher *bf.
+// It may still be being fetched, but failures will no longer be retried,
+// and an in progress fetch may be halted if possible.
+func (bf *blobFetcher) StopFetchingBlob(blobRef wire.BlobRef) {
+ bf.mu.Lock()
+ var bfs *blobFetchState
+ var found bool
+ if bfs, found = bf.blobMap[blobRef]; found {
+ bfs.stopFetching = true // tell any in-progress fetcher thread to stop if it can.
+ if bfs.heapIndex != -1 { // if not currently fetching, forget blob.
+ delete(bf.blobMap, bfs.blobRef)
+ heap.Remove(&bf.blobQueue, bfs.heapIndex)
+ } // else fetching thread will forget the blob when it finishes.
+ }
+ bf.mu.Unlock()
+}
+
+// NewBlobFetcher() returns a new blobFetcher that can use maxThreads fetcher threads.
+func NewBlobFetcher(ctx *context.T, maxThreads int) *blobFetcher {
+ bf := &blobFetcher{ctx: ctx, maxFetcherThreads: maxThreads, blobMap: make(map[wire.BlobRef]*blobFetchState)}
+ heap.Init(&bf.blobQueue) // "container/heap"'s spec requires this---apparently even on an empty heap.
+ go bf.fetchStarterThread()
+ return bf
+}
+
+// WaitForExit() waits *bf is fully shut down. Typically, this is used after
+// cancelling the context passed to NewBlobFetcher().
+func (bf *blobFetcher) WaitForExit() {
+ bf.mu.Lock()
+ for !bf.isShutdown() {
+ bf.shutdownCV.Wait(&bf.mu)
+ }
+ bf.mu.Unlock()
+}
+
+// ---------------------------------
+
+// serverBlobScan() scans the blobs in *s, and for those blobs for which it is
+// a server in some syncgroup that fetches the blobs, it gives the blob ids to
+// blob fetcher *bf. Servers are assumed to have space to keep all blobs.
+// The function fetchFunc() is used to fetch blobs.
+func (s *syncService) serverBlobScan(ctx *context.T, bf *blobFetcher,
+ clientData interface{}, fetchFunc BlobFetcherFunc) (err error) {
+ // Construct a map that indicates which blobs are available locally,
+ // and expunge blobFetchState records for such blobs.
+ haveBlob := make(map[wire.BlobRef]bool)
+ var bs blob.Stream = s.bst.ListBlobIds(ctx)
+ for bs.Advance() && ctx.Err() == nil {
+ br, brErr := s.bst.NewBlobReader(ctx, bs.Value())
+ if brErr == nil && br.IsFinalized() {
+ haveBlob[wire.BlobRef(bs.Value())] = true
+ }
+ }
+ err = bs.Err()
+ bs.Cancel() // in case we didn't finish advancing.
+
+ // For every blob whose id has been seen locally, there is a Signpost.
+ // Iterate over them, and fetch the ones not available locally for
+ // which the current syncbase is a "server" in some syncgroup.
+ if err == nil && ctx.Err() == nil {
+ var sps blob.SignpostStream = s.bst.NewSignpostStream(ctx)
+ for sps.Advance() && ctx.Err() == nil {
+ blobRef := sps.BlobId()
+ if !haveBlob[blobRef] &&
+ len(s.syncgroupsWithServer(ctx, wire.Id{}, s.name, sps.Signpost().SgIds)) != 0 {
+
+ // We do not have the blob locally, and this
+ // syncbase is a server in some syncgroup in
+ // which the blob has been seen.
+ bf.StartFetchingBlob(s.bst, blobRef, clientData, fetchFunc)
+ }
+ }
+ err = sps.Err()
+ sps.Cancel() // in case we didn't finish advancing.
+ }
+ return err
+}
+
+// defaultBlobFetcherFunc() is a BlobFetcherFunc that fetches blob blobRef in the normal way.
+func defaultBlobFetcherFunc(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error {
+ s := clientData.(*syncService)
+ return s.fetchBlobRemote(ctx, blobRef, nil, nil, 0)
+}
+
+// ServerBlobFetcher() calls serverBlobScan() repeatedly on ssm (which must contain a *syncService),
+// with gaps specified by parameters in parameters.go before scanning them all again. It
+// stops only if the context *ctx is cancelled. The function fetchFunc() will
+// be used to fetch blobs, and passed the argument clientData on each call.
+func ServerBlobFetcher(ctx *context.T, ssm interfaces.SyncServerMethods) {
+ bf := NewBlobFetcher(ctx, serverBlobFetchConcurrency)
+ ss := ssm.(*syncService)
+ var delay time.Duration = serverBlobFetchInitialScanDelay
+ errCount := 0 // state for limiting log records
+ for ctx.Err() == nil {
+ select {
+ case <-time.After(delay):
+ case <-ctx.Done():
+ }
+ startTime := time.Now()
+ if ctx.Err() == nil {
+ if err := ss.serverBlobScan(ctx, bf, ss, defaultBlobFetcherFunc); err != nil {
+ if (errCount & (errCount - 1)) == 0 { // errCount is 0 or a power of 2.
+ vlog.Errorf("ServerBlobFetcher:%d: %v", errCount, err)
+ }
+ errCount++
+ }
+ }
+ delay = serverBlobFetchExtraScanDelay + serverBlobFetchScanDelayMultiplier*time.Since(startTime)
+ }
+}
diff --git a/services/syncbase/vsync/server_blob_fetcher_test.go b/services/syncbase/vsync/server_blob_fetcher_test.go
new file mode 100644
index 0000000..672e487
--- /dev/null
+++ b/services/syncbase/vsync/server_blob_fetcher_test.go
@@ -0,0 +1,226 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import "container/heap"
+import "math/rand"
+import "testing"
+import "time"
+
+import "v.io/v23/context"
+import wire "v.io/v23/services/syncbase"
+import "v.io/v23/verror"
+import "v.io/x/lib/nsync"
+import blob "v.io/x/ref/services/syncbase/localblobstore"
+
+// TestBlobFetchStateHeap() tests that blobFetchStateHeap() does indeed act as a heap,
+// ordered by blobFetchState.nextAttempt
+func TestBlobFetchStateHeap(t *testing.T) {
+ var queue blobFetchStateHeap
+ var list []*blobFetchState
+ now := time.Now()
+ heap.Init(&queue) // "container/heap"'s spec requires this---apparently even on an empty heap.
+ for loop := 0; loop != 1000000; loop++ {
+ op := rand.Int31n(4)
+ switch op {
+ case 0, 1: // add an entry
+ bfs := &blobFetchState{nextAttempt: now.Add(time.Duration(rand.Int63()))}
+ heap.Push(&queue, bfs)
+ i := len(list)
+ list = append(list, bfs)
+ for i != 0 && bfs.nextAttempt.Before(list[i-1].nextAttempt) {
+ list[i] = list[i-1]
+ i--
+ }
+ list[i] = bfs
+ case 2: // remove an entry from the root, if there is one
+ if len(queue) != 0 {
+ bfs := heap.Pop(&queue).(*blobFetchState)
+ if bfs != list[0] {
+ t.Fatalf("loop %d: Pop: lowest in heap is not lowest in list", loop)
+ }
+ copy(list[:], list[1:])
+ list = list[0 : len(list)-1]
+ }
+ case 3: // remove an arbitrary entry, if there is one.
+ if len(queue) != 0 {
+ i := rand.Int31n(int32(len(list)))
+ heapIndex := list[i].heapIndex
+ bfs := heap.Remove(&queue, heapIndex).(*blobFetchState)
+ if bfs != list[i] {
+ t.Fatalf("loop %d: Remove() blobFetchState index %d differs from list index %d",
+ loop, heapIndex, i)
+ }
+ copy(list[i:], list[i+1:])
+ list = list[0 : len(list)-1]
+ }
+ }
+
+ // check heap consistency
+ if len(queue) != len(list) {
+ t.Fatalf("loop %d: len(queue)==%d != %d==len(list)", loop)
+ }
+ if len(queue) != 0 && queue[0] != list[0] {
+ t.Fatalf("loop %d: lowest in heap is not lowest in list", loop)
+ }
+ for i := 0; i != len(queue); i++ {
+ if i != queue[i].heapIndex {
+ t.Fatalf("loop %d: heapIndex incorrect: %d != %d", loop, i, queue[i].heapIndex)
+ }
+ }
+ }
+}
+
+// A fakeFetchData is passed to fakeBlobFetchFunc() via StartFetchingBlob()'s
+// clientData argument. It holds the data used to fake a fetch, and to
+// notify of its completion.
+type fakeFetchData struct {
+ t *testing.T
+ delay time.Duration // how long each fake fetch waits
+
+ mu nsync.Mu // protects fields below
+ fetchesRemaining int // number of outstanding fetches remaining; under mu
+ noFetchesRemaining nsync.CV // signalled when fetchesRemaining==0
+ errorsOn map[wire.BlobRef]int // report an error fetching blob b errorsOn[b] times
+}
+
+const pkgPath = "v.io/x/ref/services/syncbase/vsync"
+
+var errFakeBlobFetchError = verror.Register(pkgPath+".fakeBlobFetchError", verror.NoRetry, "{1:}{2:} fakeBlobFetch error")
+
+// fakeBlobFetchFunc() is a fake blob fetching routine. It's used in this unit test
+// to verify that the basic queueing of the server blob fetching mechanism works.
+// The real blob fetching path is tested by TestV23ServerBlobFetch in v.io/v23/syncbase/featuretests.
+func fakeBlobFetchFunc(ctx *context.T, blobRef wire.BlobRef, ffdi interface{}) error {
+ ffd := ffdi.(*fakeFetchData)
+ var err error
+
+ time.Sleep(ffd.delay) // simulate a delay in fetching the blob.
+
+ ffd.mu.Lock()
+ var errorCount int = ffd.errorsOn[blobRef]
+ if errorCount != 0 {
+ // If we were instructed to fake an error in fetching this blob, do it.
+ ffd.errorsOn[blobRef] = errorCount - 1
+ err = verror.New(errFakeBlobFetchError, ctx, blobRef)
+ } else {
+ // Otherwise, claim success, and report that the fetch of this blob is complete.
+ ffd.fetchesRemaining--
+ if ffd.fetchesRemaining < 0 {
+ panic("finished more fetches than were started")
+ }
+ if ffd.fetchesRemaining == 0 {
+ ffd.noFetchesRemaining.Broadcast()
+ }
+ }
+ ffd.mu.Unlock()
+
+ return err
+}
+
+// TestBlobFetch() is a unit test for the blob fetcher, using fake code to actually fetch blobs.
+func TestBlobFetchSimple(t *testing.T) {
+ svc := createService(t)
+
+ ctx, bfShutdown := context.RootContext()
+ defer bfShutdown()
+
+ var ss *syncService = svc.sync
+ var bst blob.BlobStore = ss.bst
+
+ const threadCount = 10 // number of threads in BlobFetcher
+ bf := NewBlobFetcher(ctx, threadCount)
+ ffd := fakeFetchData{
+ t: t,
+ delay: 500 * time.Millisecond, // a short delay so test doesn't run too long.
+ errorsOn: make(map[wire.BlobRef]int),
+ }
+
+ start := time.Now()
+
+ // Create fetchCount BlobRefs and fake a fetch on each.
+ // In this initial run, no errors are generated.
+ const fetchCount = 100
+ var blobRefsFetched []wire.BlobRef
+ for i := 0; i != fetchCount; i++ {
+ var writer blob.BlobWriter
+ var err error
+ writer, err = bst.NewBlobWriter(ctx, "")
+ if err != nil {
+ t.Fatalf("can't make a blob writer: %v\n", err)
+ }
+ blobRef := wire.BlobRef(writer.Name())
+ blobRefsFetched = append(blobRefsFetched, blobRef)
+ writer.CloseWithoutFinalize()
+
+ ffd.mu.Lock()
+ ffd.fetchesRemaining++
+ ffd.mu.Unlock()
+ bf.StartFetchingBlob(svc.sync.bst, blobRef, &ffd, fakeBlobFetchFunc)
+ }
+
+ // Wait until all fetching is done. The test would deadlock here if
+ // our fetch function was not called the right number of times.
+ ffd.mu.Lock()
+ for ffd.fetchesRemaining != 0 {
+ ffd.noFetchesRemaining.Wait(&ffd.mu)
+ }
+ ffd.mu.Unlock()
+ end := time.Now()
+
+ // Check that the fetching took about the correct amount of time,
+ // given the fetch delays and the amount of concurrency requested.
+ var elapsed time.Duration = end.Sub(start)
+ var expectedElapsed time.Duration = (ffd.delay * fetchCount) / threadCount
+ var expectedMinElapsed time.Duration = expectedElapsed / 2
+ var expectedMaxElapsed time.Duration = expectedElapsed * 2
+ if elapsed < expectedMinElapsed {
+ t.Errorf("BlobFetcher completed in %v, expected at least %v", elapsed, expectedMinElapsed)
+ }
+ if elapsed > expectedMaxElapsed {
+ t.Errorf("BlobFetcher completed in %v, expected at most %v", elapsed, expectedMaxElapsed)
+ }
+
+ // Now run the test again, but introduce errors on the first fetch of
+ // each blob, and issue duplicate requests to fetch each blob.
+ start = time.Now()
+ ffd.mu.Lock()
+ for i := 0; i != fetchCount; i++ {
+ ffd.fetchesRemaining++
+ ffd.errorsOn[blobRefsFetched[i]] = 1
+ for j := 0; j != 3; j++ {
+ // Issue duplicate requests; the duplicates will be ignored.
+ bf.StartFetchingBlob(svc.sync.bst, blobRefsFetched[i], &ffd, fakeBlobFetchFunc)
+ }
+ }
+ // Wait for fetches to complete. We would deadlock here if our fetch
+ // function didn't ultimately return true for each blob. The fake test
+ // function would panic() if it was called too many times, due to the
+ // duplicate requests.
+ for ffd.fetchesRemaining != 0 {
+ ffd.noFetchesRemaining.Wait(&ffd.mu)
+ }
+ ffd.mu.Unlock()
+
+ // Check that the fetching took about the correct amount of time,
+ // given the fetch delays and the amount of concurrency requested.
+ end = time.Now()
+ elapsed = end.Sub(start)
+ const retryDelay = 1000 * time.Millisecond // min time for first retry, with current parameters.
+ expectedElapsed = ((2 * ffd.delay * fetchCount) / threadCount) + retryDelay
+ expectedMinElapsed = expectedElapsed / 2
+ expectedMaxElapsed = expectedElapsed * 2
+ if elapsed < expectedMinElapsed {
+ t.Errorf("BlobFetcher completed in %v, expected at least %v", elapsed, expectedMinElapsed)
+ }
+ if elapsed > expectedMaxElapsed {
+ t.Errorf("BlobFetcher completed in %v, expected at most %v", elapsed, expectedMaxElapsed)
+ }
+
+ // Shut everything down.
+ bfShutdown()
+ bf.WaitForExit()
+ svc.shutdown()
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 20eabc5..3d7e4fe 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -234,6 +234,9 @@
// Start the discovery service thread to listen to neighborhood updates.
go s.discoverNeighborhood(ctx)
+ // Start a blob fetcher.
+ go ServerBlobFetcher(ctx, s)
+
return s, nil
}