blob: 99cd782924559b290de089f84f21a9ca236201e1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// This file contains blob handling code.
//
// A summary of inter-device blob management:
//
// Syncbase implements a system of "blob ownership" to reduce the
// probability that a blob will be lost due to the loss of a mobile
// device. This works as follows.
//
// When a device that has a blob Put()s the blobref in an existing syncgroup,
// or makes a row containing a blobref syncable for the first time by creating
// a syncgroup, it assigns itself an initial number of "ownership shares" to
// the blob within that syncgroup. Usually the initial number of shares per
// blob per syncgroup is 2.
//
// If a device does not have the blob, or associates the blobref with a
// syncgroup via the creation of an overlapping syncgroup, or via syncing,
// no ownership shares are assigned. Instead, the blobref is allowed to
// propagate around the syncgroup until devices request
// the blob (note that servers always request blobs; see below).
//
// Any device with a non-zero number of ownership shares for a blob
// in a syncgroup has an obligation not to discard its copy of the
// blob, and not to reduce the number of ownership shares it has
// without first passing shares to another device within the
// syncgroup. This obligation lapses if the blob ceases to be
// accessible via the syncgroup.
//
// When a device acquires a copy of a blob, it may elect to take on
// the burden of one or more ownership shares in each syncgroup in
// which the blob is accessible. To decide whether to transfer a
// share for a syncgroup, devices compare their "ownership
// priorities" within the syncgroup. The priorities are defined in
// terms of metrics suggestive of the "sync distance" of the device
// from the "servers" for the relevant syncgroup.
//
// A device that is marked as a "server" within the syncgroup is
// expected:
// - to acquire and keep copies of every blob accessible within the
// syncgroup,
// - to make these copies available to other devices on demand, and
// - to use techniques such as replication and backup to make their
// loss unlikely.
// Thus, servers have the highest ownership priority. Other
// devices have lower ownership priorities according to their
// presumed ability to transfer the blob to the servers. Each server
// periodically attempts to fetch any blob that it does not yet have that
// is referenced by the syncgroup.
//
// If a syncgroup has no servers, it is more likely that the loss of a device
// will lead to the loss of a blob, and that other devices will fail to find
// the blob should they request it. This is because servers are more likely to
// be available and to store blobs reliably, and because the location hints
// work best when well-known servers exist.
//
// Each device computes its ownership priority by keeping track of various
// numbers in the SgPriority struct in
// v.io/x/ref/services/syncbase/server/interfaces/sync_types.vdl
// - DevType indicates the type of device.
// - Distance is the mean distance (in "hops") from the servers, maintained via
// decaying average.
// - ServerTime is the time on a server at which data created there has reached
// this device. Times within timeSlop (syncgroup.go) are considered equivalent.
//
// Device types are manually assigned to devices, perhaps by default by the
// device manufacturer. The set is
// BlobDevTypeServer (accumulates blobs when possible, like a cloud server),
// BlobDevTypeNormal,
// BlobDevTypeLeaf (sheds blobs when possible, like a camera),
// defined in v.io/v23/services/syncbase/nosql/types.vdl
//
// Priorities can be compared via sgPriorityLowerThan() in syncgroup.go.
//
// Each time a device "local" receives data from a peer "remote", the remote
// device sends its Priority value, and the local device then adjusts its
// priority. See updateSyncgroupPriority() in syncgroup.go
//
// The rules for transferring an ownership share when a device receives a copy
// of a blob are:
// - A server always sets its ownership shares to non-zero for any blob
// accessible within its syncgroups. That is, a server never
// unilaterally deletes a reachable blob.
// - A non-server may transfer all its ownership shares to a server, even if
// that server already has shares. (It can choose to transfer fewer; if it
// does so, the excess shares will likely be transferred in subsequent
// communications.)
// - A non-server with at least one share may transfer one share to a
// non-server with no shares and a higher priority.
//
// When a device syncs with a device to which it would transfer some ownership
// shares, it informs that device that it should take a copy of the blob and
// accept the share. The recipient then tells the first device that it may
// decrement its share count.
//
// Example: Suppose four devices are in a syncgroup:
// - a server S (BlobDevTypeServer)
// - a laptop L (BlobDevTypeNormal)
// - a tablet T (BlobDevTypeNormal)
// - a camera C that communicates with L and T, but not directly with S.
// (BlobDevTypeLeaf)
// C's images are turned into blobs accessible via the syncgroup.
//
// When online, L and T can communicate directly with the server, so
// L.ServerTime and T.ServerTime will be recent, while L.Distance and
// T.Distance will both be close to 1. C.ServerTime will be somewhat less
// recent, and C.Distance will be close to 2. The priorities will satisfy:
// S > T > C and S > L > C. Sometimes T > L and sometimes the reverse,
// depending on when they sync with S.
//
// If L, T, and C go offline (imagine taking a vacation cruise with them),
// their ServerTime values will remain unchanged from when they were last
// online. One of L and T will have a higher priority; its Distance metric
// will remain constant, while the other two devices' distances will increase
// somewhat. The relative priority ordering of the devices will then stay the
// same until contact is re-established with S.
//
// Suppose C takes a picture P giving it two shares, and syncs first with L.
// L will accept one share, and refuse to accept more while its share count is
// non-zero. L's share may be taken by T or by S (if L is on-line). L could
// then take C's second share. Alternatively, C might transfer its second
// share to T, if T has not received a share from L. At this point, C is free
// to delete its copy of P, knowing that either S has the blob, or at least two
// other devices do.
//
// -----------
//
// An additional mechanism of location hints, called Signposts, exists to allow
// devices to find blobs whose blobrefs they have received.
//
// A Signpost consists of two lists:
// - a list of syncgroups whose data mention the blobref
// - a list of devices that have had the blob, or at least been instrumental
// in forwarding its blobref.
// When a device receives a blobref, it constructs a Signpost that contains the
// syncgroups and peer that the blobref arrived through, plus the peer that
// inserted the blobref into the structured data (known through the log record).
//
// When a device requests a blob that the callee does not have, the callee may
// return its Signpost for the blob. The caller then merges this into its own
// Signpost. In this way, improved hints may propagate through the system.
// The list of devices in a Signpost is restricted in length so that Signposts
// do not become too large (see mergeSignposts()). Ultimately, devices are
// expected to request blobs from the server machines within the relevant
// syncgroups.
//
// In transferring Signposts, devices filter the lists they send to avoid
// revealing private information about syncgroups (see filterSignpost()). In
// particular, they avoid sending the name of a syncgroup to a peer that does
// not have permission to join the group unless the group is "public" (a
// property chosen at group creation). Also, device location hints are given
// only to peers who have permission to join a syncgroup to which the device
// belongs, or if the device is a server in one of the syncgroups. The
// expectation is that servers have no privacy constraints; many syncgroups and
// users may be associated with any given server.
import (
"io"
"sort"
"strings"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/common"
blob "v.io/x/ref/services/syncbase/localblobstore"
_ "v.io/x/ref/services/syncbase/localblobstore/blobmap"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/store/watchable"
)
const (
// chunkSize is the size, in bytes, of the buffer that getLocalBlob allocates
// for each read from a local blob before sending the bytes to the client.
chunkSize = 8 * 1024
)
////////////////////////////////////////////////////////////
// RPCs for managing blobs between Syncbase and its clients.
// CreateBlob opens a new blob writer in this Syncbase's blob store and returns
// the writer's name as a BlobRef. The database must exist and the caller must
// pass the Write-tag permission check on it. The writer is closed without
// finalizing before returning, so the blob is left uncommitted.
func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
	vlog.VI(2).Infof("sync: CreateBlob: begin")
	defer vlog.VI(2).Infof("sync: CreateBlob: end")

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return wire.NullBlobRef, err
	}

	// Creating a blob requires Write access on the Database.
	requiredTags := []access.Tag{access.Write}
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, requiredTags, sd.db.St()); err != nil {
		return wire.NullBlobRef, err
	}

	// Open a writer on this Syncbase's blob store; the blob's name is read
	// back from the writer.
	w, err := sd.sync.(*syncService).bst.NewBlobWriter(ctx, "")
	if err != nil {
		return wire.NullBlobRef, err
	}
	defer w.CloseWithoutFinalize()

	name := w.Name()
	vlog.VI(4).Infof("sync: CreateBlob: blob ref %s", name)
	return wire.BlobRef(name), nil
}
// PutBlob appends every block received on the call's stream to the blob br.
// The database must exist and the caller must pass the Write-tag permission
// check on it. The writer is closed without finalizing, so the blob remains
// uncommitted. Returns the first append error, or else the stream's error.
func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
	vlog.VI(2).Infof("sync: PutBlob: begin br %v", br)
	defer vlog.VI(2).Infof("sync: PutBlob: end br %v", br)

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return err
	}

	// Appending to a blob requires Write access on the Database.
	requiredTags := []access.Tag{access.Write}
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, requiredTags, sd.db.St()); err != nil {
		return err
	}

	// Resume writing the existing blob in this Syncbase's blob store.
	w, err := sd.sync.(*syncService).bst.ResumeBlobWriter(ctx, string(br))
	if err != nil {
		return err
	}
	defer w.CloseWithoutFinalize()

	in := call.RecvStream()
	for in.Advance() {
		block := blob.BlockOrFile{Block: in.Value()}
		if err = w.AppendBytes(block); err != nil {
			return err
		}
	}
	return in.Err()
}
// CommitBlob resumes the writer for blob br and closes it with Close() (rather
// than CloseWithoutFinalize()), returning the result of that Close(). The
// database must exist and the caller must pass the Write-tag permission check
// on it.
func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
	vlog.VI(2).Infof("sync: CommitBlob: begin br %v", br)
	defer vlog.VI(2).Infof("sync: CommitBlob: end br %v", br)

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return err
	}

	// Committing a blob requires Write access on the Database.
	requiredTags := []access.Tag{access.Write}
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, requiredTags, sd.db.St()); err != nil {
		return err
	}

	w, err := sd.sync.(*syncService).bst.ResumeBlobWriter(ctx, string(br))
	if err != nil {
		return err
	}
	return w.Close()
}
// GetBlobSize returns the size reported by a reader opened on blob br in the
// local blob store. The database must exist and the caller must pass the
// permission check against wire.AllDatabaseTags. On any failure the returned
// size is 0.
func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
	vlog.VI(2).Infof("sync: GetBlobSize: begin br %v", br)
	defer vlog.VI(2).Infof("sync: GetBlobSize: end br %v", br)

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return 0, err
	}

	// Check permissions on Database.
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St()); err != nil {
		return 0, err
	}

	// Only the local blob store is consulted.
	r, err := sd.sync.(*syncService).bst.NewBlobReader(ctx, string(br))
	if err != nil {
		return 0, err
	}
	defer r.Close()

	return r.Size(), nil
}
// DeleteBlob is not implemented. After checking that the database exists and
// that the caller passes the permission check against wire.AllDatabaseTags,
// it returns the error from verror.NewErrNotImplemented.
func (sd *syncDatabase) DeleteBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
	err := sd.db.CheckExists(ctx, call)
	if err != nil {
		return err
	}

	// Check permissions on Database.
	_, err = common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St())
	if err != nil {
		return err
	}

	return verror.NewErrNotImplemented(ctx)
}
// GetBlob streams the bytes of blob br, starting at offset, to the caller.
// It first tries the local blob store; if that succeeds, or fails because the
// blob is not committed, that result is returned. Any other local failure
// causes the blob to be fetched from a remote peer via fetchBlobRemote().
// The database must exist and the caller must pass the permission check
// against wire.AllDatabaseTags.
func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
	vlog.VI(2).Infof("sync: GetBlob: begin br %v", br)
	defer vlog.VI(2).Infof("sync: GetBlob: end br %v", br)

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return err
	}

	// Check permissions on Database.
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St()); err != nil {
		return err
	}

	// Serve from the local store when possible.
	ss := sd.sync.(*syncService)
	localErr := getLocalBlob(ctx, call.SendStream(), ss.bst, br, offset)
	switch {
	case localErr == nil:
		return nil
	case verror.ErrorID(localErr) == wire.ErrBlobNotCommitted.ID:
		// The blob exists locally but is still being written; do not
		// look for it elsewhere.
		return localErr
	}

	// Otherwise, look for the blob on other devices.
	return ss.fetchBlobRemote(ctx, br, nil, call, offset)
}
// FetchBlob ensures that blob br is present locally, reporting progress to
// the caller as BlobFetchStatus messages. If a reader can be opened on the
// blob locally, the call reports BlobFetchStateDone when the blob is
// finalized, or returns ErrBlobNotCommitted when it is not. Otherwise it
// reports BlobFetchStatePending and fetches the blob via fetchBlobRemote().
// The priority argument is currently unused. The database must exist and the
// caller must pass the permission check against wire.AllDatabaseTags.
func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
	vlog.VI(2).Infof("sync: FetchBlob: begin br %v", br)
	defer vlog.VI(2).Infof("sync: FetchBlob: end br %v", br)

	if err := sd.db.CheckExists(ctx, call); err != nil {
		return err
	}

	// Check permissions on Database.
	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St()); err != nil {
		return err
	}

	out := call.SendStream()
	ss := sd.sync.(*syncService)

	r, err := ss.bst.NewBlobReader(ctx, string(br))
	if err != nil {
		// The blob is not available locally; wait for this blob's turn.
		// TODO(hpucha): Implement a blob queue.
		out.Send(wire.BlobFetchStatus{State: wire.BlobFetchStatePending})
		return ss.fetchBlobRemote(ctx, br, call, nil, 0)
	}

	// The BlobRef already exists locally.
	committed := r.IsFinalized()
	r.Close()
	if !committed {
		return wire.NewErrBlobNotCommitted(ctx)
	}
	out.Send(wire.BlobFetchStatus{State: wire.BlobFetchStateDone})
	return nil
}
// PinBlob is not implemented. After checking that the database exists and
// that the caller passes the Write-tag permission check, it returns the error
// from verror.NewErrNotImplemented.
func (sd *syncDatabase) PinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
	err := sd.db.CheckExists(ctx, call)
	if err != nil {
		return err
	}

	// Check permissions on Database.
	requiredTags := []access.Tag{access.Write}
	_, err = common.GetPermsWithAuth(ctx, call, sd.db, requiredTags, sd.db.St())
	if err != nil {
		return err
	}

	return verror.NewErrNotImplemented(ctx)
}
// UnpinBlob is not implemented. After checking that the database exists and
// that the caller passes the permission check against wire.AllDatabaseTags,
// it returns the error from verror.NewErrNotImplemented.
func (sd *syncDatabase) UnpinBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
	err := sd.db.CheckExists(ctx, call)
	if err != nil {
		return err
	}

	// Check permissions on Database.
	_, err = common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St())
	if err != nil {
		return err
	}

	return verror.NewErrNotImplemented(ctx)
}
// KeepBlob is not implemented; the rank argument is unused. After checking
// that the database exists and that the caller passes the permission check
// against wire.AllDatabaseTags, it returns the error from
// verror.NewErrNotImplemented.
func (sd *syncDatabase) KeepBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef, rank uint64) error {
	err := sd.db.CheckExists(ctx, call)
	if err != nil {
		return err
	}

	// Check permissions on Database.
	_, err = common.GetPermsWithAuth(ctx, call, sd.db, wire.AllDatabaseTags, sd.db.St())
	if err != nil {
		return err
	}

	return verror.NewErrNotImplemented(ctx)
}
////////////////////////////////////////////////////////////
// RPC for blob fetch between Syncbases.
// FetchBlob is the Syncbase-to-Syncbase blob fetch RPC. It streams the local
// copy of blob br (from offset 0) to the caller, and on success computes how
// many ownership shares, per syncgroup, this syncbase requests that the
// caller take from it.
//
// remoteSgPriorities holds the caller's priority in each syncgroup it reports.
// Shares are offered in a syncgroup only when this syncbase holds shares
// there, has a local priority for it, and that priority is lower than the
// caller's. A caller whose DevType is BlobDevTypeServer is offered all local
// shares for the syncgroup; any other caller is offered one.
//
// The returned map is nil when no shares are offered. A non-nil error is
// returned only when getLocalBlob() fails; failures while reading blob
// metadata or syncgroup priorities merely result in no shares being offered.
func (s *syncService) FetchBlob(ctx *context.T, call interfaces.SyncFetchBlobServerCall, br wire.BlobRef,
	remoteSgPriorities interfaces.SgPriorities) (sharesToTransfer interfaces.BlobSharesBySyncgroup, err error) {

	vlog.VI(2).Infof("sync: FetchBlob: sb-sb begin br %v", br)
	defer vlog.VI(2).Infof("sync: FetchBlob: sb-sb end br %v", br)

	if err = getLocalBlob(ctx, call.SendStream(), s.bst, br, 0); err != nil {
		return nil, err
	}

	// Total the shares this syncbase has in all syncgroups; there is
	// nothing further to do if it has none, or if the metadata is
	// unavailable.
	var metadata blob.BlobMetadata
	if s.bst.GetBlobMetadata(ctx, br, &metadata) != nil {
		return nil, nil
	}
	var heldShares int32
	for _, n := range metadata.OwnerShares {
		heldShares += n
	}
	if heldShares == 0 {
		return nil, nil
	}

	// No need to filter the local priorities explicitly; only those for
	// syncgroups present in remoteSgPriorities are consulted below.
	localPriorities := make(interfaces.SgPriorities)
	if addBlobSyncgroupPriorities(ctx, s.bst, br, localPriorities) != nil {
		return nil, nil
	}

	// At present, only one share per syncgroup is offered, unless the
	// caller is a server, in which case it could take all of them.
	for sgId, remotePriority := range remoteSgPriorities {
		sgShares := metadata.OwnerShares[sgId]
		localPriority, known := localPriorities[sgId]
		if !known || sgShares <= 0 || !sgPriorityLowerThan(&localPriority, &remotePriority) {
			continue
		}
		if sharesToTransfer == nil {
			sharesToTransfer = make(interfaces.BlobSharesBySyncgroup)
		}
		offer := int32(1) // Caller is not a server: give it one share.
		if remotePriority.DevType == wire.BlobDevTypeServer {
			offer = sgShares // Caller is a server in this syncgroup: give it all the shares.
		}
		sharesToTransfer[sgId] = offer
	}
	return sharesToTransfer, nil
}
// HaveBlob reports whether this syncbase holds a finalized copy of blob br.
// If so, size is the blob's size and signpost is left empty. Otherwise size
// is -1 and the blob's Signpost is fetched; when that succeeds it is filtered
// against the caller's remote blessing names before being returned. A missing
// blob alone does not produce an error; err is non-nil only if GetSignpost()
// fails.
func (s *syncService) HaveBlob(ctx *context.T, call rpc.ServerCall,
	br wire.BlobRef) (size int64, signpost interfaces.Signpost, err error) {

	vlog.VI(2).Infof("sync: HaveBlob: begin br %v", br)
	defer vlog.VI(2).Infof("sync: HaveBlob: end br %v", br)

	size = -1
	if r, openErr := s.bst.NewBlobReader(ctx, string(br)); openErr == nil {
		if r.IsFinalized() { // found blob, and it's complete
			size = r.Size()
		}
		r.Close()
	}
	if size != -1 {
		return size, signpost, nil
	}

	// Can't find blob; try to return a signpost instead.
	if err = s.bst.GetSignpost(ctx, br, &signpost); err == nil {
		callerNames, _ := security.RemoteBlessingNames(ctx, call.Security())
		filterSignpost(ctx, callerNames, s, &signpost)
	}
	return size, signpost, err
}
// FetchBlobRecipe is not implemented: it ignores its arguments and returns a
// nil share map together with the error from verror.NewErrNotImplemented.
func (s *syncService) FetchBlobRecipe(ctx *context.T, call interfaces.SyncFetchBlobRecipeServerCall,
br wire.BlobRef, callerName string, remoteSgPriorities interfaces.SgPriorities) (interfaces.BlobSharesBySyncgroup, error) {
return nil, verror.NewErrNotImplemented(ctx)
}
// FetchChunks is not implemented: it returns the error from
// verror.NewErrNotImplemented.
func (s *syncService) FetchChunks(ctx *context.T, call interfaces.SyncFetchChunksServerCall) error {
return verror.NewErrNotImplemented(ctx)
}
// RequestTakeBlob tells the server that client wishes the server to take some
// ownership shares for the blob br.
// It is not implemented: it ignores its arguments and returns the error from
// verror.NewErrNotImplemented.
func (s *syncService) RequestTakeBlob(ctx *context.T, call rpc.ServerCall,
br wire.BlobRef, callerName string, shares interfaces.BlobSharesBySyncgroup) error {
return verror.NewErrNotImplemented(ctx)
}
// AcceptedBlobOwnership tells the server that the caller has accepted
// ownership shares of the blob br, detailed in acceptedSharesBySyncgroup.
//
// For every syncgroup in which this syncbase holds shares and the caller
// accepted some, the local share count is reduced: by all the accepted shares
// if callerName is a server in that syncgroup, otherwise by one. If the local
// total drops to zero, callerName is added to the blob's Signpost.
//
// Returns this syncbase's name, whether it still holds any shares of the blob
// (always false when err is non-nil), and any error from reading or writing
// the blob's metadata.
// TODO(m3b): need to pass mttables?
func (s *syncService) AcceptedBlobOwnership(ctx *context.T, call rpc.ServerCall, br wire.BlobRef, callerName string,
	acceptedSharesBySyncgroup interfaces.BlobSharesBySyncgroup) (serverName string, keepingBlob bool, err error) {

	// TODO(m3b): Perhaps verify that the caller matches the ACL on the
	// syncgroups on which it's accepting ownership shares.
	// TODO(m3b): Add synchronization so that two calls to
	// AcceptedBlobOwnership() or calls assigning ownership in
	// processBlobRefs for the same blob won't overlap. This may cause
	// shares either to be lost or gained accidentally.
	var metadata blob.BlobMetadata
	if err = s.bst.GetBlobMetadata(ctx, br, &metadata); err != nil {
		return s.name, false, err
	}

	// Collect the syncgroups associated with this blob.
	blobSgs := make(sgSet)
	for sgId := range metadata.OwnerShares {
		blobSgs[sgId] = struct{}{}
	}
	// Of those syncgroups, find the ones in which callerName is a server.
	var callerServerSgs sgSet = s.syncgroupsWithServer(ctx, wire.Id{}, callerName, blobSgs)

	// Hand over shares syncgroup by syncgroup, keeping track of how many
	// shares this syncbase retains overall.
	var remaining int32
	changed := false
	for sgId, held := range metadata.OwnerShares {
		taken := acceptedSharesBySyncgroup[sgId]
		if taken <= 0 || held <= 0 {
			remaining += held
			continue
		}
		if _, isServer := callerServerSgs[sgId]; !isServer {
			taken = 1 // callerName is not a server here; give it only one share.
		}
		if taken >= held { // Last share taken.
			held = 0
			delete(metadata.OwnerShares, sgId)
		} else {
			held -= taken
			metadata.OwnerShares[sgId] = held
		}
		changed = true
		remaining += held
	}

	if !changed {
		return s.name, remaining > 0, nil
	}
	if err = s.bst.SetBlobMetadata(ctx, br, &metadata); err != nil {
		return s.name, false, err
	}
	if remaining == 0 {
		// This device successfully reduced its total shares to zero,
		// and may therefore discard the blob. The device that just
		// accepted the shares will keep it, so add that device to the
		// Signpost.
		locData := peerLocationData(len(callerServerSgs) != 0, false)
		sp := interfaces.Signpost{Locations: interfaces.PeerToLocationDataMap{callerName: locData}}
		s.addToBlobSignpost(ctx, br, &sp)
	}
	// TODO(m3b): return mttables, as well as just name of syncbase?
	return s.name, remaining > 0, nil
}
////////////////////////////////////////////////////////////
// Helpers.
// byteStream is the send side of a stream of byte slices. getLocalBlob uses
// it as the destination for the blob bytes it reads.
type byteStream interface {
// Send transmits one chunk of bytes and reports any failure.
Send(item []byte) error
}
// getLocalBlob streams the locally stored blob br to stream, starting at the
// given byte offset and reading in chunks of at most chunkSize bytes.
//
// It returns the error from NewBlobReader() if a reader cannot be opened,
// ErrBlobNotCommitted if the blob is not finalized, or any read error other
// than io.EOF. It returns nil once the blob has been read to its end.
//
// NOTE(review): errors from stream.Send() are ignored, so a failed stream
// does not stop the read loop or surface to the caller. GetBlob() treats
// most errors from this function as "blob not held locally", so confirm the
// desired behaviour there before propagating Send errors.
func getLocalBlob(ctx *context.T, stream byteStream, bst blob.BlobStore, br wire.BlobRef, offset int64) error {
	vlog.VI(4).Infof("sync: getLocalBlob: begin br %v, offset %v", br, offset)
	defer vlog.VI(4).Infof("sync: getLocalBlob: end br %v, offset %v", br, offset)

	r, err := bst.NewBlobReader(ctx, string(br))
	if err != nil {
		return err
	}
	defer r.Close()

	if !r.IsFinalized() {
		return wire.NewErrBlobNotCommitted(ctx)
	}

	chunk := make([]byte, chunkSize)
	for {
		n, readErr := r.ReadAt(chunk, offset)
		atEOF := readErr == io.EOF
		if readErr != nil && !atEOF {
			return readErr
		}
		if n <= 0 {
			return nil
		}
		offset += int64(n)
		stream.Send(chunk[:n])
		if atEOF {
			return nil
		}
	}
}
// fetchBlobRemote locates blob br on another device, downloads it into the
// local blob store, and optionally reports progress and/or forwards the data
// to a client.
//
//   - statusCall, if non-nil, receives BlobFetchStatus updates (Locating,
//     Fetching with running byte counts, and finally Done).
//   - dataCall, if non-nil, receives the blob's bytes as they arrive, skipping
//     the first offset bytes.
//
// After a successful download, this syncbase accepts the ownership shares the
// peer offered for syncgroups whose priorities it sent (at most one share per
// syncgroup unless this device is a server there), tells the peer via
// AcceptedBlobOwnership(), and records the shares in the blob's metadata.
//
// On any download error the local blob is deleted so that it can be fetched
// again, and the error is returned.
func (s *syncService) fetchBlobRemote(ctx *context.T, br wire.BlobRef, statusCall wire.BlobManagerFetchBlobServerCall, dataCall wire.BlobManagerGetBlobServerCall, offset int64) error {
	vlog.VI(4).Infof("sync: fetchBlobRemote: begin br %v, offset %v", br, offset)
	defer vlog.VI(4).Infof("sync: fetchBlobRemote: end br %v, offset %v", br, offset)

	// TODO(m3b): If this is called concurrently on the same blobref, we'll do redundant work.
	// We might also transfer too many ownership shares.

	var sendStatus, sendData bool
	var statusStream interface {
		Send(item wire.BlobFetchStatus) error
	}
	var dataStream interface {
		Send(item []byte) error
	}

	if statusCall != nil {
		sendStatus = true
		statusStream = statusCall.SendStream()
	}
	if dataCall != nil {
		sendData = true
		dataStream = dataCall.SendStream()
	}

	if sendStatus {
		// Start blob source discovery.
		statusStream.Send(wire.BlobFetchStatus{State: wire.BlobFetchStateLocating})
	}

	// Locate blob.
	peer, size, err := s.locateBlob(ctx, br)
	if err != nil {
		return err
	}

	// Start blob fetching.
	status := wire.BlobFetchStatus{State: wire.BlobFetchStateFetching, Total: size}
	if sendStatus {
		statusStream.Send(status)
	}

	bst := s.bst

	bWriter, err := bst.NewBlobWriter(ctx, string(br))
	if err != nil {
		return err
	}

	// Get the syncgroup priorities for the blob that the peer is permitted
	// to know about.
	sgPriorities := make(interfaces.SgPriorities)
	var signpost interfaces.Signpost
	var blessingNames []string
	if s.bst.GetSignpost(ctx, br, &signpost) == nil {
		blessingNames, err = getPeerBlessingsForFetchBlob(ctx, peer)
		if err == nil {
			filterSignpost(ctx, blessingNames, s, &signpost)
			addSyncgroupPriorities(ctx, s.bst, signpost.SgIds, sgPriorities)
		}
	}

	c := interfaces.SyncClient(peer)
	ctxPeer, cancel := context.WithRootCancel(ctx)
	// Make sure the derived context is released on every path, including
	// the one where c.FetchBlob() itself fails; the explicit cancel()
	// calls below only cover the paths where a stream was obtained.
	// Calling cancel more than once is harmless.
	defer cancel()

	// Run FetchBlob(), but checking that the peer has at least the
	// blessing names used above to generate the syncgroup priorities.
	stream, err := c.FetchBlob(ctxPeer, br, sgPriorities,
		options.ServerAuthorizer{namesAuthorizer{expNames: blessingNames}})
	if err == nil {
		peerStream := stream.RecvStream()
		for peerStream.Advance() {
			item := blob.BlockOrFile{Block: peerStream.Value()}
			if err = bWriter.AppendBytes(item); err != nil {
				break
			}
			curSize := int64(len(item.Block))
			status.Received += curSize
			if sendStatus {
				statusStream.Send(status)
			}
			if sendData {
				// Skip the first "offset" bytes of the blob before
				// forwarding data to the client.
				if curSize <= offset {
					offset -= curSize
				} else if offset != 0 {
					dataStream.Send(item.Block[offset:])
					offset = 0
				} else {
					dataStream.Send(item.Block)
				}
			}
		}

		if err != nil {
			// Local write failed: abandon the RPC.
			cancel()
			stream.Finish()
		} else {
			err = peerStream.Err()
			remoteSharesBySgId, terr := stream.Finish()
			if err == nil {
				err = terr
			}
			if err == nil {
				// We successfully fetched the blob. Maybe
				// take ownership in one or more syncgroups.
				takingOwnership := make(interfaces.BlobSharesBySyncgroup)
				for sgId, shares := range remoteSharesBySgId {
					myPriority, havePriority := sgPriorities[sgId]
					if shares > 0 && havePriority {
						if myPriority.DevType != wire.BlobDevTypeServer && shares > 1 {
							// Non server devices never accept more than one share.
							shares = 1
						}
						takingOwnership[sgId] = shares
					}
				}

				// If we are accepting ownership shares, tell the peer.
				if len(takingOwnership) != 0 {
					// Don't worry if the following call fails; its
					// safe for this syncbase to treat
					// itself as an owner even if the peer
					// has not relinquished ownership.
					// TODO(m3b): With the current code, a peer accepts blob ownership only if
					// it retrieves the blob. This may mean that a laptop accepts some of
					// the shares for an image from a camera, but if the laptop keeps a copy
					// despite passing its shares to a server, it may never come back to
					// accept the last share, forcing the camera to keep the blob forever.
					// Among the possible fixes:
					// a) accept all ownership shares (undesirable, to protect against
					//    loss of device accepting shares), or
					// b) somehow (in signposts?) communicate to peers when the blob has
					//    reached a "server" so that they may unilaterally drop their shares, or
					// c) (most likely) sometimes accept shares when we have none even for
					//    blobs we already have, triggered perhaps via the RequestTakeBlob() call.
					var peerName string
					var peerKeepingBlob bool
					peerName, peerKeepingBlob, _ = c.AcceptedBlobOwnership(ctx, br, s.name, takingOwnership)

					// NOTE(review): the error from GetBlobMetadata() is
					// ignored, and the += below would panic if
					// OwnerShares were a nil map afterwards — confirm
					// that GetBlobMetadata() always leaves it non-nil.
					var blobMetadata blob.BlobMetadata
					s.bst.GetBlobMetadata(ctx, br, &blobMetadata)

					for sgId, shares := range takingOwnership {
						blobMetadata.OwnerShares[sgId] += shares
					}
					s.bst.SetBlobMetadata(ctx, br, &blobMetadata)

					// Remove peer from local signpost if it's not keeping blob.
					if !peerKeepingBlob {
						var sp interfaces.Signpost
						if s.bst.GetSignpost(ctx, br, &sp) == nil {
							delete(sp.Locations, peerName)
							s.bst.SetSignpost(ctx, br, &sp)
						}
					}
				}
			}
			cancel()
		}
	}

	// NOTE(review): the error from Close() is ignored, so a failure at
	// this point is still reported to the caller as success.
	bWriter.Close()
	if err != nil {
		// Clean up the blob with failed download, so that it can be
		// downloaded again. Ignore any error from deletion.
		bst.DeleteBlob(ctx, string(br))
	} else {
		status := wire.BlobFetchStatus{State: wire.BlobFetchStateDone}
		if sendStatus {
			statusStream.Send(status)
		}
	}
	return err
}
// getPeerBlessingsForFetchBlob returns the blessing names presented by the
// given peer. It learns them by starting a "FetchBlob" call on the peer with
// an empty BlobRef and empty priorities, reading the call's remote blessings,
// and then finishing the call (whose result is ignored). If the call cannot
// be started, it returns nil names and the start error.
func getPeerBlessingsForFetchBlob(ctx *context.T, peer string) (blessingNames []string, err error) {
	nullArgs := []interface{}{wire.BlobRef(""), interfaces.SgPriorities{}}
	probe, startErr := v23.GetClient(ctx).StartCall(ctx, peer, "FetchBlob", nullArgs)
	if startErr != nil {
		return nil, startErr
	}
	names, _ := probe.RemoteBlessings()
	probe.Finish()
	return names, nil
}
// filterSignpost edits signpost in place, removing information that must not
// be given to an endpoint holding blessings blessingNames.
//
// Every syncgroup in every local database is examined. For each one that
// appears in signpost.SgIds:
//   - the syncgroup id is removed from the Signpost if the syncgroup is
//     private and blessingNames are not authorized for the Read tag in its
//     Perms;
//   - a location hint is marked to be kept if it is flagged IsServer in the
//     Signpost, or if the peer is a joiner of the syncgroup and either the
//     syncgroup is being revealed or the joiner's BlobDevType is
//     BlobDevTypeServer.
//
// Finally, every location that was not marked to be kept is deleted.
func filterSignpost(ctx *context.T, blessingNames []string, s *syncService, signpost *interfaces.Signpost) {
	retained := make(map[string]bool) // Location hints to keep.

	s.forEachDatabaseStore(ctx, func(dbId wire.Id, st *watchable.Store) bool {
		// Scan this database's syncgroup entries using a snapshot.
		snap := st.NewSnapshot()
		defer snap.Abort()

		forEachSyncgroup(snap, func(gid interfaces.GroupId, sg *interfaces.Syncgroup) bool {
			if _, hinted := signpost.SgIds[gid]; !hinted {
				return false // Not a hinted syncgroup; keep scanning.
			}

			// Decide whether this hinted syncgroup may be revealed.
			revealSg := !sg.Spec.IsPrivate || isAuthorizedForTag(sg.Spec.Perms, access.Read, blessingNames)
			if !revealSg {
				delete(signpost.SgIds, gid)
			}

			// Decide which hinted locations this syncgroup lets us keep.
			for peer, locData := range signpost.Locations {
				if locData.IsServer {
					retained[peer] = true
					continue
				}
				member, joined := sg.Joiners[peer]
				if joined && (revealSg || member.MemberInfo.BlobDevType == byte(wire.BlobDevTypeServer)) {
					retained[peer] = true
				}
			}
			return false // Keep scanning syncgroups.
		})
		return false // Keep scanning databases.
	})

	// Drop every location hint that no syncgroup allowed us to keep.
	for peer := range signpost.Locations {
		if retained[peer] {
			continue
		}
		delete(signpost.Locations, peer)
	}
}
// addBlobSyncgroupPriorities looks up blob br's Signpost and adds to sgPriMap
// the syncgroup priorities for the syncgroups listed in it, by delegating to
// addSyncgroupPriorities(). It returns the error from GetSignpost() if the
// Signpost cannot be read, and otherwise the result of the delegate.
// No filtering is applied here; that is left to the calling routine.
func addBlobSyncgroupPriorities(ctx *context.T, bst blob.BlobStore, br wire.BlobRef, sgPriMap interfaces.SgPriorities) error {
	var sp interfaces.Signpost
	if err := bst.GetSignpost(ctx, br, &sp); err != nil {
		return err
	}
	return addSyncgroupPriorities(ctx, bst, sp.SgIds, sgPriMap)
}
// A peerAndLocData is a pair (peer, interfaces.LocationData),
// which represents one entry of an interfaces.PeerToLocationDataMap.
type peerAndLocData struct {
// peer is the map key: the name of the peer the location data describes.
peer string
// locData is the map value stored for that peer.
locData interfaces.LocationData
}
// A peerAndLocDataVector is a slice of peerAndLocData.
// It holds the entries of a Signpost's location map in a sortable form, so
// that the list can be sorted and then pruned.
type peerAndLocDataVector []peerAndLocData
// The following methods implement sort.Interface for peerAndLocDataVector.
// It's used to keep the top few values for each Signpost: after sorting, the
// entries most worth keeping come first.

// Len returns the number of entries in v.
func (v peerAndLocDataVector) Len() int { return len(v) }

// Swap exchanges entries i and j of v.
func (v peerAndLocDataVector) Swap(i int, j int) { v[i], v[j] = v[j], v[i] }

// Less reports whether entry i should be kept in preference to entry j.
// Entries are compared lexicographically: servers before non-servers, then
// proxies before non-proxies, then later WhenSeen timestamps before earlier
// ones.
//
// Each criterion is consulted only when the preceding ones tie. This keeps
// the ordering consistent: Less(i, j) and Less(j, i) are never both true,
// which would otherwise be possible when, say, entry j is a server and entry
// i is a more recently seen non-server.
func (v peerAndLocDataVector) Less(i int, j int) bool {
	a, b := &v[i].locData, &v[j].locData
	if a.IsServer != b.IsServer { // Prefer to keep servers.
		return a.IsServer
	}
	if a.IsProxy != b.IsProxy { // Prefer to keep proxies.
		return a.IsProxy
	}
	// Prefer to keep entries with later timestamps.
	return b.WhenSeen.Before(a.WhenSeen)
}
// mergeSignposts merges data from a source Signpost into a target Signpost.
//
// Every syncgroup id in the source is added to the target. A source location
// is copied into the target when the target has no entry for that peer, or
// when the target's entry has an earlier WhenSeen time than the source's.
// If the target then holds more than maxLocationsInSignpost locations, the
// locations are sorted as a peerAndLocDataVector and all but the first
// maxLocationsInSignpost are deleted.
//
// Panics if either of the target's maps is nil.
func mergeSignposts(targetSp *interfaces.Signpost, sourceSp *interfaces.Signpost) {
	// Target's maps exist because GetSignpost() ensures they do.
	if targetSp.Locations == nil || targetSp.SgIds == nil {
		panic("mergeSignposts called with targetSp with nil map")
	}

	// Merge the syncgroup ids.
	for id := range sourceSp.SgIds {
		targetSp.SgIds[id] = struct{}{}
	}

	// Merge the location hints, preferring the more recently seen entry.
	for peer, incoming := range sourceSp.Locations {
		existing, present := targetSp.Locations[peer]
		if !present || existing.WhenSeen.Before(incoming.WhenSeen) {
			targetSp.Locations[peer] = incoming
		}
	}

	// Nothing more to do unless the target has too many locations.
	if len(targetSp.Locations) <= maxLocationsInSignpost {
		return
	}

	// Rank the locations and discard those beyond the limit.
	ranked := make(peerAndLocDataVector, 0, len(targetSp.Locations))
	for peer, locData := range targetSp.Locations {
		ranked = append(ranked, peerAndLocData{peer: peer, locData: locData})
	}
	sort.Sort(ranked) // Ordered by peerAndLocDataVector.Less().
	for _, surplus := range ranked[maxLocationsInSignpost:] {
		delete(targetSp.Locations, surplus.peer)
	}
}
// TODO(hpucha): Add syncgroup driven blob discovery.
//
// locateBlob searches for a peer that holds blob br. It starts from the
// locations recorded in the blob's local Signpost, asks each candidate peer
// (via every mount table known for it) whether it has the blob, and extends
// the candidate list with any new locations the peers report. At most
// maxLocationsInSignpost candidates are visited.
//
// On success it returns the absolute name of the peer endpoint that has the
// blob and the size that peer reported. If the local Signpost cannot be read
// that error is returned; if no peer has the blob an ErrInternal error is
// returned. If hints from peers were merged into the Signpost along the way,
// the updated Signpost is written back before returning (best effort: a
// failure to write it back is ignored).
func (s *syncService) locateBlob(ctx *context.T, br wire.BlobRef) (string, int64, error) {
	vlog.VI(4).Infof("sync: locateBlob: begin br %v", br)
	defer vlog.VI(4).Infof("sync: locateBlob: end br %v", br)

	var sp interfaces.Signpost
	if err := s.bst.GetSignpost(ctx, br, &sp); err != nil {
		return "", 0, err
	}
	var updatedSp bool // whether "sp" has been updated since being fetched

	// Search for blob amongst the Locations in the Signpost.
	// Move list of peers into a slice so that we can extend iteration as
	// more hints are found. Never look at more than maxLocationsInSignpost hints.
	locationList := make(peerAndLocDataVector, 0, len(sp.Locations))
	locationMap := make(map[string]bool, len(sp.Locations)) // Set of peers in locationList.
	for p, locData := range sp.Locations {
		locationList = append(locationList, peerAndLocData{peer: p, locData: locData})
		locationMap[p] = true
	}

	for i := 0; i != len(locationList) && i != maxLocationsInSignpost; i++ {
		p := locationList[i].peer
		vlog.VI(4).Infof("sync: locateBlob: attempting %s", p)
		// Get the mount tables for this peer.
		mtTables, err := s.getMountTables(ctx, p)
		if err != nil {
			continue
		}
		for mt := range mtTables {
			absName := naming.Join(mt, p, common.SyncbaseSuffix)
			c := interfaces.SyncClient(absName)
			size, remoteSp, err := c.HaveBlob(ctx, br)
			if err != nil {
				// The call failed, so size and remoteSp carry no
				// information. In particular a zero size must not be
				// mistaken for "peer has an empty blob".
				vlog.VI(4).Infof("sync: locateBlob: HaveBlob on %s failed: %v", absName, err)
				continue
			}
			if size >= 0 {
				if updatedSp {
					s.bst.SetSignpost(ctx, br, &sp) // best effort
				}
				vlog.VI(4).Infof("sync: locateBlob: found blob on %s", absName)
				return absName, size, nil
			}
			// No size, but remoteSp is valid.
			// Add new locations to locationList so
			// subsequent iterations use them.
			for newPeer, locData := range remoteSp.Locations {
				if !locationMap[newPeer] {
					locationList = append(locationList, peerAndLocData{peer: newPeer, locData: locData})
					locationMap[newPeer] = true
				}
			}
			// Re-rank the locations not yet visited so that the loop
			// picks the most preferred one next.
			sort.Sort(locationList[i+1:])
			mergeSignposts(&sp, &remoteSp)
			updatedSp = true
		}
	}

	if updatedSp {
		s.bst.SetSignpost(ctx, br, &sp) // best effort
	}
	return "", 0, verror.New(verror.ErrInternal, ctx, "blob not found")
}
// getMountTables returns the mtTables field of the member info that
// copyMemberInfo yields for peer. The error result is always nil.
//
// NOTE(review): if copyMemberInfo can return nil (e.g. for a peer this
// device does not know), the field access below would panic -- confirm
// against copyMemberInfo and its callers.
func (s *syncService) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
	return s.copyMemberInfo(ctx, peer).mtTables, nil
}
// addToBlobSignpost folds the information in sp into the Signpost stored for
// blob br. If a Signpost is already stored, sp is merged into it and the
// result is written back; if the stored Signpost cannot be read, sp itself is
// written. The returned error is that of the final SetSignpost call.
func (s *syncService) addToBlobSignpost(ctx *context.T, br wire.BlobRef, sp *interfaces.Signpost) (err error) {
	var stored interfaces.Signpost
	if getErr := s.bst.GetSignpost(ctx, br, &stored); getErr != nil {
		// Nothing usable is stored yet: write the new Signpost as is.
		return s.bst.SetSignpost(ctx, br, sp)
	}
	mergeSignposts(&stored, sp)
	return s.bst.SetSignpost(ctx, br, &stored)
}
// syncgroupsWithServer returns the subset of the syncgroups in sgs for which
// "peer" is recorded as a server (blob device type wire.BlobDevTypeServer).
// If dbId is the zero value, the membership records of peer for all
// databases are consulted; otherwise only the record for dbId is. The result
// is always a non-nil sgSet, empty if peer is unknown. The membership table
// is read while holding s.allMembersLock.
func (s *syncService) syncgroupsWithServer(ctx *context.T, dbId wire.Id, peer string, sgs sgSet) sgSet {
	result := make(sgSet)

	s.allMembersLock.Lock()
	defer s.allMembersLock.Unlock()

	info := s.allMembers.members[peer]
	if info == nil {
		return result // peer is not a known member
	}

	// Default to all dbId entries; narrow to one entry if dbId was specified.
	var perDb map[wire.Id]sgMember = info.db2sg
	if dbId != (wire.Id{}) {
		perDb = map[wire.Id]sgMember{dbId: info.db2sg[dbId]}
	}

	for _, membership := range perDb {
		for gid := range sgs {
			devType := int32(membership[gid].MemberInfo.BlobDevType)
			if devType == wire.BlobDevTypeServer {
				result[gid] = struct{}{}
			}
		}
	}
	return result
}
// peerLocationData builds a LocationData for a peer seen just now: WhenSeen
// is set to the current time, and IsServer and IsProxy are taken from the
// arguments.
func peerLocationData(isServer bool, isProxy bool) interfaces.LocationData {
	var ld interfaces.LocationData
	ld.WhenSeen = time.Now()
	ld.IsServer = isServer
	ld.IsProxy = isProxy
	return ld
}
// processBlobRefs decodes the VOM-encoded value in rawValue and extracts from
// it all blob refs. For each of these blob refs, it updates the blob's
// Signpost to record the sync peer, the source peer, and the syncgroups whose
// prefixes cover the object's key. If this device created the data, it also
// assigns initial ownership shares for blobs that are present locally.
//
// isCreator indicates whether the current device is the likely initial creator of the blob.
// allSgPrefixes contains all the syncgroups in the db that the current device is aware of.
// sharedSgPrefixes contains only those shared with a peer device that provided this data;
// it is nil if the data was created locally.
//
// A nil rawValue, a value that cannot be decoded, or a value with no blob
// refs is not an error: the routine simply returns nil. Errors from writing
// a Signpost or blob metadata are returned.
func (s *syncService) processBlobRefs(ctx *context.T, dbId wire.Id, st store.StoreReader, peer string, isCreator bool,
	allSgPrefixes map[string]sgSet, sharedSgPrefixes map[string]sgSet, m *interfaces.LogRecMetadata, rawValue *vom.RawBytes) error {

	objid := m.ObjId
	srcPeer := syncbaseIdToName(m.Id)

	vlog.VI(4).Infof("sync: processBlobRefs: begin: objid %s, peer %s, src %s", objid, peer, srcPeer)
	defer vlog.VI(4).Infof("sync: processBlobRefs: end: objid %s, peer %s, src %s", objid, peer, srcPeer)

	if rawValue == nil {
		return nil
	}

	var decoded *vdl.Value
	if err := rawValue.ToValue(&decoded); err != nil {
		// If we cannot decode the value, ignore blob processing and
		// continue. This is fine since all stored values need not be
		// vom encoded.
		return nil
	}

	blobRefs := extractBlobRefs(decoded)
	if len(blobRefs) == 0 {
		return nil // no BlobRefs => nothing to do
	}

	// The key (objid) starts with one of the store's reserved prefixes for
	// managed namespaces. Remove that prefix to be able to compare it with
	// the syncgroup prefixes which are defined by the application.
	appKey := common.StripFirstKeyPartOrDie(objid)

	// covering returns the set of syncgroups, among those in prefixes,
	// whose prefix covers appKey.
	covering := func(prefixes map[string]sgSet) sgSet {
		matched := make(sgSet)
		for prefix, groups := range prefixes {
			if !strings.HasPrefix(appKey, prefix) {
				continue
			}
			for gid := range groups {
				matched[gid] = struct{}{}
			}
		}
		return matched
	}

	// Determine the set of syncgroups that cover this application key, both
	// locally and shared with the peer.
	allSgIds := covering(allSgPrefixes)
	sharedSgIds := covering(sharedSgPrefixes)

	// Associate the blob metadata with each blob ref. A fresh Signpost,
	// with its own copy of the syncgroup set, is built for each blob ref.
	for br := range blobRefs {
		vlog.VI(4).Infof("sync: processBlobRefs: found blobref %v, sgs %v", br, allSgIds)

		sp := interfaces.Signpost{Locations: make(interfaces.PeerToLocationDataMap), SgIds: make(sgSet)}

		// Record the sync peer, and the source peer if it is different.
		peerSyncgroups := s.syncgroupsWithServer(ctx, dbId, peer, allSgIds)
		sp.Locations[peer] = peerLocationData(len(peerSyncgroups) != 0, false /*not proxy*/)
		var srcPeerSyncgroups sgSet
		if peer != srcPeer {
			srcPeerSyncgroups = s.syncgroupsWithServer(ctx, dbId, srcPeer, allSgIds)
			sp.Locations[srcPeer] = peerLocationData(len(srcPeerSyncgroups) != 0, false /*not proxy*/)
		}

		for gid := range allSgIds {
			sp.SgIds[gid] = struct{}{}
		}

		// Decide whether this device is a plausible proxy for the blob.
		plausibleProxy := false
		if isCreator {
			// This must read the stored Signpost before it is updated
			// by addToBlobSignpost below.
			var curSp interfaces.Signpost
			if err := s.bst.GetSignpost(ctx, br, &curSp); err == nil && len(curSp.SgIds) > 0 {
				// BlobRef is known to be associated with some syncgroups already.
				for gid := range sp.SgIds {
					if _, known := curSp.SgIds[gid]; !known {
						// BlobRef is being added to at least one syncgroup
						// different from those previously known. So this
						// device is a plausible proxy.
						plausibleProxy = true
						break
					}
				}
			}
		} else {
			// BlobRef was received via syncing; if this device puts it
			// in more syncgroups than the peer, it's a plausible proxy.
			plausibleProxy = len(sharedSgIds) < len(allSgIds)
		}

		if plausibleProxy {
			// Reuse an already computed server set when this device is
			// one of the peers handled above.
			var selfSyncgroups sgSet
			switch s.name {
			case peer:
				selfSyncgroups = peerSyncgroups
			case srcPeer:
				selfSyncgroups = srcPeerSyncgroups
			default:
				selfSyncgroups = s.syncgroupsWithServer(ctx, dbId, s.name, allSgIds)
			}
			sp.Locations[s.name] = peerLocationData(len(selfSyncgroups) != 0, true /*proxy*/)
		}

		if err := s.addToBlobSignpost(ctx, br, &sp); err != nil {
			return err
		}
	}

	if !isCreator {
		return nil
	}

	// This device put this BlobRef in the syncgroups; assign ownership shares.
	for br := range blobRefs {
		reader, _ := s.bst.NewBlobReader(ctx, string(br))
		if reader == nil {
			continue // blob is not present on device
		}
		reader.Close()

		var bmd blob.BlobMetadata
		changed := false
		// TODO(m3b): put a lock around the GetBlobMetadata() and the subsequent SetBlobMetadata
		// to avoid losing or gaining ownership shares via concurrent updates.
		if s.bst.GetBlobMetadata(ctx, br, &bmd) != nil { // no BlobMetadata yet available
			bmd.Referenced = time.Now()
			bmd.Accessed = bmd.Referenced
			changed = true
		}

		// Set the initial ownership shares for each syncgroup for which it's not yet set.
		// NOTE(review): presumably GetBlobMetadata leaves bmd.OwnerShares
		// non-nil even when it fails; otherwise the assignment below
		// would panic -- confirm.
		for gid := range allSgIds {
			if _, isPresent := bmd.OwnerShares[gid]; isPresent {
				continue
			}
			bmd.OwnerShares[gid] = initialBlobOwnerShares
			changed = true
		}

		if !changed {
			continue
		}
		if err := s.bst.SetBlobMetadata(ctx, br, &bmd); err != nil {
			return err
		}
	}
	return nil
}
// extractBlobRefs walks the VDL value val and returns the set of blob refs
// found in it. The result is always a non-nil map; it is empty if val holds
// no blob refs.
func extractBlobRefs(val *vdl.Value) map[wire.BlobRef]struct{} {
	found := map[wire.BlobRef]struct{}{}
	// The "may contain" result only matters to the recursion itself.
	_ = extractBlobRefsInternal(val, found)
	return found
}
// extractBlobRefsInternal traverses a VDL value recursively and extracts blob
// refs from it. The blob refs are accumulated in the given map of blob refs.
//
// The function returns true if the data may contain blob refs, which means it
// either contains blob refs or contains dynamic data types (VDL union or any)
// which may in some instances contain blob refs. Otherwise the function
// returns false to indicate that the data definitely cannot contain blob refs.
// Callers use a false result to stop scanning the remaining elements of a
// homogeneous collection.
//
// An empty array, list, map or set yields true: with no elements to inspect,
// nothing can be concluded about what sibling values of the same type hold.
// Returning false there would let one element with an empty collection
// (e.g. a struct whose blob ref list is empty) stop the scan of an enclosing
// list and hide blob refs held by later elements.
func extractBlobRefsInternal(val *vdl.Value, brs map[wire.BlobRef]struct{}) bool {
	if val == nil {
		return false
	}

	mayContain := false
	switch val.Kind() {
	case vdl.String:
		// Could be a BlobRef.
		var br wire.BlobRef
		if val.Type() == vdl.TypeOf(br) {
			mayContain = true
			if b := wire.BlobRef(val.RawString()); b != wire.NullBlobRef {
				brs[b] = struct{}{}
			}
		}

	case vdl.Struct:
		// Fields have differing types, so every field must be examined.
		for i := 0; i < val.Type().NumField(); i++ {
			if extractBlobRefsInternal(val.StructField(i), brs) {
				mayContain = true
			}
		}

	case vdl.Array, vdl.List:
		n := val.Len()
		if n == 0 {
			// Nothing to inspect: cannot rule blob refs out.
			mayContain = true
		}
		for i := 0; i < n; i++ {
			if !extractBlobRefsInternal(val.Index(i), brs) {
				// Look no further, no blob refs in the rest.
				break
			}
			mayContain = true
		}

	case vdl.Map:
		keys := val.Keys()
		if len(keys) == 0 {
			// Nothing to inspect: cannot rule blob refs out.
			mayContain = true
		}
		lookInKey, lookInVal := true, true
		for _, k := range keys {
			if lookInKey {
				if extractBlobRefsInternal(k, brs) {
					mayContain = true
				} else {
					// No need to look in the keys anymore.
					lookInKey = false
				}
			}
			if lookInVal {
				if extractBlobRefsInternal(val.MapIndex(k), brs) {
					mayContain = true
				} else {
					// No need to look in values anymore.
					lookInVal = false
				}
			}
			if !lookInKey && !lookInVal {
				// Look no further, no blob refs in the rest.
				break
			}
		}

	case vdl.Set:
		keys := val.Keys()
		if len(keys) == 0 {
			// Nothing to inspect: cannot rule blob refs out.
			mayContain = true
		}
		for _, k := range keys {
			if !extractBlobRefsInternal(k, brs) {
				// Look no further, no blob refs in the rest.
				break
			}
			mayContain = true
		}

	case vdl.Union:
		// The active field differs between values, so always report
		// that blob refs are possible.
		_, field := val.UnionField()
		extractBlobRefsInternal(field, brs)
		mayContain = true

	case vdl.Any, vdl.Optional:
		// The element may be absent or of any type, so always report
		// that blob refs are possible.
		extractBlobRefsInternal(val.Elem(), brs)
		mayContain = true
	}
	return mayContain
}