// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Example code for transferring a blob from one device to another.  The
// simulate* constants below select the scenario exercised: a simple full
// transfer, resumption of a partial transfer, or a transfer in which the
// sender's and receiver's blobs partially overlap.
package localblobstore_test
import "bytes"
import "io"
import "io/ioutil"
import "math/rand"
import "os"
import "testing"
import "v.io/v23/context"
import "v.io/v23/verror"
import "v.io/x/ref/services/syncbase/localblobstore"
import "v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore"
import "v.io/x/ref/services/syncbase/store"
import "v.io/x/ref/test"
import _ "v.io/x/ref/runtime/factories/generic"

// These constants select the type of transfer simulated by blobTransfer().
const (
	simulateSimpleTransfer = 0 // the receiver has none of the blob.
	simulateResumption     = 1 // the receiver has an unfinalized prefix of the blob.
	simulatePartialOverlap = 2 // the receiver has some of the blob, but is missing a prefix and a suffix.
)

// createBlobStore() returns a new BlobStore, and the name of the directory
// used to implement it.
func createBlobStore(ctx *context.T) (bs localblobstore.BlobStore, dirName string) {
var err error
if dirName, err = ioutil.TempDir("", "localblobstore_transfer_test"); err != nil {
panic(err)
}
if bs, err = fs_cablobstore.Create(ctx, store.EngineForTest, dirName); err != nil {
panic(err)
}
return bs, dirName
}

// createBlob writes to bs a blob consisting of "count" 32KiB blocks drawn
// from a deterministic but arbitrary random stream, starting "offset" blocks
// into that stream.  Because the stream is deterministic, blobs created with
// overlapping (offset, count) ranges share identical blocks; this is what
// lets the tests simulate partially overlapping blobs on sender and receiver.
// It returns the blob's name, which is "blob" if that is non-empty, and is
// chosen arbitrarily otherwise.  The blob is finalized iff "complete" is true.
func createBlob(ctx *context.T, bs localblobstore.BlobStore, blob string, complete bool, offset int, count int) string {
var bw localblobstore.BlobWriter
var err error
if bw, err = bs.NewBlobWriter(ctx, blob); err != nil {
panic(err)
}
blob = bw.Name()
var buffer [32 * 1024]byte
block := localblobstore.BlockOrFile{Block: buffer[:]}
r := rand.New(rand.NewSource(1)) // Always seed with 1 for repeatability.
for i := 0; i != offset+count; i++ {
for b := 0; b != len(buffer); b++ {
buffer[b] = byte(r.Int31n(256))
}
if i >= offset {
if err = bw.AppendBytes(block); err != nil {
panic(err)
}
}
}
if complete {
err = bw.Close()
} else {
err = bw.CloseWithoutFinalize()
}
if err != nil {
panic(err)
}
return blob
}

// A channelChunkStream turns a channel of chunk hashes into a ChunkStream.
type channelChunkStream struct {
channel <-chan []byte
ok bool
value []byte
}

// newChannelChunkStream returns a ChunkStream, given a channel containing the
// relevant chunk hashes.
func newChannelChunkStream(ch <-chan []byte) localblobstore.ChunkStream {
return &channelChunkStream{channel: ch, ok: true}
}

// The following are the standard ChunkStream methods.
func (cs *channelChunkStream) Advance() bool {
	if cs.ok {
		cs.value, cs.ok = <-cs.channel
	}
	return cs.ok
}

func (cs *channelChunkStream) Value(buf []byte) []byte { return cs.value }

func (cs *channelChunkStream) Err() error { return nil }

func (cs *channelChunkStream) Cancel() {}

// blobTransfer() demonstrates how to transfer a blob incrementally from one
// device's blob store to another.  In this code, the communication between
// sender and receiver is modelled with Go channels.
// The "simulate" argument determines which bytes the receiver already has
// before the transfer begins; see the constants simulateSimpleTransfer,
// simulateResumption, and simulatePartialOverlap above.
// The number of chunks transferred is returned.
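//
// The message flow, in outline (the step numbers match the numbered comments
// in the code below):
//
//   1. sender   -> receiver: the blob's name, size, and checksum
//   2. receiver:             checks what it already holds locally
//   3. receiver -> sender:   whether any chunks are needed
//   4. sender   -> receiver: the hashes of all the blob's chunks
//   5. receiver:             turns the hashes into a recipe
//   6. receiver -> sender:   the hashes of the chunks it lacks
//   7. sender   -> receiver: the data for the requested chunks
//   8. receiver:             splices the chunk data into the recipe
//   9. receiver:             writes the blob using the full recipe
//  10. receiver:             verifies the blob's size and checksum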
func blobTransfer(ctx *context.T, simulate int) int {
// ----------------------------------------------
	// The names of the channels that carry messages between sender and
	// receiver end in ToSender or ToReceiver, according to the direction
	// of flow.
type blobData struct {
name string
size int64
checksum []byte
}
	blobDataToReceiver := make(chan blobData)  // carries the blob's basic data to the receiver
	needChunksToSender := make(chan bool)      // tells the sender whether the receiver needs any chunks
	chunkHashesToReceiver := make(chan []byte) // for the initial transfer of chunk hashes
	chunkHashesToSender := make(chan []byte)   // reports which chunks the receiver needs
	chunksToReceiver := make(chan []byte)      // carries the needed chunk data to the receiver
	sDone := make(chan bool)                   // closed when the sender is done
	rDone := make(chan int)                    // carries the number of chunks transferred, then closed when the receiver is done
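	// Closing a channel marks the end of its message stream; each goroutine
	// below closes the channels it writes to once it has sent everything.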
// ----------------------------------------------
// The sender.
go func(ctx *context.T,
blobDataToReceiver chan<- blobData,
needChunksToSender <-chan bool,
chunkHashesToReceiver chan<- []byte,
chunkHashesToSender <-chan []byte,
chunksToReceiver chan<- []byte,
done chan<- bool) {
defer close(done)
var err error
bsS, bsSDir := createBlobStore(ctx)
defer os.RemoveAll(bsSDir)
		blob := createBlob(ctx, bsS, "", true, 0, 32) // Create a 1MiB blob (32 blocks of 32KiB) at the sender.
// 1. Send basic blob data to receiver.
var br localblobstore.BlobReader
if br, err = bsS.NewBlobReader(ctx, blob); err != nil {
panic(err)
}
blobDataToReceiver <- blobData{name: blob, size: br.Size(), checksum: br.Hash()}
br.Close()
close(blobDataToReceiver)
// 3. Get indication from receiver of whether it needs blob.
needChunks := <-needChunksToSender
if !needChunks { // Receiver has blob; done.
return
}
// 4. Send the chunk hashes to the receiver. This proceeds concurrently
// with the step below.
go func(ctx *context.T, blob string, chunkHashesToReceiver chan<- []byte) {
cs := bsS.BlobChunkStream(ctx, blob)
for cs.Advance() {
chunkHashesToReceiver <- cs.Value(nil)
}
if cs.Err() != nil {
panic(cs.Err())
}
close(chunkHashesToReceiver)
}(ctx, blob, chunkHashesToReceiver)
// 7. Get needed chunk hashes from receiver, find the relevant
// data, and send it back to the receiver.
var cbr localblobstore.BlobReader // Cached read handle on most-recent-read blob, or nil
// Given chunk hash h from chunkHashesToSender, send chunk to chunksToReceiver.
for h := range chunkHashesToSender {
loc, err := bsS.LookupChunk(ctx, h)
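			// Open a reader on the blob holding the chunk, reusing the
			// cached reader where possible.  If the blob cannot be opened
			// (perhaps because it was partially deleted), garbage-collect
			// and retry the lookup, since the chunk may still be present
			// in another blob.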
for err == nil && (cbr == nil || cbr.Name() != loc.BlobName) {
if cbr != nil && cbr.Name() != loc.BlobName {
cbr.Close()
cbr = nil
}
if cbr == nil {
if cbr, err = bsS.NewBlobReader(ctx, loc.BlobName); err != nil {
bsS.GC(ctx) // A partially-deleted blob may be confusing things.
loc, err = bsS.LookupChunk(ctx, h)
}
}
}
var i int = 1
var n int64
buffer := make([]byte, loc.Size) // buffer for current chunk
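			// Read the chunk in full; ReadAt may return fewer bytes than
			// asked for, so loop until the whole chunk has been read.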
for n = int64(0); n != loc.Size && i != 0 && err == nil; n += int64(i) {
if i, err = cbr.ReadAt(buffer[n:loc.Size], n+loc.Offset); err == io.EOF {
err = nil // EOF is expected
}
}
if n == loc.Size { // Got chunk.
chunksToReceiver <- buffer[:loc.Size]
}
if err != nil {
break
}
}
close(chunksToReceiver)
if cbr != nil {
cbr.Close()
}
}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, sDone)
// ----------------------------------------------
// The receiver.
go func(ctx *context.T,
blobDataToReceiver <-chan blobData,
needChunksToSender chan<- bool,
chunkHashesToReceiver <-chan []byte,
chunkHashesToSender chan<- []byte,
chunksToReceiver <-chan []byte,
done chan<- int) {
defer close(done)
var err error
bsR, bsRDir := createBlobStore(ctx)
defer os.RemoveAll(bsRDir)
// 2. Receive basic blob data from sender.
blobInfo := <-blobDataToReceiver
switch simulate {
case simulateSimpleTransfer:
// Nothing to do.
case simulateResumption:
// Write a fraction of the (unfinalized) blob on the receiving side
// to check that the transfer process can resume a partial blob.
createBlob(ctx, bsR, blobInfo.name, false, 0, 10)
case simulatePartialOverlap:
// Write a blob with a different name that contains some of the same data.
// The prefix and suffix of the full blob will have to be transferred.
createBlob(ctx, bsR, "", true, 1, 10)
}
		// 3. Tell sender whether the receiver already has the complete
// blob.
needChunks := true
var br localblobstore.BlobReader
if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err == nil {
if br.IsFinalized() {
				if !bytes.Equal(br.Hash(), blobInfo.checksum) {
panic("receiver has a finalized blob with same name but different hash")
}
needChunks = false // The receiver already has the blob.
}
br.Close()
}
needChunksToSender <- needChunks
close(needChunksToSender)
if !needChunks { // Receiver has blob; done.
return
}
// 5. Receive the chunk hashes from the sender, and turn them
// into a recipe.
cs := newChannelChunkStream(chunkHashesToReceiver)
rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
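		// Each RecipeStep either names a chunk that must be fetched from
		// the sender (step.Chunk != nil) or refers to a byte range within
		// a blob the receiver already holds (step.Blob, step.Offset,
		// step.Size).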
// 6. The following thread sends the chunk hashes that the
// receiver does not have to the sender. It also makes
// a duplicate of the stream on the channel rsCopy. The
// buffering in rsCopy allows the receiver to put several
// chunks into a fragment.
rsCopy := make(chan localblobstore.RecipeStep, 100) // A buffered copy of the rs stream.
go func(ctx *context.T, rs localblobstore.RecipeStream, rsCopy chan<- localblobstore.RecipeStep, chunkHashesToSender chan<- []byte) {
for rs.Advance() {
step := rs.Value()
if step.Chunk != nil { // Data must be fetched from sender.
chunkHashesToSender <- step.Chunk
}
rsCopy <- step
}
close(chunkHashesToSender)
close(rsCopy)
}(ctx, rs, rsCopy, chunkHashesToSender)
// 8. The following thread splices the chunks from the sender
// (on chunksToReceiver) into the recipe stream copy
// (rsCopy) to generate a full recipe stream (rsFull) in
// which chunks are actual data, rather than just hashes.
rsFull := make(chan localblobstore.RecipeStep) // A recipe stream containing chunk data, not just hashes.
go func(ctx *context.T, rsCopy <-chan localblobstore.RecipeStep, chunksToReceiver <-chan []byte, rsFull chan<- localblobstore.RecipeStep) {
var ok bool
for step := range rsCopy {
if step.Chunk != nil { // Data must be fetched from sender.
if step.Chunk, ok = <-chunksToReceiver; !ok {
break
}
}
rsFull <- step
}
close(rsFull)
}(ctx, rsCopy, chunksToReceiver, rsFull)
// 9. Write the blob using the recipe.
var chunksTransferred int
		const fragmentThreshold = 1024 * 1024 // Try to write on-disk fragments at least this big.
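		// ignoreBytes counts bytes at the front of the recipe stream that a
		// resumed partial blob already contains; those bytes are skipped
		// rather than rewritten.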
var ignoreBytes int64
var bw localblobstore.BlobWriter
if bw, err = bsR.ResumeBlobWriter(ctx, blobInfo.name); err != nil {
bw, err = bsR.NewBlobWriter(ctx, blobInfo.name)
} else {
ignoreBytes = bw.Size()
}
if err == nil {
var fragment []localblobstore.BlockOrFile
var fragmentSize int64
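			// Chunks received from the sender are accumulated in "fragment"
			// and written together once fragmentSize exceeds
			// fragmentThreshold, so that each on-disk write is reasonably
			// large.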
for step := range rsFull {
if step.Chunk == nil { // Data can be obtained from local blob.
if ignoreBytes >= step.Size { // Ignore chunks we already have.
ignoreBytes -= step.Size
} else {
if err == nil && len(fragment) != 0 {
err = bw.AppendBytes(fragment...)
fragment = fragment[:0]
fragmentSize = 0
}
err = bw.AppendBlob(step.Blob, step.Size-ignoreBytes, step.Offset+ignoreBytes)
ignoreBytes = 0
}
} else if ignoreBytes >= int64(len(step.Chunk)) { // Ignore chunks we already have.
ignoreBytes -= int64(len(step.Chunk))
				} else { // Data is from a chunk sent by the sender.
chunksTransferred++
fragment = append(fragment, localblobstore.BlockOrFile{Block: step.Chunk[ignoreBytes:]})
fragmentSize += int64(len(step.Chunk)) - ignoreBytes
ignoreBytes = 0
if fragmentSize > fragmentThreshold {
err = bw.AppendBytes(fragment...)
fragment = fragment[:0]
fragmentSize = 0
}
}
if err != nil {
break
}
}
if err == nil && len(fragment) != 0 {
err = bw.AppendBytes(fragment...)
}
if err2 := bw.Close(); err == nil {
err = err2
}
if err != nil {
panic(verror.DebugString(err))
}
}
// 10. Verify that the blob was written correctly.
if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err != nil {
panic(err)
}
if br.Size() != blobInfo.size {
panic("transferred blob has wrong size")
}
		if !bytes.Equal(br.Hash(), blobInfo.checksum) {
panic("transferred blob has wrong checksum")
}
if err = br.Close(); err != nil {
panic(err)
}
done <- chunksTransferred
}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, rDone)
// ----------------------------------------------
// Wait for sender and receiver to finish, and return the number of chunks the receiver transferred.
	<-sDone
return <-rDone
}

func TestSimpleTransfer(t *testing.T) {
	ctx, shutdown := test.V23Init()
	defer shutdown()
	const expectedChunksTransferred = 936
	chunksTransferred := blobTransfer(ctx, simulateSimpleTransfer)
if chunksTransferred != expectedChunksTransferred {
t.Errorf("simple transfer, expected %d chunks transferred, got %d", expectedChunksTransferred, chunksTransferred)
}
}

func TestResumption(t *testing.T) {
	ctx, shutdown := test.V23Init()
	defer shutdown()
	const expectedChunksTransferred = 635
	chunksTransferred := blobTransfer(ctx, simulateResumption)
if chunksTransferred != expectedChunksTransferred {
t.Errorf("resumption transfer, expected %d chunks transferred, got %d", expectedChunksTransferred, chunksTransferred)
}
}

func TestPartialOverlap(t *testing.T) {
	ctx, shutdown := test.V23Init()
	defer shutdown()
	const expectedChunksTransferred = 647
	chunksTransferred := blobTransfer(ctx, simulatePartialOverlap)
if chunksTransferred != expectedChunksTransferred {
t.Errorf("partial overlap transfer, expected %d chunks transferred, got %d", expectedChunksTransferred, chunksTransferred)
}
}