Merge "syncbase/vsync: * Integration test for bi-directional syncing. * Made client/servers independent principals in integration tests."
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker.go b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
index 84abf6f..cb07533 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
@@ -30,8 +30,17 @@
// http://www.hpl.hp.com/techreports/2005/HPL-2005-30R1.pdf
import "io"
+import "sync"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/crc64window"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+// pkgPath is the prefix for the verror IDs registered by this package.
+const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+
+var (
+ // errStreamCancelled is what Stream.Err() (and hence PosStream.Err())
+ // reports once Cancel() has been called, unless a read error other than
+ // io.EOF had already been recorded.
+ errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
+)
// A Param contains the parameters for chunking.
//
@@ -69,10 +78,13 @@
// stream.
type Stream struct {
param Param // chunking parameters
+ ctx *context.T // context of creator
window *crc64window.Window // sliding window for computing the hash
buf []byte // buffer of data
rd io.Reader // source of data
err error // error from rd
+ mu sync.Mutex // protects cancelled
+ cancelled bool // whether the stream has been cancelled
bufferChunks bool // whether to buffer entire chunks
// Invariant: bufStart <= chunkStart <= chunkEnd <= bufEnd
bufStart int64 // offset in rd of first byte in buf[]
@@ -86,9 +98,10 @@
// newStream() returns a pointer to a new Stream instance, with the
// parameters in *param. This internal version of NewStream() allows the caller
// to specify via bufferChunks whether entire chunks should be buffered.
-func newStream(param *Param, rd io.Reader, bufferChunks bool) *Stream {
+func newStream(ctx *context.T, param *Param, rd io.Reader, bufferChunks bool) *Stream {
s := new(Stream)
s.param = *param
+ s.ctx = ctx
s.window = crc64window.New(crc64window.ECMA, s.param.WindowWidth)
bufSize := int64(8192)
if bufferChunks {
@@ -107,8 +120,16 @@
// NewStream() returns a pointer to a new Stream instance, with the
// parameters in *param.
-func NewStream(param *Param, rd io.Reader) *Stream {
- return newStream(param, rd, true)
+func NewStream(ctx *context.T, param *Param, rd io.Reader) *Stream {
+ return newStream(ctx, param, rd, true)
+}
+
+// isCancelled() returns whether s.Cancel() has been called.
+func (s *Stream) isCancelled() (cancelled bool) {
+ s.mu.Lock()
+ cancelled = s.cancelled
+ s.mu.Unlock()
+ return cancelled
}
// Advance() stages the next chunk so that it may be retrieved via Value().
@@ -132,7 +153,7 @@
s.bufStart = s.chunkEnd
}
// Fill buffer with data, unless error/EOF.
- for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) {
+ for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) && !s.isCancelled() {
var n int
n, s.err = s.rd.Read(s.buf[s.bufEnd-s.bufStart:])
s.bufEnd += int64(n)
@@ -148,7 +169,7 @@
// While not end of chunk...
for s.windowEnd != maxChunk &&
(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
- (s.windowEnd != s.bufEnd || s.err == nil) {
+ (s.windowEnd != s.bufEnd || s.err == nil) && !s.isCancelled() {
// Fill the buffer if empty, and there's more data to read.
if s.windowEnd == s.bufEnd && s.err == nil {
@@ -167,7 +188,10 @@
bufLimit = s.bufEnd
}
// Advance window until both MinChunk reached and primary boundary found.
+// Cancellation is deliberately not polled in this per-byte loop: doing so
+// would lock and unlock s.mu (via isCancelled()) for every input byte. The
+// enclosing loop calls s.isCancelled() before each pass over the buffered
+// data, and Advance() checks it again before returning, so Cancel() still
+// makes Advance() return false.
for s.windowEnd != bufLimit && (s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) {
// Advance the window by one byte.
s.hash = s.window.Advance(s.buf[s.windowEnd-s.bufStart])
s.windowEnd++
@@ -185,7 +209,7 @@
s.chunkEnd = s.windowEnd
}
- return s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
+ return !s.isCancelled() && s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
}
// Value() returns the chunk that was staged by Advance(). May panic if
@@ -196,12 +220,26 @@
// Err() returns any error encountered by Advance(). Never blocks.
func (s *Stream) Err() (err error) {
+ s.mu.Lock()
+ if s.cancelled && (s.err == nil || s.err == io.EOF) {
+ s.err = verror.New(errStreamCancelled, s.ctx)
+ }
+ s.mu.Unlock()
if s.err != io.EOF { // Do not consider EOF to be an error.
err = s.err
}
return err
}
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// An Advance() already in progress may also return false, since Advance()
+// re-checks for cancellation before returning.
+// After Cancel(), Err() reports errStreamCancelled unless a read error other
+// than io.EOF had already been recorded.
+// Never blocks. May be called concurrently with other method calls on s.
+func (s *Stream) Cancel() {
+ s.mu.Lock()
+ s.cancelled = true
+ s.mu.Unlock()
+}
+
// ----------------------------------
// A PosStream is just like a Stream, except that the Value() method returns only
@@ -214,9 +252,9 @@
// NewPosStream() returns a pointer to a new PosStream instance, with the
// parameters in *param.
-func NewPosStream(param *Param, rd io.Reader) *PosStream {
+func NewPosStream(ctx *context.T, param *Param, rd io.Reader) *PosStream {
ps := new(PosStream)
- ps.s = newStream(param, rd, false)
+ ps.s = newStream(ctx, param, rd, false)
return ps
}
@@ -237,3 +275,10 @@
func (ps *PosStream) Err() error {
return ps.s.Err()
}
+
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// It simply cancels the underlying Stream, so the same rules apply: after
+// Cancel(), Err() reports a cancellation error unless a read error other than
+// io.EOF had already been recorded.
+// Never blocks. May be called concurrently with other method calls on ps.
+func (ps *PosStream) Cancel() {
+ ps.s.Cancel()
+}
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
index 6de3304..57c6fed 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
@@ -9,68 +9,28 @@
import "crypto/md5"
import "fmt"
import "io"
-import "math/rand"
import "testing"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
-
-// A RandReader contains a pointer to a rand.Read, and a size limit. Its
-// pointers implement the Read() method from io.Reader, which yields bytes
-// obtained from the random number generator.
-type RandReader struct {
- rand *rand.Rand // Source of random bytes.
- pos int // Number of bytes read.
- limit int // Max number of bytes that may be read.
- insertInterval int // If non-zero, number of bytes between insertions of zero bytes.
- eofErr error // error to be returned at the end of the stream
-}
-
-// NewRandReader() returns a new RandReader with the specified seed and size limit.
-// It yields eofErr when the end of the stream is reached.
-// If insertInterval is non-zero, a zero byte is inserted into the stream every
-// insertInterval bytes, before resuming getting bytes from the random number
-// generator.
-func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
- r := new(RandReader)
- r.rand = rand.New(rand.NewSource(seed))
- r.limit = limit
- r.insertInterval = insertInterval
- r.eofErr = eofErr
- return r
-}
-
-// Read() implements the io.Reader Read() method for *RandReader.
-func (r *RandReader) Read(buf []byte) (n int, err error) {
- // Generate bytes up to the end of the stream, or the end of the buffer.
- max := r.limit - r.pos
- if len(buf) < max {
- max = len(buf)
- }
- for ; n != max; n++ {
- if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
- buf[n] = byte(r.rand.Int31n(256))
- } else {
- buf[n] = 0
- }
- r.pos++
- }
- if r.pos == r.limit {
- err = r.eofErr
- }
- return n, err
-}
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
// TestChunksPartitionStream() tests that the chunker partitions its input
// stream into reasonable sized chunks, which when concatenated form the
// original stream.
func TestChunksPartitionStream(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
var err error
totalLength := 1024 * 1024
// Compute the md5 of an arbiotrary stream. We will later compare this
// with the md5 of the concanenation of chunks from an equivalent
// stream.
- r := NewRandReader(1, totalLength, 0, io.EOF)
+ r := localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
hStream := md5.New()
buf := make([]byte, 8192)
for err == nil {
@@ -81,12 +41,12 @@
checksumStream := hStream.Sum(nil)
// Using an equivalent stream, break it into chunks.
- r = NewRandReader(1, totalLength, 0, io.EOF)
+ r = localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
param := &chunker.DefaultParam
hChunked := md5.New()
length := 0
- s := chunker.NewStream(param, r)
+ s := chunker.NewStream(ctx, param, r)
for s.Advance() {
chunk := s.Value()
length += len(chunk)
@@ -116,10 +76,15 @@
// TestPosStream() tests that a PosStream leads to the same chunks as an Stream.
func TestPosStream(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
totalLength := 1024 * 1024
- s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
- ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
+ s := chunker.NewStream(ctx, &chunker.DefaultParam,
+ localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+ ps := chunker.NewPosStream(ctx, &chunker.DefaultParam,
+ localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
itReady := s.Advance()
pitReady := ps.Advance()
@@ -150,8 +115,8 @@
// chunkSums() returns a vector of md5 checksums for the chunks of the
// specified Reader, using the default chunking parameters.
-func chunkSums(r io.Reader) (sums [][md5.Size]byte) {
- s := chunker.NewStream(&chunker.DefaultParam, r)
+func chunkSums(ctx *context.T, r io.Reader) (sums [][md5.Size]byte) {
+ s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
for s.Advance() {
sums = append(sums, md5.Sum(s.Value()))
}
@@ -161,14 +126,17 @@
// TestInsertions() tests the how chunk sequences differ when bytes are
// periodically inserted into a stream.
func TestInsertions(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
totalLength := 1024 * 1024
insertionInterval := 20 * 1024
bytesInserted := totalLength / insertionInterval
// Get the md5 sums of the chunks of two similar streams, where the
// second has an extra bytes every 20k bytes.
- sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF))
- sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF))
+ sums0 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+ sums1 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, insertionInterval, io.EOF))
// Iterate over chunks of second stream, counting which are in common
// with first stream. We expect to find common chunks within 10 of the
@@ -208,10 +176,13 @@
// TestError() tests the behaviour of a chunker when given an error by its
// reader.
func TestError(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
notEOF := fmt.Errorf("not EOF")
totalLength := 50 * 1024
- r := NewRandReader(1, totalLength, 0, notEOF)
- s := chunker.NewStream(&chunker.DefaultParam, r)
+ r := localblobstore_testlib.NewRandReader(1, totalLength, 0, notEOF)
+ s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
length := 0
for s.Advance() {
chunk := s.Value()
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
index f08b796..a13cf9f 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
@@ -8,6 +8,7 @@
package chunkmap
import "encoding/binary"
+import "sync"
import "v.io/syncbase/x/ref/services/syncbase/store"
import "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
@@ -75,7 +76,7 @@
// A Location describes chunk's location within a blob.
type Location struct {
- Blob []byte // ID of blob
+ BlobID []byte // ID of blob
Offset int64 // byte offset of chunk within blob
Size int64 // size of chunk
}
@@ -104,8 +105,8 @@
// associated with the specified Location.
func (cm *ChunkMap) AssociateChunkWithLocation(ctx *context.T, chunk []byte, loc Location) (err error) {
// Check of expected lengths explicitly in routines that modify the database.
- if len(loc.Blob) != blobIDLen {
- err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.Blob), blobIDLen)
+ if len(loc.BlobID) != blobIDLen {
+ err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.BlobID), blobIDLen)
} else if len(chunk) != chunkHashLen {
err = verror.New(errBadChunkHashLen, ctx, cm.dir, len(chunk), chunkHashLen)
} else {
@@ -115,7 +116,7 @@
// Put the blob-to-chunk entry first, since it's used
// to garbage collect the other.
keyLen := copy(key[:], blobPrefix)
- keyLen += copy(key[keyLen:], loc.Blob)
+ keyLen += copy(key[keyLen:], loc.BlobID)
binary.BigEndian.PutUint64(key[keyLen:], uint64(loc.Offset))
keyLen += offsetLen
@@ -126,7 +127,7 @@
if err == nil {
keyLen = copy(key[:], chunkPrefix)
keyLen += copy(key[keyLen:], chunk)
- keyLen += copy(key[keyLen:], loc.Blob)
+ keyLen += copy(key[keyLen:], loc.BlobID)
valLen = binary.PutVarint(val[:], loc.Offset)
valLen += binary.PutVarint(val[valLen:], loc.Size)
@@ -202,12 +203,12 @@
// been deleted, the client should remove the blob from the ChunkMap using
-// DeleteBlob(loc.Blob), and try again. (The client may also wish to
+// DeleteBlob(ctx, loc.BlobID), and try again. (The client may also wish to
// arrange at some point to call GC() on the blob store.)
-func (cm *ChunkMap) LookupChunk(ctx *context.T, chunk []byte) (loc Location, err error) {
+func (cm *ChunkMap) LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error) {
var start [maxKeyLen]byte
var limit [maxKeyLen]byte
startLen := copy(start[:], chunkPrefix)
- startLen += copy(start[startLen:], chunk)
+ startLen += copy(start[startLen:], chunkHash)
limitLen := copy(limit[:], start[:startLen])
limitLen += copy(limit[limitLen:], blobLimit)
@@ -220,13 +221,13 @@
var n int
key := s.Key(keyBuf[:])
value := s.Value(valBuf[:])
- loc.Blob = key[len(chunkPrefix)+chunkHashLen:]
+ loc.BlobID = key[len(chunkPrefix)+chunkHashLen:]
loc.Offset, n = binary.Varint(value)
if n > 0 {
loc.Size, n = binary.Varint(value[n:])
}
if n <= 0 {
- err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunk, key, value)
+ err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunkHash, key, value)
}
s.Cancel()
} else {
@@ -234,23 +235,23 @@
err = s.Err()
}
if err == nil {
- err = verror.New(errNoSuchChunk, ctx, cm.dir, chunk)
+ err = verror.New(errNoSuchChunk, ctx, cm.dir, chunkHash)
}
}
return loc, err
}
-// A BlobStream allows the client to iterate over the chunks in a blob:
-// bs := cm.NewBlobStream(ctx, blob)
-// for bs.Advance() {
-// chunkHash := bs.Value()
+// A ChunkStream allows the client to iterate over the chunks in a blob:
+// cs := cm.NewChunkStream(ctx, blob)
+// for cs.Advance() {
+// chunkHash := cs.Value(nil)
// ...process chunkHash...
// }
-// if bs.Err() != nil {
+// if cs.Err() != nil {
// ...there was an error...
// }
-type BlobStream struct {
+type ChunkStream struct {
cm *ChunkMap
ctx *context.T
stream store.Stream
@@ -264,9 +265,9 @@
more bool // whether stream may be consulted again
}
-// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// NewChunkStream() returns a pointer to a new ChunkStream that allows the client
// to enumerate the chunk hashes in a blob, in order.
-func (cm *ChunkMap) NewBlobStream(ctx *context.T, blob []byte) *BlobStream {
+func (cm *ChunkMap) NewChunkStream(ctx *context.T, blob []byte) *ChunkStream {
var start [maxKeyLen]byte
var limit [maxKeyLen]byte
@@ -276,13 +277,13 @@
limitLen := copy(limit[:], start[:startLen])
limitLen += copy(limit[limitLen:], offsetLimit)
- bs := new(BlobStream)
- bs.cm = cm
- bs.ctx = ctx
- bs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
- bs.more = true
+ cs := new(ChunkStream)
+ cs.cm = cm
+ cs.ctx = ctx
+ cs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
+ cs.more = true
- return bs
+ return cs
}
// Advance() stages an element so the client can retrieve the chunk hash with
@@ -291,27 +292,27 @@
// Value() or Location() The client must call Cancel if it does not iterate
// through all elements (i.e. until Advance() returns false). Advance() may
// block if an element is not immediately available.
-func (bs *BlobStream) Advance() (ok bool) {
- if bs.more && bs.err == nil {
- if !bs.stream.Advance() {
- bs.err = bs.stream.Err()
- bs.more = false // no more stream, even if no error
+func (cs *ChunkStream) Advance() (ok bool) {
+ if cs.more && cs.err == nil {
+ if !cs.stream.Advance() {
+ cs.err = cs.stream.Err()
+ cs.more = false // no more stream, even if no error
} else {
- bs.key = bs.stream.Key(bs.keyBuf[:])
- bs.value = bs.stream.Value(bs.valBuf[:])
- ok = (len(bs.value) >= chunkHashLen) &&
- (len(bs.key) == len(blobPrefix)+blobIDLen+offsetLen)
+ cs.key = cs.stream.Key(cs.keyBuf[:])
+ cs.value = cs.stream.Value(cs.valBuf[:])
+ ok = (len(cs.value) >= chunkHashLen) &&
+ (len(cs.key) == len(blobPrefix)+blobIDLen+offsetLen)
if ok {
var n int
- bs.loc.Blob = make([]byte, blobIDLen)
- copy(bs.loc.Blob, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
- bs.loc.Offset = int64(binary.BigEndian.Uint64(bs.key[len(blobPrefix)+blobIDLen:]))
- bs.loc.Size, n = binary.Varint(bs.value[chunkHashLen:])
+ cs.loc.BlobID = make([]byte, blobIDLen)
+ copy(cs.loc.BlobID, cs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+ cs.loc.Offset = int64(binary.BigEndian.Uint64(cs.key[len(blobPrefix)+blobIDLen:]))
+ cs.loc.Size, n = binary.Varint(cs.value[chunkHashLen:])
ok = (n > 0)
}
if !ok {
- bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, bs.value)
- bs.stream.Cancel()
+ cs.err = verror.New(errMalformedBlobEntry, cs.ctx, cs.cm.dir, cs.key, cs.value)
+ cs.stream.Cancel()
}
}
}
@@ -323,19 +324,141 @@
// enough to hold the entire value. Otherwise, a newly allocated slice will be
// returned. It is valid to pass a nil buf. Value() may panic if Advance()
// returned false or was not called at all. Value() does not block.
-func (bs *BlobStream) Value(buf []byte) (result []byte) {
+func (cs *ChunkStream) Value(buf []byte) (result []byte) {
if len(buf) < chunkHashLen {
buf = make([]byte, chunkHashLen)
}
- copy(buf, bs.value[:chunkHashLen])
+ copy(buf, cs.value[:chunkHashLen])
return buf[:chunkHashLen]
}
// Location() returns the Location associated with the chunk staged by
// Advance(). Location() may panic if Advance() returned false or was not
// called at all. Location() does not block.
-func (bs *BlobStream) Location() Location {
- return bs.loc
+func (cs *ChunkStream) Location() Location {
+ return cs.loc
+}
+
+// Err() returns a non-nil error iff the stream encountered any errors. Err()
+// does not block.
+func (cs *ChunkStream) Err() error {
+ return cs.err
+}
+
+// Cancel() notifies the stream provider that it can stop producing elements.
+// The client must call Cancel() if it does not iterate through all elements
+// (i.e. until Advance() returns false). Cancel() is idempotent and can be
+// called concurrently with a goroutine that is iterating via Advance() and
+// Value(). Cancel() causes Advance() to subsequently return false.
+// Cancel() does not block.
+func (cs *ChunkStream) Cancel() {
+ cs.stream.Cancel()
+}
+
+// A BlobStream allows the client to iterate over the blobs in ChunkMap:
+// bs := cm.NewBlobStream(ctx)
+// for bs.Advance() {
+// blobID := bs.Value(nil)
+// ...process blobID...
+// }
+// if bs.Err() != nil {
+// ...there was an error...
+// }
+//
+// Each call to Advance() performs its own store.Scan() starting just past the
+// previously returned blob ID, and takes at most one element from it, so no
+// store.Stream is held open between calls.
+type BlobStream struct {
+ cm *ChunkMap
+ ctx *context.T
+
+ key []byte // key for current element; when non-empty, a sub-slice of keyBuf
+ keyBuf [maxKeyLen]byte // buffer for keys; Advance() increments the blob ID in place here
+ err error // error encountered.
+ mu sync.Mutex // protects "more", which may be written in Cancel()
+ more bool // whether stream may be consulted again
+}
+
+// keyLimit is the key for limit in store.Scan() calls within a BlobStream.
+var keyLimit []byte
+
+func init() {
+ // The limit key is the maximum length key, all ones after the blobPrefix.
+ keyLimit = make([]byte, maxKeyLen)
+ for i := copy(keyLimit, blobPrefix); i != len(keyLimit); i++ {
+ keyLimit[i] = 0xff
+ }
+}
+
+// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// to enumerate the blobs in the ChunkMap, in lexicographic order of blob ID.
+func (cm *ChunkMap) NewBlobStream(ctx *context.T) *BlobStream {
+ bs := new(BlobStream)
+ bs.cm = cm
+ bs.ctx = ctx
+ bs.more = true
+ return bs
+}
+
+// Advance() stages an element so the client can retrieve the next blob ID with
+// Value(). Advance() returns true iff there is an element to retrieve. The
+// client must call Advance() before calling Value(). The client must call
+// Cancel if it does not iterate through all elements (i.e. until Advance()
+// returns false). Advance() may block if an element is not immediately
+// available.
+func (bs *BlobStream) Advance() (ok bool) {
+ bs.mu.Lock()
+ ok = bs.more
+ bs.mu.Unlock()
+ if ok {
+ prefixAndKeyLen := len(blobPrefix) + blobIDLen
+ // Compute the next key to search for.
+ if len(bs.key) == 0 { // First time through: anything starting with blobPrefix.
+ n := copy(bs.keyBuf[:], blobPrefix)
+ bs.key = bs.keyBuf[:n]
+ } else {
+ // Increment the blobID to form the next possible key; this
+ // skips all remaining (blobPrefix, blobID, offset) keys of the
+ // current blob.
+ i := prefixAndKeyLen - 1
+ for ; i != len(blobPrefix)-1 && bs.keyBuf[i] == 0xff; i-- {
+ bs.keyBuf[i] = 0
+ }
+ if i == len(blobPrefix)-1 { // End of database
+ ok = false
+ } else {
+ bs.keyBuf[i]++
+ }
+ bs.key = bs.keyBuf[:prefixAndKeyLen]
+ }
+ if ok {
+ stream := bs.cm.st.Scan(bs.key, keyLimit)
+ if !stream.Advance() {
+ bs.err = stream.Err()
+ ok = false // no more stream, even if no error
+ } else {
+ // The increment above operates on bs.keyBuf, so the key
+ // found must live there. Copy it in explicitly rather than
+ // relying on stream.Key() returning a sub-slice of its
+ // argument (the buf-argument convention documented on
+ // Value() below permits a fresh allocation instead).
+ n := copy(bs.keyBuf[:], stream.Key(bs.keyBuf[:]))
+ bs.key = bs.keyBuf[:n]
+ if len(bs.key) < prefixAndKeyLen {
+ bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, stream.Value(nil))
+ ok = false
+ }
+ stream.Cancel() // We get at most one element from each stream.
+ }
+ }
+ if !ok {
+ bs.mu.Lock()
+ bs.more = false
+ bs.mu.Unlock()
+ }
+ }
+ return ok
+}
+
+// Value() returns the blob ID staged by Advance(). The returned slice may be
+// a sub-slice of buf if buf is large enough to hold the entire value.
+// Otherwise, a newly allocated slice will be returned. It is valid to pass a
+// nil buf. Value() may panic if Advance() returned false or was not called at
+// all. Value() does not block.
+func (bs *BlobStream) Value(buf []byte) (result []byte) {
+ if len(buf) < blobIDLen {
+ buf = make([]byte, blobIDLen)
+ }
+ copy(buf, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+ return buf[:blobIDLen]
}
// Err() returns a non-nil error iff the stream encountered any errors. Err()
@@ -351,5 +474,7 @@
// Value(). Cancel() causes Advance() to subsequently return false.
// Cancel() does not block.
func (bs *BlobStream) Cancel() {
- bs.stream.Cancel()
+ bs.mu.Lock()
+ bs.more = false
+ bs.mu.Unlock()
}
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
index ab961a7..b7ab2df 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
@@ -9,12 +9,12 @@
import "io/ioutil"
import "math/rand"
import "os"
+import "runtime"
import "testing"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
import "v.io/v23/context"
-// import "v.io/v23/verror"
import "v.io/x/ref/test"
import _ "v.io/x/ref/runtime/factories/generic"
@@ -27,78 +27,125 @@
return v
}
-// verifyNoBlob() tests that blob b[blobi] is not present in *cm.
-// callSite is a callsite identifier, output in all error messages.
-func verifyNoBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, callSite int) {
- bs := cm.NewBlobStream(ctx, b[blobi])
- for i := 0; bs.Advance(); i++ {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: %v",
- callSite, blobi, i, bs.Value(nil))
+// verifyBlobs() tests that the blobs in *cm are those in b[], as revealed via
+// the BlobStream() interface. Errors are reported with the caller's line number.
+func verifyBlobs(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, b [][]byte) {
+ _, _, callerLine, _ := runtime.Caller(1)
+ seen := make([]bool, len(b)) // seen[i] == whether b[i] seen in *cm
+ bs := cm.NewBlobStream(ctx)
+ var i int
+ for i = 0; bs.Advance(); i++ {
+ blob := bs.Value(nil)
+ var j int
+ for j = 0; j != len(b) && bytes.Compare(b[j], blob) != 0; j++ {
+ }
+ if j == len(b) {
+ t.Errorf("chunkmap_test: line %d: unexpected blob %v present in ChunkMap",
+ callerLine, blob)
+ } else if seen[j] {
+ t.Errorf("chunkmap_test: line %d: blob %v seen twice in ChunkMap",
+ callerLine, blob)
+ } else {
+ seen[j] = true
+ }
+ }
+ if i != len(b) {
+ t.Errorf("chunkmap_test: line %d: found %d blobs in ChunkMap, but expected %d",
+ callerLine, i, len(b))
+ }
+ for j := range seen {
+ if !seen[j] {
+ t.Errorf("chunkmap_test: line %d: blob %v not seen in ChunkMap",
+ callerLine, b[j])
+ }
}
if bs.Err() != nil {
- t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance: unexpected error %v",
- callSite, blobi, bs.Err())
+ t.Errorf("chunkmap_test: line %d: BlobStream.Advance: unexpected error %v",
+ callerLine, bs.Err())
}
}
-// verifyBlob() tests that blob b[blobi] in *cm contains the expected chunks from c[].
-// Each blob is expected to have 8 chunks, 0...7, except that b[1] has c[8] instead of c[4] for chunk 4.
-// callSite is a callsite identifier, output in all error messages.
-func verifyBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte, callSite int) {
+// verifyNoChunksInBlob() tests that blob b[blobi] has no chunks in *cm, as
+// revealed by the ChunkStream interface.
+func verifyNoChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte) {
+ _, _, callerLine, _ := runtime.Caller(1)
+ cs := cm.NewChunkStream(ctx, b[blobi])
+ for i := 0; cs.Advance(); i++ {
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: %v",
+ callerLine, blobi, i, cs.Value(nil))
+ }
+ if cs.Err() != nil {
+ t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance: unexpected error %v",
+ callerLine, blobi, cs.Err())
+ }
+}
+
+// verifyChunksInBlob() tests that blob b[blobi] in *cm contains the expected
+// chunks from c[]. Each blob is expected to have 8 chunks, 0...7, except that
+// b[1] has c[8] instead of c[4] for chunk 4.
+func verifyChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte) {
+ _, _, callerLine, _ := runtime.Caller(1)
var err error
var i int
- bs := cm.NewBlobStream(ctx, b[blobi])
- for i = 0; bs.Advance(); i++ {
- chunk := bs.Value(nil)
+ cs := cm.NewChunkStream(ctx, b[blobi])
+ for i = 0; cs.Advance(); i++ {
+ chunk := cs.Value(nil)
chunki := i
if blobi == 1 && i == 4 { // In blob 1, c[4] is replaced by c[8]
chunki = 8
}
if bytes.Compare(c[chunki], chunk) != 0 {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: got %v, expected %v",
- callSite, blobi, i, chunk, c[chunki])
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: got %v, expected %v",
+ callerLine, blobi, i, chunk, c[chunki])
}
var loc chunkmap.Location
loc, err = cm.LookupChunk(ctx, chunk)
if err != nil {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
- callSite, blobi, i, err)
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
+ callerLine, blobi, i, err)
} else {
if i == 4 {
- if bytes.Compare(loc.Blob, b[blobi]) != 0 {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
- callSite, blobi, i, loc.Blob, b[blobi])
+ if bytes.Compare(loc.BlobID, b[blobi]) != 0 {
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+ callerLine, blobi, i, loc.BlobID, b[blobi])
}
} else {
- if bytes.Compare(loc.Blob, b[0]) != 0 && bytes.Compare(loc.Blob, b[1]) != 0 {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
- callSite, blobi, i, loc.Blob, b[blobi])
+ if bytes.Compare(loc.BlobID, b[0]) != 0 && bytes.Compare(loc.BlobID, b[1]) != 0 {
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+ callerLine, blobi, i, loc.BlobID, b[blobi])
}
}
if loc.Offset != int64(i) {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
- callSite, blobi, i, loc.Offset, i)
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
+ callerLine, blobi, i, loc.Offset, i)
}
if loc.Size != 1 {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Size got %d, expected 1",
- callSite, blobi, i, loc.Size)
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Size got %d, expected 1",
+ callerLine, blobi, i, loc.Size)
}
- loc2 := bs.Location()
- if bytes.Compare(loc.Blob, loc.Blob) != 0 || loc.Offset != loc2.Offset || loc.Size != loc2.Size {
- t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs BlobStream %v",
- callSite, blobi, i, loc, loc2)
+ // The offsets and sizes will match, between the result
+ // from the stream and the result from LookupChunk(),
+ // because for all chunks written to both, they are
+ // written to the same places. However, the blob need
+ // not match, since LookupChunk() will return an
+ // arbitrary Location in the store that contains the
+ // chunk.
+ loc2 := cs.Location()
+ if loc.Offset != loc2.Offset || loc.Size != loc2.Size {
+ t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs ChunkStream %v",
+ callerLine, blobi, i, loc, loc2)
}
}
}
- if bs.Err() != nil {
- t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Err() unepxected error %v",
- callSite, blobi, bs.Err())
+ if cs.Err() != nil {
+ t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Err() unexpected error %v",
+ callerLine, blobi, cs.Err())
+ }
if i != 8 {
- t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance unexpectedly saw %d chunks, expected 8",
- callSite, blobi, i)
+ t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance unexpectedly saw %d chunks, expected 8",
+ callerLine, blobi, i)
}
}
@@ -131,9 +178,10 @@
// Nine chunks: c[0 .. 8]
c := [][]byte{id(), id(), id(), id(), id(), id(), id(), id(), id()}
- // Verify that there are no chunks in blobs initially.
- verifyNoBlob(t, ctx, cm, 0, b, 0)
- verifyNoBlob(t, ctx, cm, 1, b, 1)
+ // Verify that there are no blobs, or chunks in blobs initially.
+ verifyBlobs(t, ctx, cm, nil)
+ verifyNoChunksInBlob(t, ctx, cm, 0, b)
+ verifyNoChunksInBlob(t, ctx, cm, 1, b)
// Verify that all chunks have no locations initially.
for chunki := range c {
@@ -152,7 +200,7 @@
chunki = 8
}
err = cm.AssociateChunkWithLocation(ctx, c[chunki],
- chunkmap.Location{Blob: b[blobi], Offset: int64(i), Size: 1})
+ chunkmap.Location{BlobID: b[blobi], Offset: int64(i), Size: 1})
if err != nil {
t.Errorf("chunkmap_test: blob %d: AssociateChunkWithLocation: unexpected error: %v",
blobi, err)
@@ -160,9 +208,10 @@
}
}
- // Verify that the blobs contain the chunks specified.
- verifyBlob(t, ctx, cm, 0, b, c, 2)
- verifyBlob(t, ctx, cm, 1, b, c, 3)
+ // Verify that the blobs are present, with the chunks specified.
+ verifyBlobs(t, ctx, cm, b)
+ verifyChunksInBlob(t, ctx, cm, 0, b, c)
+ verifyChunksInBlob(t, ctx, cm, 1, b, c)
// Verify that all chunks now have locations.
for chunki := range c {
@@ -197,8 +246,9 @@
}
// Verify that blob 0 is gone, but blob 1 remains.
- verifyNoBlob(t, ctx, cm, 0, b, 4)
- verifyBlob(t, ctx, cm, 1, b, c, 5)
+ verifyBlobs(t, ctx, cm, b[1:])
+ verifyNoChunksInBlob(t, ctx, cm, 0, b)
+ verifyChunksInBlob(t, ctx, cm, 1, b, c)
// Delete b[1].
err = cm.DeleteBlob(ctx, b[1])
@@ -207,9 +257,10 @@
err)
}
- // Verify that there are no chunks in blobs initially.
- verifyNoBlob(t, ctx, cm, 0, b, 6)
- verifyNoBlob(t, ctx, cm, 1, b, 7)
+ // Verify that there are no blobs, or chunks in blobs once more.
+ verifyBlobs(t, ctx, cm, nil)
+ verifyNoChunksInBlob(t, ctx, cm, 0, b)
+ verifyNoChunksInBlob(t, ctx, cm, 1, b)
// Verify that all chunks have no locations once more.
for chunki := range c {
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 43cc542..cb8bca8 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -8,13 +8,17 @@
package fs_cablobstore
// Internals:
-// The blobstore consists of a directory with "blob", "cas", and "tmp"
-// subdirectories.
+// Blobs are partitioned into two types of unit: "fragments" and "chunks".
+// A fragment is stored in a single file on disc. A chunk is a unit of network
+// transmission.
+//
+// The blobstore consists of a directory with "blob", "cas", "chunk", and
+// "tmp" subdirectories.
// - "tmp" is used for temporary files that are moved into place via
// link()/unlink() or rename(), depending on what's available.
// - "cas" contains files whose names are content hashes of the files being
// named. A few slashes are thrown into the name near the front so that no
-// single directory gets too large.
+// single directory gets too large. These files are called "fragments".
// - "blob" contains files whose names are random numbers. These names are
// visible externally as "blob names". Again, a few slashes are thrown
// into the name near the front so that no single directory gets too large.
@@ -26,13 +30,17 @@
// <offset> bytes into <cas-fragment>, which is in the "cas" subtree. The
// "f" line indicates that the blob is "finalized" and gives its complete
// md5 hash. No fragments may be appended to a finalized blob.
+// - "chunk" contains a store (currently implemented with leveldb) that
+// maps chunks of blobs to content hashes and vice versa.
import "bufio"
+import "bytes"
import "crypto/md5"
import "fmt"
import "hash"
import "io"
import "io/ioutil"
+import "math"
import "math/rand"
import "os"
import "path/filepath"
@@ -42,6 +50,8 @@
import "time"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
import "v.io/v23/context"
import "v.io/v23/verror"
@@ -62,35 +72,37 @@
errCantDeleteBlob = verror.Register(pkgPath+".errCantDeleteBlob", verror.NoRetry, "{1:}{2:} Can't delete blob {3}{:_}")
errBlobDeleted = verror.Register(pkgPath+".errBlobDeleted", verror.NoRetry, "{1:}{2:} Blob is deleted{:_}")
errSizeTooBigForFragment = verror.Register(pkgPath+".errSizeTooBigForFragment", verror.NoRetry, "{1:}{2:} writing blob {1}, size too big for fragment{:1}")
+ errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
)
// For the moment, we disallow others from accessing the tree where blobs are
-// stored. We could in the future relax this to 0711 or 0755.
+// stored. We could in the future relax this to 0711/0755, and 0644.
const dirPermissions = 0700
+const filePermissions = 0600
// Subdirectories of the blobstore's tree
const (
- blobDir = "blob" // Subdirectory where blobs are indexed by blob id.
- casDir = "cas" // Subdirectory where blobs are indexed by content hash.
- tmpDir = "tmp" // Subdirectory where temporary files are created.
+ blobDir = "blob" // Subdirectory where blobs are indexed by blob id.
+ casDir = "cas" // Subdirectory where fragments are indexed by content hash.
+ chunkDir = "chunk" // Subdirectory where chunks are indexed by content hash.
+ tmpDir = "tmp" // Subdirectory where temporary files are created.
)
// An FsCaBlobStore represents a simple, content-addressable store.
type FsCaBlobStore struct {
- rootName string // The name of the root of the store.
+ rootName string // The name of the root of the store.
+ cm *chunkmap.ChunkMap // Mapping from chunks to blob locations and vice versa.
- mu sync.Mutex // Protects fields below, plus
- // blobDesc.fragment, blobDesc.activeDescIndex,
- // and blobDesc.refCount.
+ // mu protects fields below, plus most fields in each blobDesc when used from a BlobWriter.
+ mu sync.Mutex
activeDesc []*blobDesc // The blob descriptors in use by active BlobReaders and BlobWriters.
toDelete []*map[string]bool // Sets of items that active GC threads are about to delete. (Pointers to maps, to allow pointer comparison.)
}
-// hashToFileName() returns the content-addressed name of the data with the
-// specified hash, given its content hash. Requires len(hash)==16. An md5
-// hash is suitable.
-func hashToFileName(hash []byte) string {
- return filepath.Join(casDir,
+// hashToFileName() returns the name of the binary ID with the specified
+// prefix. Requires len(id)==16. An md5 hash is suitable.
+func hashToFileName(prefix string, hash []byte) string {
+ return filepath.Join(prefix,
fmt.Sprintf("%02x", hash[0]),
fmt.Sprintf("%02x", hash[1]),
fmt.Sprintf("%02x", hash[2]),
@@ -101,6 +113,23 @@
hash[12], hash[13], hash[14], hash[15]))
}
+// fileNameToHash() converts a file name in the format generated by
+// hashToFileName(prefix, ...) to a vector of 16 bytes. If the string does
+// not start with prefix+"/", or is otherwise malformed, nil is returned.
+func fileNameToHash(prefix string, s string) (hash []byte) {
+	if idStr := filepath.ToSlash(s); strings.HasPrefix(idStr, prefix+"/") {
+		hash = make([]byte, 16)
+		n, err := fmt.Sscanf(idStr[len(prefix)+1:], "%02x/%02x/%02x/%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+			&hash[0], &hash[1], &hash[2], &hash[3], &hash[4], &hash[5],
+			&hash[6], &hash[7], &hash[8], &hash[9], &hash[10], &hash[11],
+			&hash[12], &hash[13], &hash[14], &hash[15])
+		if n != 16 || err != nil {
+			hash = nil
+		}
+	}
+	return hash
+}
+
// newBlobName() returns a new random name for a blob.
func newBlobName() string {
return filepath.Join(blobDir,
@@ -143,7 +172,7 @@
// Create() returns a pointer to an FsCaBlobStore stored in the file system at
// "rootName". If the directory rootName does not exist, it is created.
func Create(ctx *context.T, rootName string) (fscabs *FsCaBlobStore, err error) {
- dir := []string{tmpDir, casDir, blobDir}
+ dir := []string{tmpDir, casDir, chunkDir, blobDir}
for i := 0; i != len(dir) && err == nil; i++ {
fullName := filepath.Join(rootName, dir[i])
os.MkdirAll(fullName, dirPermissions)
@@ -153,9 +182,14 @@
err = verror.New(errNotADir, ctx, fullName)
}
}
+ var cm *chunkmap.ChunkMap
+ if err == nil {
+ cm, err = chunkmap.New(ctx, filepath.Join(rootName, chunkDir))
+ }
if err == nil {
fscabs = new(FsCaBlobStore)
fscabs.rootName = rootName
+ fscabs.cm = cm
}
return fscabs, err
}
@@ -169,12 +203,15 @@
func (fscabs *FsCaBlobStore) DeleteBlob(ctx *context.T, blobName string) (err error) {
// Disallow deletions of things outside the blob tree, or that may contain "..".
// For simplicity, the code currently disallows '.'.
- if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+ blobID := fileNameToHash(blobDir, blobName)
+ if blobID == nil || strings.IndexByte(blobName, '.') != -1 {
err = verror.New(errInvalidBlobName, ctx, blobName)
} else {
err = os.Remove(filepath.Join(fscabs.rootName, blobName))
if err != nil {
err = verror.New(errCantDeleteBlob, ctx, blobName, err)
+ } else {
+ err = fscabs.cm.DeleteBlob(ctx, blobID)
}
}
return err
@@ -250,7 +287,7 @@
// addFragment() ensures that the store *fscabs contains a fragment comprising
// the catenation of the byte vectors named by item[..].block and the contents
-// of the files named by item[..].filename. The block field is ignored if
+// of the files named by item[..].fileName. The block field is ignored if
// fileName!="". The fragment is not physically added if already present.
// The fragment is added to the fragment list of the descriptor *desc.
func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
@@ -305,7 +342,7 @@
// Compute the hash, and form the file name in the respository.
hash := hasher.Sum(nil)
- relFileName := hashToFileName(hash)
+ relFileName := hashToFileName(casDir, hash)
absFileName := filepath.Join(fscabs.rootName, relFileName)
// Add the fragment's name to *desc's fragments so the garbage
@@ -316,7 +353,6 @@
size: size,
offset: 0,
fileName: relFileName})
- desc.size += size
fscabs.mu.Unlock()
// If the file does not already exist, ...
@@ -365,7 +401,11 @@
// Remove the entry added to fragment list above.
fscabs.mu.Lock()
desc.fragment = desc.fragment[0 : len(desc.fragment)-1]
- desc.size -= size
+ fscabs.mu.Unlock()
+ } else { // commit the change by updating the size
+ fscabs.mu.Lock()
+ desc.size += size
+ desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
fscabs.mu.Unlock()
}
@@ -385,16 +425,24 @@
activeDescIndex int // Index into fscabs.activeDesc if refCount>0; under fscabs.mu.
refCount int // Reference count; under fscabs.mu.
- name string // Name of the blob.
- fragment []blobFragment // All the fragements in this blob;
- // modified under fscabs.mu and in BlobWriter
- // owner's thread; may be read by GC under
- // fscabs.mu.
- size int64 // Total size of the blob.
- finalized bool // Whether the blob has been finalized.
+ name string // Name of the blob.
+
+ // The following fields are modified under fscabs.mu and in BlobWriter
+ // owner's thread; they may be read by GC (when obtained from
+ // fscabs.activeDesc) and the chunk writer under fscabs.mu. In the
+ // BlobWriter owner's thread, reading does not require a lock, but
+ // writing does. In other contexts (BlobReader, or a desc that has
+ // just been allocated by getBlob()), no locking is needed.
+
+ fragment []blobFragment // All the fragments in this blob
+ size int64 // Total size of the blob.
+ finalized bool // Whether the blob has been finalized.
// A finalized blob has a valid hash field, and no new bytes may be added
// to it. A well-formed hash has 16 bytes.
hash []byte
+
+ openWriter bool // Whether this descriptor is being written by an open BlobWriter.
+ cv *sync.Cond // signalled when a BlobWriter writes or closes.
}
// isBeingDeleted() returns whether fragment fragName is about to be deleted
@@ -456,7 +504,8 @@
// getBlob() returns the in-memory blob descriptor for the named blob.
func (fscabs *FsCaBlobStore) getBlob(ctx *context.T, blobName string) (desc *blobDesc, err error) {
- if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+ slashBlobName := filepath.ToSlash(blobName)
+ if !strings.HasPrefix(slashBlobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
err = verror.New(errInvalidBlobName, ctx, blobName)
} else {
absBlobName := filepath.Join(fscabs.rootName, blobName)
@@ -467,6 +516,7 @@
desc = new(blobDesc)
desc.activeDescIndex = -1
desc.name = blobName
+ desc.cv = sync.NewCond(&fscabs.mu)
scanner := bufio.NewScanner(fh)
for scanner.Scan() {
field := strings.Split(scanner.Text(), " ")
@@ -528,32 +578,49 @@
desc *blobDesc // Description of the blob being written.
f *file // The file being written.
hasher hash.Hash // Running hash of blob.
+
+ // Fields to allow the ChunkMap to be written.
+ csBr *BlobReader // Reader over the blob that's currently being written.
+ cs *chunker.Stream // Stream of chunks derived from csBr
+ csErr chan error // writeChunkMap() sends its result here; Close/CloseWithoutFinalize receives it.
}
-// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on a newly
-// created blob name, which can be found using the Name() method. BlobWriters
-// should not be used concurrently by multiple threads. The returned handle
-// should be closed with either the Close() or CloseWithoutFinalize() method to
-// avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+// a newly created blob. If "name" is non-empty, it is used to name
+// the blob, and it must be in the format of a name returned by this
+// interface (probably by another instance on another device); a name
+// not in that format is rejected with errInvalidBlobName, since it
+// could otherwise designate a file outside the blob tree.
+// Otherwise, a new name is created, which can be found using
+// the Name() method. It is an error to attempt to overwrite a blob
+// that already exists in this blob store. BlobWriters should not be
+// used concurrently by multiple threads. The returned handle should
+// be closed with either the Close() or CloseWithoutFinalize() method
+// to avoid leaking file handles.
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T, name string) (localblobstore.BlobWriter, error) {
 	var bw *BlobWriter
-	newName := newBlobName()
-	fileName := filepath.Join(fscabs.rootName, newName)
+	if name == "" {
+		name = newBlobName()
+	} else if !strings.HasPrefix(filepath.ToSlash(name), blobDir+"/") ||
+		strings.IndexByte(name, '.') != -1 || fileNameToHash(blobDir, name) == nil {
+		// Same acceptance rule as getBlob()/DeleteBlob(); the name may
+		// have arrived from a remote device.
+		return nil, verror.New(errInvalidBlobName, ctx, name)
+	}
+	fileName := filepath.Join(fscabs.rootName, name)
 	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
-	f, err := newFile(os.Create(fileName))
+	f, err := newFile(os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, filePermissions))
 	if err == nil {
 		bw = new(BlobWriter)
 		bw.fscabs = fscabs
 		bw.ctx = ctx
 		bw.desc = new(blobDesc)
 		bw.desc.activeDescIndex = -1
-		bw.desc.name = newName
+		bw.desc.name = name
+		bw.desc.cv = sync.NewCond(&fscabs.mu)
+		bw.desc.openWriter = true
 		bw.f = f
 		bw.hasher = md5.New()
 		if !fscabs.descRef(bw.desc) {
 			// Can't happen; descriptor refers to no fragments.
 			panic(verror.New(errBlobDeleted, ctx, bw.desc.name))
 		}
+		// Write the chunks of this blob into the ChunkMap, as they are
+		// written by this writer.
+		bw.forkWriteChunkMap()
 	}
 	return bw, err
 }
@@ -562,14 +629,17 @@
// old, but unfinalized blob name.
func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
var err error
- bw := new(BlobWriter)
- bw.fscabs = fscabs
- bw.ctx = ctx
- bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
- if err == nil && bw.desc.finalized {
+ var bw *BlobWriter
+ var desc *blobDesc
+ desc, err = fscabs.getBlob(ctx, blobName)
+ if err == nil && desc.finalized {
err = verror.New(errBlobAlreadyFinalized, ctx, blobName)
- bw = nil
- } else {
+ } else if err == nil {
+ bw = new(BlobWriter)
+ bw.fscabs = fscabs
+ bw.ctx = ctx
+ bw.desc = desc
+ bw.desc.openWriter = true
fileName := filepath.Join(fscabs.rootName, bw.desc.name)
bw.f, err = newFile(os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND, 0666))
bw.hasher = md5.New()
@@ -581,7 +651,7 @@
// non-zero.
panic(verror.New(errBlobDeleted, ctx, fileName))
}
- br := fscabs.blobReaderFromDesc(ctx, bw.desc)
+ br := fscabs.blobReaderFromDesc(ctx, bw.desc, dontWaitForWriter)
buf := make([]byte, 8192, 8192)
for err == nil {
var n int
@@ -592,10 +662,89 @@
if err == io.EOF { // EOF is expected.
err = nil
}
+ if err == nil {
+ // Write the chunks of this blob into the ChunkMap, as
+ // they are written by this writer.
+ bw.forkWriteChunkMap()
+ }
}
return bw, err
}
+// forkWriteChunkMap() creates a new thread to run writeChunkMap(), which adds
+// the chunks written to *bw to the blob store's ChunkMap. It sets up, in
+// order: an extra reference on bw.desc, a BlobReader (bw.csBr) over the blob
+// that waits for this writer, a chunker stream (bw.cs) over that reader, and
+// the channel (bw.csErr) on which writeChunkMap() delivers its result. The
+// caller is expected to call joinWriteChunkMap() at some later point.
+func (bw *BlobWriter) forkWriteChunkMap() {
+	// The desc's ref count is incremented here to compensate
+	// for the decrement it will receive in bw.csBr.Close() in joinWriteChunkMap.
+	if !bw.fscabs.descRef(bw.desc) {
+		// Can't happen; descriptor's ref count was already non-zero.
+		panic(verror.New(errBlobDeleted, bw.ctx, bw.desc.name))
+	}
+	bw.csBr = bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, waitForWriter)
+	bw.cs = chunker.NewStream(bw.ctx, &chunker.DefaultParam, bw.csBr)
+	// Unbuffered: writeChunkMap()'s final send blocks until joinWriteChunkMap() receives.
+	bw.csErr = make(chan error)
+	go bw.writeChunkMap()
+}
+
+// insertChunk() records in the blob store's ChunkMap that the chunk whose
+// content hash is chunkHash occupies size bytes at byte offset "offset" of
+// the blob blobID being written by *bw. It returns the offset just past the
+// chunk (offset+size). If the ChunkMap write fails, the chunk stream bw.cs is
+// cancelled so that writeChunkMap() stops early, and the error is returned.
+func (bw *BlobWriter) insertChunk(blobID []byte, chunkHash []byte, offset int64, size int64) (int64, error) {
+	where := chunkmap.Location{BlobID: blobID, Offset: offset, Size: size}
+	err := bw.fscabs.cm.AssociateChunkWithLocation(bw.ctx, chunkHash, where)
+	if err != nil {
+		bw.cs.Cancel()
+	}
+	return offset + size, err
+}
+
+// writeChunkMap() iterates over the chunks in stream bw.cs, associates each
+// one with the blob being written, and sends the resulting error (nil on
+// success) on bw.csErr, where joinWriteChunkMap() receives it.
+func (bw *BlobWriter) writeChunkMap() {
+	var err error
+	var offset int64
+	blobID := fileNameToHash(blobDir, bw.desc.name)
+	// Associate each chunk only after the next chunk has been seen (or
+	// the blob finalized), to avoid recording an artificially short chunk
+	// at the end of a partial transfer.
+	var chunkHash [md5.Size]byte
+	var chunkLen int64
+	if bw.cs.Advance() {
+		chunk := bw.cs.Value()
+		// Record the hash and size, since chunk's underlying buffer
+		// may be reused by the next call to Advance().
+		chunkHash = md5.Sum(chunk)
+		chunkLen = int64(len(chunk))
+		for bw.cs.Advance() {
+			offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+			chunk = bw.cs.Value()
+			chunkHash = md5.Sum(chunk)
+			chunkLen = int64(len(chunk))
+		}
+	}
+	if err == nil {
+		err = bw.cs.Err()
+	}
+	// finalized is written under fscabs.mu, so read it under the lock; but
+	// don't hold the store-wide lock across the ChunkMap write below, which
+	// would stall every other BlobReader/BlobWriter for the duration.
+	bw.fscabs.mu.Lock()
+	finalized := bw.desc.finalized
+	bw.fscabs.mu.Unlock()
+	if err == nil && chunkLen != 0 && finalized {
+		_, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+	}
+	bw.csErr <- err // wake joinWriteChunkMap()
+}
+
+// joinWriteChunkMap() waits for the thread forked by forkWriteChunkMap() to
+// finish, i.e. until the blob's chunks have been written to the blob store's
+// ChunkMap, and then closes the reader that thread used. The caller's err
+// takes precedence; otherwise the thread's own error is returned.
+func (bw *BlobWriter) joinWriteChunkMap(err error) error {
+	chunkMapErr := <-bw.csErr // the final value sent by writeChunkMap()
+	bw.csBr.Close()
+	if err != nil {
+		return err
+	}
+	return chunkMapErr
+}
+
// Close() finalizes *bw, and indicates that the client will perform no further
// append operations on *bw. Any internal open file handles are closed.
func (bw *BlobWriter) Close() (err error) {
@@ -608,7 +757,12 @@
_, err = fmt.Fprintf(bw.f.writer, "f %s\n", hashToString(h)) // finalize
_, err = bw.f.close(bw.ctx, err)
bw.f = nil
+ bw.fscabs.mu.Lock()
bw.desc.finalized = true
+ bw.desc.openWriter = false
+ bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+ bw.fscabs.mu.Unlock()
+ err = bw.joinWriteChunkMap(err)
bw.fscabs.descUnref(bw.desc)
}
return err
@@ -622,8 +776,13 @@
if bw.f == nil {
err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name)
} else {
+ bw.fscabs.mu.Lock()
+ bw.desc.openWriter = false
+ bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+ bw.fscabs.mu.Unlock()
_, err = bw.f.close(bw.ctx, err)
bw.f = nil
+ err = bw.joinWriteChunkMap(err)
bw.fscabs.descUnref(bw.desc)
}
return err
@@ -688,6 +847,7 @@
offset: offset + desc.fragment[i].offset,
fileName: desc.fragment[i].fileName})
bw.desc.size += consume
+ bw.desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
bw.fscabs.mu.Unlock()
}
offset = 0
@@ -701,7 +861,7 @@
// non-zero.
panic(verror.New(errBlobDeleted, bw.ctx, blobName))
}
- br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc)
+ br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, dontWaitForWriter)
if err == nil {
_, err = br.Seek(origSize, 0)
}
@@ -752,7 +912,8 @@
fscabs *FsCaBlobStore
ctx *context.T
- desc *blobDesc // A description of the blob being read.
+ desc *blobDesc // A description of the blob being read.
+ waitForWriter bool // whether this reader should wait for a concurrent BlobWriter
pos int64 // The next position we will read from (used by Read/Seek, not ReadAt).
@@ -761,14 +922,23 @@
fh *os.File // non-nil iff fragmentIndex != -1.
}
+// Constants to make calls to blobReaderFromDesc() more readable; one of
+// them is passed as its waitForWriter argument.
+const (
+	dontWaitForWriter = false // reader never waits on a concurrent BlobWriter
+	waitForWriter     = true  // reader waits (see waitUntilAvailable()) while a BlobWriter has the blob open
+)
+
// blobReaderFromDesc() returns a pointer to a newly allocated BlobReader given
-// a pre-existing blobDesc.
-func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc) *BlobReader {
+// a pre-existing blobDesc. If waitForWriter is true, the reader will wait for
+// any BlobWriter to finish writing the part of the blob the reader is trying
+// to read.
+func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc, waitForWriter bool) *BlobReader {
br := new(BlobReader)
br.fscabs = fscabs
br.ctx = ctx
br.fragmentIndex = -1
br.desc = desc
+ br.waitForWriter = waitForWriter
return br
}
@@ -779,7 +949,7 @@
var desc *blobDesc
desc, err = fscabs.getBlob(ctx, blobName)
if err == nil {
- br = fscabs.blobReaderFromDesc(ctx, desc)
+ br = fscabs.blobReaderFromDesc(ctx, desc, dontWaitForWriter)
}
return br, err
}
@@ -821,24 +991,41 @@
return lo
}
+// waitUntilAvailable() waits until position pos within *br is available for
+// reading, if this reader is waiting for writers. It returns as soon as any
+// of these holds:
+//   - *br was created without waiting for writers (br.waitForWriter false);
+//   - no BlobWriter has the blob open (br.desc.openWriter false), e.g. *br is
+//     on an already written blob, or the writer has been closed;
+//   - the writes have reached position pos (br.desc.size >= pos).
+// Otherwise it waits on br.desc.cv, which BlobWriters broadcast when they
+// write or close.
+// The value pos==math.MaxInt64 can be used to mean "until the writer is closed".
+// Requires br.fscabs.mu held; br.desc.cv is built on that mutex, so Wait()
+// releases it while blocked and reacquires it before returning.
+func (br *BlobReader) waitUntilAvailable(pos int64) {
+	for br.waitForWriter && br.desc.openWriter && br.desc.size < pos {
+		br.desc.cv.Wait()
+	}
+}
+
// ReadAt() fills b[] with up to len(b) bytes of data starting at position "at"
// within the blob that *br indicates, and returns the number of bytes read.
func (br *BlobReader) ReadAt(b []byte, at int64) (n int, err error) {
+ br.fscabs.mu.Lock()
+ br.waitUntilAvailable(at + int64(len(b)))
i := findFragment(br.desc.fragment, at)
if i < len(br.desc.fragment) && at <= br.desc.size {
+ fragmenti := br.desc.fragment[i] // copy fragment data to allow releasing lock
+ br.fscabs.mu.Unlock()
if i != br.fragmentIndex {
br.closeInternal()
}
if br.fragmentIndex == -1 {
- br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, br.desc.fragment[i].fileName))
+ br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, fragmenti.fileName))
if err == nil {
br.fragmentIndex = i
} else {
br.closeInternal()
}
}
- var offset int64 = at - br.desc.fragment[i].pos + br.desc.fragment[i].offset
- consume := br.desc.fragment[i].size - (at - br.desc.fragment[i].pos)
+ var offset int64 = at - fragmenti.pos + fragmenti.offset
+ consume := fragmenti.size - (at - fragmenti.pos)
if int64(len(b)) < consume {
consume = int64(len(b))
}
@@ -847,10 +1034,11 @@
} else if err == nil {
panic("failed to open blob fragment")
}
+ br.fscabs.mu.Lock()
// Return io.EOF if the Read reached the end of the last
// fragment, but not if it's merely the end of some interior
- // fragment.
- if int64(n)+at >= br.desc.size {
+ // fragment or the blob is still being extended.
+ if int64(n)+at >= br.desc.size && !(br.waitForWriter && br.desc.openWriter) {
if err == nil {
err = io.EOF
}
@@ -860,6 +1048,7 @@
} else {
err = verror.New(errIllegalPositionForRead, br.ctx, br.pos, br.desc.size)
}
+ br.fscabs.mu.Unlock()
return n, err
}
@@ -879,11 +1068,13 @@
// offset+current_seek_position if whence==1, and offset+end_of_blob if
// whence==2, and then returns the current seek position.
func (br *BlobReader) Seek(offset int64, whence int) (result int64, err error) {
+ br.fscabs.mu.Lock()
if whence == 0 {
result = offset
} else if whence == 1 {
result = offset + br.pos
} else if whence == 2 {
+ br.waitUntilAvailable(math.MaxInt64)
result = offset + br.desc.size
} else {
err = verror.New(errBadSeekWhence, br.ctx, whence)
@@ -898,17 +1089,26 @@
} else if err == nil {
br.pos = result
}
+ br.fscabs.mu.Unlock()
return result, err
}
// IsFinalized() returns whether *br has been finalized.
func (br *BlobReader) IsFinalized() bool {
- return br.desc.finalized
+ br.fscabs.mu.Lock()
+ br.waitUntilAvailable(math.MaxInt64)
+ finalized := br.desc.finalized
+ br.fscabs.mu.Unlock()
+ return finalized
}
// Size() returns *br's size.
func (br *BlobReader) Size() int64 {
- return br.desc.size
+ br.fscabs.mu.Lock()
+ br.waitUntilAvailable(math.MaxInt64)
+ size := br.desc.size
+ br.fscabs.mu.Unlock()
+ return size
}
// Name() returns *br's name.
@@ -918,7 +1118,11 @@
// Hash() returns *br's hash. It may be nil if the blob is not finalized.
func (br *BlobReader) Hash() []byte {
- return br.desc.hash
+ br.fscabs.mu.Lock()
+ br.waitUntilAvailable(math.MaxInt64)
+ hash := br.desc.hash
+ br.fscabs.mu.Unlock()
+ return hash
}
// -----------------------------------------------------------
@@ -936,6 +1140,10 @@
fscabs *FsCaBlobStore // The parent FsCaBlobStore.
err error // If non-nil, the error that terminated iteration.
stack []dirListing // The stack of dirListings leading to the current entry.
+ ctx *context.T // context passed to ListBlobIds() or ListCAIds()
+
+ mu sync.Mutex // Protects cancelled.
+ cancelled bool // Whether Cancel() has been called.
}
// ListBlobIds() returns an iterator that can be used to enumerate the blobs in
@@ -947,10 +1155,10 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Stream {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
- return &FsCasIter{fscabs: fscabs, stack: stack}
+ return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
}
// ListCAIds() returns an iterator that can be used to enumerate the
@@ -963,10 +1171,18 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Stream {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
- return &FsCasIter{fscabs: fscabs, stack: stack}
+ return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
+}
+
+// isCancelled() reports whether Cancel() has been called on fscabsi.
+// The flag is read under fscabsi.mu because Cancel() may run concurrently.
+func (fscabsi *FsCasIter) isCancelled() bool {
+	fscabsi.mu.Lock()
+	defer fscabsi.mu.Unlock()
+	return fscabsi.cancelled
+}
// Advance() stages an item so that it may be retrieved via Value. Returns
@@ -976,7 +1192,7 @@
stack := fscabsi.stack
err := fscabsi.err
- for err == nil && !advanced && len(stack) != 0 {
+ for err == nil && !advanced && len(stack) != 0 && !fscabsi.isCancelled() {
last := len(stack) - 1
stack[last].pos++
if stack[last].pos == len(stack[last].nameList) {
@@ -1005,6 +1221,13 @@
}
}
+	if fscabsi.isCancelled() {
+		if err == nil {
+			// Assign the local err: fscabsi.err is overwritten with
+			// err just below, so setting fscabsi.err here was lost.
+			err = verror.New(errStreamCancelled, fscabsi.ctx)
+		}
+		advanced = false
+	}
+
fscabsi.err = err
return advanced
}
@@ -1027,6 +1250,163 @@
return fscabsi.err
}
+// Cancel() asks the iteration stream to terminate early; subsequent calls to
+// Advance() return false. Never blocks. May be called concurrently with other
+// methods on fscabsi.
+func (fscabsi *FsCasIter) Cancel() {
+	fscabsi.mu.Lock()
+	defer fscabsi.mu.Unlock()
+	fscabsi.cancelled = true
+}
+
+// -----------------------------------------------------------
+
+// An errorChunkStream is a localblobstore.ChunkStream that yields an error.
+// It contains no chunks: Advance() always returns false, Value() returns nil,
+// and Err() reports the stored err.
+type errorChunkStream struct {
+	err error // the error reported by Err()
+}
+
+func (*errorChunkStream) Advance() bool       { return false } // there is never a chunk to stage
+func (*errorChunkStream) Value([]byte) []byte { return nil }
+func (ecs *errorChunkStream) Err() error      { return ecs.err }
+func (*errorChunkStream) Cancel()             {} // nothing to cancel
+
+// BlobChunkStream() returns a ChunkStream over the ordered list of content
+// hashes of the chunks in blob blobName. The list is expected to be presented
+// to RecipeFromChunks() on another device, to build a recipe for transmitting
+// the blob efficiently to that device. If blobName is not a well-formed blob
+// name, the returned stream yields no chunks and its Err() reports
+// errInvalidBlobName.
+func (fscabs *FsCaBlobStore) BlobChunkStream(ctx *context.T, blobName string) (cs localblobstore.ChunkStream) {
+	if blobID := fileNameToHash(blobDir, blobName); blobID != nil {
+		return fscabs.cm.NewChunkStream(ctx, blobID)
+	}
+	return &errorChunkStream{err: verror.New(errInvalidBlobName, ctx, blobName)}
+}
+
+// -----------------------------------------------------------
+
+// LookupChunk returns the location of a chunk with the specified chunk hash
+// within the store: the name of a blob holding it, and the chunk's offset and
+// size in that blob. On a ChunkMap lookup error, the zero Location and that
+// error are returned.
+func (fscabs *FsCaBlobStore) LookupChunk(ctx *context.T, chunkHash []byte) (loc localblobstore.Location, err error) {
+	chunkMapLoc, err := fscabs.cm.LookupChunk(ctx, chunkHash)
+	if err != nil {
+		return loc, err
+	}
+	loc = localblobstore.Location{
+		BlobName: hashToFileName(blobDir, chunkMapLoc.BlobID),
+		Size:     chunkMapLoc.Size,
+		Offset:   chunkMapLoc.Offset,
+	}
+	return loc, nil
+}
+
+// -----------------------------------------------------------
+
+// A RecipeStream implements localblobstore.RecipeStream. It allows the client
+// to iterate over the recipe steps to recreate a blob identified by a stream
+// of chunk hashes (from chunkStream), but using parts of blobs in the current
+// blob store where possible.
+type RecipeStream struct {
+	fscabs *FsCaBlobStore // the store whose ChunkMap and blobs are consulted
+	ctx    *context.T     // context passed to RecipeStreamFromChunkStream()
+
+	chunkStream     localblobstore.ChunkStream // the stream of chunks in the blob
+	pendingChunkBuf [16]byte                   // a buffer for pendingChunk
+	pendingChunk    []byte                     // the last unprocessed chunk hash read from chunkStream, or nil if none
+	step            localblobstore.RecipeStep  // the recipe step to be returned by Value()
+	mu              sync.Mutex                 // protects cancelled
+	cancelled       bool                       // whether Cancel() has been called
+}
+
+// RecipeStreamFromChunkStream() returns a pointer to a RecipeStream that lets
+// the client iterate over each RecipeStep needed to create the blob made up of
+// the chunks in chunkStream. No work is done until the first Advance().
+func (fscabs *FsCaBlobStore) RecipeStreamFromChunkStream(ctx *context.T, chunkStream localblobstore.ChunkStream) localblobstore.RecipeStream {
+	return &RecipeStream{
+		fscabs:      fscabs,
+		ctx:         ctx,
+		chunkStream: chunkStream,
+	}
+}
+
+// isCancelled() reports whether rs.Cancel() has been called.
+// The flag is read under rs.mu because Cancel() may run concurrently.
+func (rs *RecipeStream) isCancelled() bool {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+	return rs.cancelled
+}
+
+// Advance() stages an item so that it may be retrieved via Value().
+// Returns true iff there is an item to retrieve. Advance() must be
+// called before Value() is called. The caller is expected to read
+// until Advance() returns false, or to call Cancel().
+// A step's Chunk field is a private copy, so it stays valid after
+// later calls to Advance().
+func (rs *RecipeStream) Advance() (ok bool) {
+	if rs.pendingChunk == nil && rs.chunkStream.Advance() {
+		rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+	}
+	for !ok && rs.pendingChunk != nil && !rs.isCancelled() {
+		var err error
+		var loc0 chunkmap.Location
+		loc0, err = rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
+		if err == nil {
+			blobName := hashToFileName(blobDir, loc0.BlobID)
+			var blobDesc *blobDesc
+			if blobDesc, err = rs.fscabs.getBlob(rs.ctx, blobName); err != nil {
+				// The ChunkMap contained a reference to a
+				// deleted blob. Delete the reference in the
+				// ChunkMap; the next loop iteration will
+				// consider the chunk again. If the deletion
+				// fails, the lookup would keep finding the
+				// same stale entry, so yield the chunk hash
+				// instead of looping forever.
+				if rs.fscabs.cm.DeleteBlob(rs.ctx, loc0.BlobID) != nil {
+					rs.step = localblobstore.RecipeStep{Chunk: append([]byte(nil), rs.pendingChunk...)}
+					rs.pendingChunk = nil // consumed
+					ok = true
+				}
+			} else {
+				rs.fscabs.descUnref(blobDesc)
+				// The chunk is in a known blob. Combine
+				// contiguous chunks into a single recipe
+				// entry.
+				rs.pendingChunk = nil // consumed
+				for rs.pendingChunk == nil && rs.chunkStream.Advance() {
+					rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+					loc, lookupErr := rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
+					if lookupErr == nil && bytes.Equal(loc0.BlobID, loc.BlobID) && loc.Offset == loc0.Offset+loc0.Size {
+						loc0.Size += loc.Size
+						rs.pendingChunk = nil // consumed
+					}
+				}
+				rs.step = localblobstore.RecipeStep{Blob: blobName, Offset: loc0.Offset, Size: loc0.Size}
+				ok = true
+			}
+		} else { // The chunk is not in the ChunkMap; yield a single chunk hash.
+			// Copy it: pendingChunk aliases pendingChunkBuf, which the
+			// next call to Advance() overwrites.
+			rs.step = localblobstore.RecipeStep{Chunk: append([]byte(nil), rs.pendingChunk...)}
+			rs.pendingChunk = nil // consumed
+			ok = true
+		}
+	}
+	return ok && !rs.isCancelled()
+}
+
+// Value() returns the item that was staged by Advance(). May panic if
+// Advance() returned false or was not called. Never blocks.
+// (This implementation simply returns the step most recently stored by
+// Advance(), or the zero RecipeStep if none has been.)
+func (rs *RecipeStream) Value() localblobstore.RecipeStep {
+	return rs.step
+}
+
+// Err() returns any error encountered by Advance. Never blocks.
+// Lookup failures inside Advance() are expected and recoverable (the chunk
+// is simply yielded by hash), so they are not reported. However, a failure
+// of the underlying chunk stream truncates the recipe, and is returned.
+func (rs *RecipeStream) Err() error {
+	return rs.chunkStream.Err()
+}
+
+// Cancel() indicates that the client wishes to cease reading from the stream.
+// It causes the next call to Advance() to return false. Never blocks.
+// It may be called concurrently with other calls on the stream.
+func (rs *RecipeStream) Cancel() {
+	rs.mu.Lock()
+	rs.cancelled = true
+	rs.mu.Unlock()
+	// Propagate the cancellation to the underlying chunk stream. rs.mu is
+	// released first, so it is not held during that call.
+	rs.chunkStream.Cancel()
+}
+
// -----------------------------------------------------------
// gcTemp() attempts to delete files in dirName older than threshold.
@@ -1057,16 +1437,46 @@
for caIter.Advance() {
caSet[caIter.Value()] = true
}
+ err = caIter.Err()
- // Remove from caSet all fragments referenced by extant blobs.
- blobIter := fscabs.ListBlobIds(ctx)
- for blobIter.Advance() {
- var blobDesc *blobDesc
- if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
- for i := range blobDesc.fragment {
- delete(caSet, blobDesc.fragment[i].fileName)
+ // cmBlobs maps the names of blobs found in the ChunkMap to their IDs.
+ // (The IDs can be derived from the names; the map is really being used
+ // to record which blobs exist, and the value merely avoids repeated
+ // conversions.)
+ cmBlobs := make(map[string][]byte)
+ if err == nil {
+ // Record all the blobs known to the ChunkMap;
+ bs := fscabs.cm.NewBlobStream(ctx)
+ for bs.Advance() {
+ blobID := bs.Value(nil)
+ cmBlobs[hashToFileName(blobDir, blobID)] = blobID
+ }
+ }
+
+ if err == nil {
+ // Remove from cmBlobs all extant blobs, and remove from
+ // caSet all their fragments.
+ blobIter := fscabs.ListBlobIds(ctx)
+ for blobIter.Advance() {
+ var blobDesc *blobDesc
+ if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
+ delete(cmBlobs, blobDesc.name)
+ for i := range blobDesc.fragment {
+ delete(caSet, blobDesc.fragment[i].fileName)
+ }
+ fscabs.descUnref(blobDesc)
}
- fscabs.descUnref(blobDesc)
+ }
+ }
+
+ if err == nil {
+ // Remove all blobs still mentioned in cmBlobs from the ChunkMap;
+ // these are the ones that no longer exist in the blobs directory.
+ for _, blobID := range cmBlobs {
+ err = fscabs.cm.DeleteBlob(ctx, blobID)
+ if err != nil {
+ break
+ }
}
}
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index 886f5c4..75bc54e 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -41,3 +41,31 @@
// Test it.
localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
}
+
+// TestWritingViaChunks tests the incremental transfer of a blob between two fs_cablobstore instances via chunks.
+func TestWritingViaChunks(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ var err error
+
+ // Make a pair of blobstores, each in its own temporary directory (removed when the test ends).
+ const nBlobStores = 2
+ var testDirName [nBlobStores]string
+ var bs [nBlobStores]localblobstore.BlobStore
+ for i := 0; i != nBlobStores; i++ {
+ testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+ if err != nil {
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+ }
+ defer os.RemoveAll(testDirName[i])
+
+ bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+ if err != nil {
+ t.Fatalf("fs_cablobstore.Create failed: %v", err)
+ }
+ }
+
+ // Run the shared test that copies a blob from bs[0] to bs[1] via chunks.
+ localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
index 973a9c8..49beec8 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
@@ -41,3 +41,31 @@
// Test it.
localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
}
+
+// TestWritingViaChunks tests the incremental transfer of a blob between two fs_cablobstore instances via chunks.
+func TestWritingViaChunks(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ var err error
+
+ // Make a pair of blobstores, each in its own temporary directory (removed when the test ends).
+ const nBlobStores = 2
+ var testDirName [nBlobStores]string
+ var bs [nBlobStores]localblobstore.BlobStore
+ for i := 0; i != nBlobStores; i++ {
+ testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+ if err != nil {
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+ }
+ defer os.RemoveAll(testDirName[i])
+
+ bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+ if err != nil {
+ t.Fatalf("fs_cablobstore.Create failed: %v", err)
+ }
+ }
+
+ // Run the shared test that copies a blob from bs[0] to bs[1] via chunks.
+ localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
index 662e964..481bea1 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -14,6 +14,7 @@
import "testing"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
import "v.io/v23/context"
import "v.io/v23/verror"
@@ -59,7 +60,7 @@
content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
var bw localblobstore.BlobWriter
var err error
- bw, err = bs.NewBlobWriter(ctx)
+ bw, err = bs.NewBlobWriter(ctx, "")
if err != nil {
t.Errorf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
}
@@ -546,3 +547,276 @@
checkAllBlobs(t, ctx, bs, blobVector, testDirName)
checkFragments(t, ctx, bs, fragmentMap, testDirName)
}
+
+// writeBlobFromReader() writes the contents of rd to blobstore bs, as blob
+// "name", or picks a name if "name" is empty. It returns the name of the
+// blob. Errors cause the test to terminate. Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func writeBlobFromReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, name string, rd io.Reader, callSite int) string {
+	var err, readErr error // readErr is kept apart from err so that appends cannot mask io.EOF.
+	var bw localblobstore.BlobWriter
+	if bw, err = bs.NewBlobWriter(ctx, name); err != nil {
+		t.Fatalf("callSite %d: NewBlobWriter failed: %v", callSite, err)
+	}
+	blobName := bw.Name()
+	buf := make([]byte, 8192) // buffer for data read from rd.
+	for i := 0; readErr == nil; i++ {
+		var n int
+		if n, readErr = rd.Read(buf); readErr != nil && readErr != io.EOF {
+			t.Fatalf("callSite %d: unexpected error from reader: %v", callSite, readErr)
+		}
+		if n > 0 {
+			if err = bw.AppendFragment(localblobstore.BlockOrFile{Block: buf[:n]}); err != nil {
+				t.Fatalf("callSite %d: BlobWriter.AppendFragment failed: %v", callSite, err)
+			}
+			// Every so often, close without finalizing, and reopen.
+			if (i % 7) == 0 {
+				if err = bw.CloseWithoutFinalize(); err != nil {
+					t.Fatalf("callSite %d: BlobWriter.CloseWithoutFinalize failed: %v", callSite, err)
+				}
+				if bw, err = bs.ResumeBlobWriter(ctx, blobName); err != nil {
+					t.Fatalf("callSite %d: ResumeBlobWriter %q failed: %v", callSite, blobName, err)
+				}
+			}
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("callSite %d: BlobWriter.Close failed: %v", callSite, err)
+	}
+	return blobName
+}
+
+// checkBlobAgainstReader() verifies that the blob blobName has the same bytes as the reader rd.
+// Errors cause the test to terminate. Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	// Open a reader on the blob; it is closed when the comparison is done.
+	blobRd, blobErr := bs.NewBlobReader(ctx, blobName)
+	if blobErr != nil {
+		t.Fatalf("callSite %d: NewBlobReader on %q failed: %v", callSite, blobName, blobErr)
+	}
+	defer blobRd.Close() // Best-effort close; a Close error does not affect the comparison.
+
+	// Variables for reading the two streams, indexed by "reader" and "blob".
+	type stream struct {
+		name string
+		rd   io.Reader // Reader for this stream
+		buf  []byte    // buffer for data
+		i    int       // bytes processed within current buffer
+		n    int       // valid bytes in current buffer
+		err  error     // error, or nil
+	}
+
+	s := [2]stream{
+		{name: "reader", rd: rd, buf: make([]byte, 8192)},
+		{name: blobName, rd: blobRd, buf: make([]byte, 8192)},
+	}
+
+	// Descriptive names for the two elements of s, when we aren't treating them the same.
+	reader := &s[0]
+	blob := &s[1]
+
+	var pos int // position within file, for error reporting.
+
+	for x := 0; x != 2; x++ {
+		s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+		s[x].i = 0
+	}
+	for blob.n != 0 && reader.n != 0 {
+		for reader.i != reader.n && blob.i != blob.n && reader.buf[reader.i] == blob.buf[blob.i] {
+			pos++
+			blob.i++
+			reader.i++
+		}
+		if reader.i != reader.n && blob.i != blob.n {
+			t.Fatalf("callSite %d: BlobStore %q: BlobReader on blob %q and rd reader generated different bytes at position %d: 0x%x vs 0x%x",
+				callSite, bs.Root(), blobName, pos, reader.buf[reader.i], blob.buf[blob.i])
+		}
+		for x := 0; x != 2; x++ { // read more data from each reader, if needed
+			if s[x].i == s[x].n {
+				s[x].i = 0
+				s[x].n = 0
+				if s[x].err == nil {
+					s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+				}
+			}
+		}
+	}
+	for x := 0; x != 2; x++ { // At least one stream is exhausted (n == 0) here.
+		if s[x].n == 0 && s[x].err != io.EOF { // exhausted without a clean end of stream
+			t.Fatalf("callSite %d: %s got error %v", callSite, s[x].name, s[x].err)
+		}
+		if s[x].n != 0 && s[1-x].err == io.EOF { // the other stream ended cleanly first
+			t.Fatalf("callSite %d: %s is longer than %s", callSite, s[x].name, s[1-x].name)
+		}
+	}
+}
+
+// checkBlobChunksAgainstReader() verifies that the blob blobName has the same chunks
+// (according to BlobChunkStream) as a chunker applied to the reader rd.
+// Errors cause the test to terminate. Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobChunksAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	buf := make([]byte, 8192) // buffer for the chunk hashes read from the blob's chunk stream cs.
+	rawChunks := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
+	cs := bs.BlobChunkStream(ctx, blobName)
+	pos := 0 // byte position within the blob, to be reported in error messages
+	i := 0   // chunk index, to be reported in error messages
+	rawMore, more := rawChunks.Advance(), cs.Advance()
+	for rawMore && more {
+		c := rawChunks.Value()
+		rawChunk := md5.Sum(c)
+		chunk := cs.Value(buf)
+		if !bytes.Equal(rawChunk[:], chunk) {
+			t.Errorf("callSite %d: raw random stream and chunk record for blob %q have different chunk %d:\n\t%v\nvs\n\t%v\n\tpos %d\n\tlen %d\n\tc %v",
+				callSite, blobName, i, rawChunk, chunk, pos, len(c), c)
+		}
+		pos += len(c)
+		i++
+		rawMore, more = rawChunks.Advance(), cs.Advance()
+	}
+	if rawMore {
+		t.Fatalf("callSite %d: blob %q has fewer chunks than raw stream", callSite, blobName)
+	}
+	if more {
+		t.Fatalf("callSite %d: blob %q has more chunks than raw stream", callSite, blobName)
+	}
+	if rawChunks.Err() != nil {
+		t.Fatalf("callSite %d: error reading raw chunk stream: %v", callSite, rawChunks.Err())
+	}
+	if cs.Err() != nil {
+		t.Fatalf("callSite %d: error reading chunk stream for blob %q; %v", callSite, blobName, cs.Err())
+	}
+}
+
+// WriteViaChunks() tests that a large blob in one blob store can be transmitted
+// to another incrementally, without transferring chunks already in the other blob store.
+func WriteViaChunks(t *testing.T, ctx *context.T, bs [2]localblobstore.BlobStore) {
+	totalLength := 1024 * 1024 // The original blob will be a megabyte.
+
+	// Write a random blob to bs[0], using seed 1, then check that the
+	// bytes and chunks we get from the blob just written are the same as
+	// those obtained from an identical byte stream.
+	blob0 := writeBlobFromReader(t, ctx, bs[0], "", NewRandReader(1, totalLength, 0, io.EOF), 0)
+	checkBlobAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 1)
+	checkBlobChunksAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 2)
+
+	// ---------------------------------------------------------------------
+	// Write into bs[1] a blob that is similar to blob0, but not identical, and check it as above.
+	insertionInterval := 20 * 1024
+	blob1 := writeBlobFromReader(t, ctx, bs[1], "", NewRandReader(1, totalLength, insertionInterval, io.EOF), 3)
+	checkBlobAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 4)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 5)
+
+	// ---------------------------------------------------------------------
+	// Count the chunks that bs[1] lacks when making a recipe for blob0, and blob0's total chunk
+	// count. insertionInterval is much larger than the expected chunk size, so most of blob0's
+	// chunks should already be in bs[1]: the former should be far smaller than the latter.
+	cs := bs[0].BlobChunkStream(ctx, blob0)          // Stream of chunks in blob0
+	rs := bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+
+	recipeLen, chunkCount := 0, 0
+	for rs.Advance() {
+		step := rs.Value()
+		if step.Chunk != nil {
+			chunkCount++
+		}
+		recipeLen++
+	}
+	if rs.Err() != nil {
+		t.Fatalf("RecipeStream got error: %v", rs.Err())
+	}
+
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Get the original chunk count.
+	origChunkCount := 0
+	for cs.Advance() {
+		origChunkCount++
+	}
+	if cs.Err() != nil {
+		t.Fatalf("ChunkStream got error: %v", cs.Err())
+	}
+	if origChunkCount < chunkCount*5 {
+		t.Errorf("expected fewer chunks in recipe: recipeLen %d chunkCount %d origChunkCount %d\n",
+			recipeLen, chunkCount, origChunkCount)
+	}
+
+	// Copy blob0 from bs[0] to bs[1], using chunks from blob1 (already in bs[1]) where possible.
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Stream of chunks in blob0
+	// In a real application, at this point the stream cs would be sent to the device with bs[1].
+	rs = bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+	var bw localblobstore.BlobWriter
+	var err error
+	if bw, err = bs[1].NewBlobWriter(ctx, blob0); err != nil { // Write blob with known blob name.
+		t.Fatalf("bs[1].NewBlobWriter yields error: %v", err)
+	}
+	var br localblobstore.BlobReader // Reader on the bs[0] blob currently being copied from, or nil.
+	const maxFragment = 1024 * 1024
+	blocks := make([]localblobstore.BlockOrFile, maxFragment/chunker.DefaultParam.MinChunk)
+	for gotStep := rs.Advance(); gotStep; {
+		step := rs.Value()
+		if step.Chunk == nil {
+			// This part of the blob can be read from an existing blob locally (at bs[1]).
+			if err = bw.AppendBlob(step.Blob, step.Size, step.Offset); err != nil {
+				t.Fatalf("AppendBlob(%v) yields error: %v", step, err)
+			}
+			gotStep = rs.Advance()
+		} else {
+			var fragmentSize int64
+			// In a real application, the sequence of chunk hashes
+			// in recipe steps would be communicated back to bs[0],
+			// which then finds the associated chunks.
+			var b int
+			for b = 0; gotStep && step.Chunk != nil && fragmentSize+chunker.DefaultParam.MaxChunk < maxFragment; b++ {
+				var loc localblobstore.Location
+				if loc, err = bs[0].LookupChunk(ctx, step.Chunk); err != nil {
+					t.Fatalf("bs[0] unexpectedly does not have chunk %v", step.Chunk)
+				}
+				if br != nil && br.Name() != loc.BlobName { // Close blob if we need a different one.
+					if err = br.Close(); err != nil {
+						t.Fatalf("unexpected error in BlobReader.Close(): %v", err)
+					}
+					br = nil
+				}
+				if br == nil { // Open blob if needed.
+					if br, err = bs[0].NewBlobReader(ctx, loc.BlobName); err != nil {
+						t.Fatalf("unexpected failure to create BlobReader on %q: %v", loc.BlobName, err)
+					}
+				}
+				if loc.Size > chunker.DefaultParam.MaxChunk {
+					t.Fatalf("chunk exceeds max chunk size: %d vs %d", loc.Size, chunker.DefaultParam.MaxChunk)
+				}
+				fragmentSize += loc.Size
+				if blocks[b].Block == nil {
+					blocks[b].Block = make([]byte, chunker.DefaultParam.MaxChunk)
+				}
+				blocks[b].Block = blocks[b].Block[:loc.Size]
+				var i int
+				var n int64
+				for n = int64(0); n != loc.Size; n += int64(i) {
+					if i, err = br.ReadAt(blocks[b].Block[n:loc.Size], n+loc.Offset); i == 0 || (err != nil && err != io.EOF) {
+						t.Fatalf("ReadAt on %q at offset %d made no progress or failed: %v", br.Name(), n+loc.Offset, err)
+					}
+				}
+				if gotStep = rs.Advance(); gotStep {
+					step = rs.Value()
+				}
+			}
+			if err = bw.AppendFragment(blocks[:b]...); err != nil {
+				t.Fatalf("AppendFragment on %q failed: %v", bw.Name(), err)
+			}
+		}
+	}
+	if br != nil { // Close the reader left open by the copy loop.
+		if err = br.Close(); err != nil {
+			t.Fatalf("unexpected error in BlobReader.Close(): %v", err)
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("BlobWriter.Close on %q failed: %v", bw.Name(), err)
+	}
+
+	// Check that the transferred blob in bs[1] is the same as the original
+	// stream used to make the blob in bs[0].
+	checkBlobAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 6)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 7)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
new file mode 100644
index 0000000..85e32d3
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package localblobstore_testlib
+
+import "math/rand"
+
+// A RandReader contains a pointer to a rand.Rand, and a size limit. Its
+// pointers implement the Read() method from io.Reader, which yields bytes
+// obtained from the random number generator.
+type RandReader struct {
+ rand *rand.Rand // Source of random bytes.
+ pos int // Number of bytes read.
+ limit int // Max number of bytes that may be read.
+ insertInterval int // If non-zero, number of bytes between insertions of zero bytes.
+ eofErr error // error to be returned at the end of the stream
+}
+
+// NewRandReader() returns a new RandReader that yields limit bytes from a math/rand
+// generator seeded with seed, and returns eofErr once the limit is reached. Readers made
+// with the same arguments yield identical streams. If insertInterval is non-zero, each
+// byte whose position is a multiple of insertInterval (including position 0) is a zero
+// byte, and the generator is not consulted for it.
+func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
+ r := new(RandReader)
+ r.rand = rand.New(rand.NewSource(seed))
+ r.limit = limit
+ r.insertInterval = insertInterval
+ r.eofErr = eofErr
+ return r
+}
+
+// Read() implements the io.Reader Read() method for *RandReader.
+func (r *RandReader) Read(buf []byte) (n int, err error) {
+	// Generate bytes up to the end of the stream, or the end of the buffer.
+	want := r.limit - r.pos
+	if len(buf) < want {
+		want = len(buf)
+	}
+	for ; n < want; n++ { // "<" so that a negative limit yields no bytes instead of overrunning buf.
+		if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
+			buf[n] = byte(r.rand.Int31n(256))
+		} else {
+			buf[n] = 0 // An inserted zero byte, once every insertInterval bytes.
+		}
+		r.pos++
+	}
+	if r.pos >= r.limit { // ">=" so that a negative limit reports end-of-stream immediately.
+		err = r.eofErr
+	}
+	return n, err
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
new file mode 100644
index 0000000..c3f11d4
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
@@ -0,0 +1,368 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Example code for transferring a blob from one device to another.
+// See the simulateResumption constant to choose whether to simulate a full
+// transfer or a resumed one.
+package localblobstore_test
+
+import "bytes"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "math/rand"
+import "os"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// simulateResumption tells the receiver whether to start with a partial, unfinalized
+// copy of the blob, to exercise resuming an interrupted transfer.
+const simulateResumption = true
+
+// createBlobStore() returns a new fs_cablobstore BlobStore in a fresh temporary
+// directory, and that directory's name (the caller must remove it). Panics on failure.
+func createBlobStore(ctx *context.T) (bs localblobstore.BlobStore, dirName string) {
+ var err error
+ if dirName, err = ioutil.TempDir("", "localblobstore_transfer_test"); err != nil {
+ panic(err)
+ }
+ if bs, err = fs_cablobstore.Create(ctx, dirName); err != nil {
+ panic(err)
+ }
+ return bs, dirName
+}
+
+// createBlob() writes to bs a blob of count 32kByte blocks drawn from a deterministic
+// random stream (always seeded with 1), starting at block offset within that stream.
+// Returns its name, which is "blob" if non-empty, and chosen by bs otherwise.
+// The blob is finalized iff "complete" is true. Panics on any error.
+func createBlob(ctx *context.T, bs localblobstore.BlobStore, blob string, complete bool, offset int, count int) string {
+ var bw localblobstore.BlobWriter
+ var err error
+ if bw, err = bs.NewBlobWriter(ctx, blob); err != nil {
+ panic(err)
+ }
+ blob = bw.Name()
+ var buffer [32 * 1024]byte
+ block := localblobstore.BlockOrFile{Block: buffer[:]} // Aliases buffer, so each append sends the freshly generated bytes.
+ r := rand.New(rand.NewSource(1)) // Always seed with 1 for repeatability.
+ for i := 0; i != offset+count; i++ { // Blocks before offset are generated but discarded.
+ for b := 0; b != len(buffer); b++ {
+ buffer[b] = byte(r.Int31n(256))
+ }
+ if i >= offset {
+ if err = bw.AppendFragment(block); err != nil {
+ panic(err)
+ }
+ }
+ }
+ if complete {
+ err = bw.Close()
+ } else {
+ err = bw.CloseWithoutFinalize()
+ }
+ if err != nil {
+ panic(err)
+ }
+ return blob
+}
+
+// A channelChunkStream turns a channel of chunk hashes into a ChunkStream.
+type channelChunkStream struct {
+ channel <-chan []byte // source of the chunk hashes
+ ok bool // false once the channel has been closed
+ value []byte // the hash returned by Value()
+}
+
+// newChannelChunkStream returns a ChunkStream that yields, in order, the chunk
+// hashes received on ch, ending when ch is closed.
+func newChannelChunkStream(ch <-chan []byte) localblobstore.ChunkStream {
+ return &channelChunkStream{channel: ch, ok: true}
+}
+
+// The following are the standard ChunkStream methods; Advance() blocks until a hash arrives or the channel is closed.
+func (cs *channelChunkStream) Advance() bool {
+ if cs.ok {
+ cs.value, cs.ok = <-cs.channel // ok becomes false once the channel is closed.
+ }
+ return cs.ok
+}
+func (cs *channelChunkStream) Value(buf []byte) []byte { return cs.value } // buf is unused; the received slice is returned as-is.
+func (cs *channelChunkStream) Err() error { return nil } // Receiving from a channel cannot fail.
+func (cs *channelChunkStream) Cancel() {} // No-op (a receive-only channel cannot be closed here). NOTE(review): Advance() may still block after Cancel().
+
+// Example_blobTransfer() demonstrates how to transfer a blob incrementally
+// from one device's blob store to another. In this code, the communication
+// between sender and receiver is modelled with Go channels.
+func Example_blobTransfer() {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// ----------------------------------------------
+	// The names of the channels between sender and receiver end in
+	// ToSender or ToReceiver, according to the direction in which data flows.
+	type blobData struct {
+		name     string
+		size     int64
+		checksum []byte
+	}
+	blobDataToReceiver := make(chan blobData)  // indicate basic data for blob
+	needChunksToSender := make(chan bool)      // indicate receiver does not have entire blob
+	chunkHashesToReceiver := make(chan []byte) // for initial transfer of chunk hashes
+	chunkHashesToSender := make(chan []byte)   // to report which chunks receiver needs
+	chunksToReceiver := make(chan []byte)      // to send the data of the needed chunks to the receiver
+
+	sDone := make(chan bool) // closed when sender done
+	rDone := make(chan bool) // closed when receiver done
+
+	// ----------------------------------------------
+	// The sender.
+	go func(ctx *context.T,
+		blobDataToReceiver chan<- blobData,
+		needChunksToSender <-chan bool,
+		chunkHashesToReceiver chan<- []byte,
+		chunkHashesToSender <-chan []byte,
+		chunksToReceiver chan<- []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsS, bsSDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsSDir)
+
+		blob := createBlob(ctx, bsS, "", true, 0, 32) // Create a 1M blob at the sender.
+
+		// 1. Send basic blob data to receiver.
+		var br localblobstore.BlobReader
+		if br, err = bsS.NewBlobReader(ctx, blob); err != nil {
+			panic(err)
+		}
+		blobDataToReceiver <- blobData{name: blob, size: br.Size(), checksum: br.Hash()}
+		br.Close()
+		close(blobDataToReceiver)
+
+		// 3. Get indication from receiver of whether it needs blob.
+		needChunks := <-needChunksToSender
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 4. Send the chunk hashes to the receiver. This proceeds concurrently
+		// with the step below.
+		go func(ctx *context.T, blob string, chunkHashesToReceiver chan<- []byte) {
+			cs := bsS.BlobChunkStream(ctx, blob)
+			for cs.Advance() {
+				chunkHashesToReceiver <- cs.Value(nil)
+			}
+			if cs.Err() != nil {
+				panic(cs.Err())
+			}
+			close(chunkHashesToReceiver)
+		}(ctx, blob, chunkHashesToReceiver)
+
+		// 7. Get needed chunk hashes from receiver, find the relevant
+		// data, and send it back to the receiver.
+		var cbr localblobstore.BlobReader // Cached read handle on most-recent-read blob, or nil
+		// Given chunk hash h from chunkHashesToSender, send chunk to chunksToReceiver.
+		for h := range chunkHashesToSender {
+			loc, err := bsS.LookupChunk(ctx, h)
+			for err == nil && (cbr == nil || cbr.Name() != loc.BlobName) {
+				if cbr != nil && cbr.Name() != loc.BlobName {
+					cbr.Close()
+					cbr = nil
+				}
+				if cbr == nil {
+					if cbr, err = bsS.NewBlobReader(ctx, loc.BlobName); err != nil {
+						bsS.GC(ctx) // A partially-deleted blob may be confusing things.
+						loc, err = bsS.LookupChunk(ctx, h)
+					}
+				}
+			}
+			i := 1
+			var n int64
+			buffer := make([]byte, loc.Size) // buffer for current chunk
+			for n = int64(0); n != loc.Size && i != 0 && err == nil; n += int64(i) {
+				if i, err = cbr.ReadAt(buffer[n:loc.Size], n+loc.Offset); err == io.EOF {
+					err = nil // EOF is expected
+				}
+			}
+			// Stop on a lookup or read failure: sending an empty or partial chunk (or skipping
+			// one) would misalign the receiver, which pairs chunks with recipe steps in order.
+			if err != nil || n != loc.Size {
+				break
+			}
+			chunksToReceiver <- buffer[:loc.Size]
+		}
+		close(chunksToReceiver)
+		if cbr != nil {
+			cbr.Close()
+		}
+
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, sDone)
+
+	// ----------------------------------------------
+	// The receiver.
+	go func(ctx *context.T,
+		blobDataToReceiver <-chan blobData,
+		needChunksToSender chan<- bool,
+		chunkHashesToReceiver <-chan []byte,
+		chunkHashesToSender chan<- []byte,
+		chunksToReceiver <-chan []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsR, bsRDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsRDir)
+
+		// 2. Receive basic blob data from sender.
+		blobInfo := <-blobDataToReceiver
+
+		if simulateResumption {
+			// Write a fraction of the (unfinalized) blob on the receiving side
+			// to check that the transfer process can resume a partial blob.
+			createBlob(ctx, bsR, blobInfo.name, false, 0, 10)
+		}
+
+		// 3. Tell sender whether the receiver already has the complete
+		// blob.
+		needChunks := true
+		var br localblobstore.BlobReader
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err == nil {
+			if br.IsFinalized() {
+				if !bytes.Equal(br.Hash(), blobInfo.checksum) { // Any difference, including in length, is a mismatch.
+					panic("receiver has a finalized blob with same name but different hash")
+				}
+				needChunks = false // The receiver already has the blob.
+			}
+			br.Close()
+		}
+		needChunksToSender <- needChunks
+		close(needChunksToSender)
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 5. Receive the chunk hashes from the sender, and turn them
+		// into a recipe.
+		cs := newChannelChunkStream(chunkHashesToReceiver)
+		rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
+
+		// 6. The following thread sends the chunk hashes that the
+		// receiver does not have to the sender. It also makes
+		// a duplicate of the stream on the channel rsCopy. The
+		// buffering in rsCopy allows the receiver to put several
+		// chunks into a fragment.
+		rsCopy := make(chan localblobstore.RecipeStep, 100) // A buffered copy of the rs stream.
+		go func(ctx *context.T, rs localblobstore.RecipeStream, rsCopy chan<- localblobstore.RecipeStep, chunkHashesToSender chan<- []byte) {
+			for rs.Advance() {
+				// Forward every step; hashes of chunks this store lacks also go to the sender.
+				step := rs.Value()
+				if step.Chunk != nil { // Data must be fetched from sender.
+					chunkHashesToSender <- step.Chunk
+				}
+				rsCopy <- step
+			}
+			close(chunkHashesToSender)
+			close(rsCopy)
+		}(ctx, rs, rsCopy, chunkHashesToSender)
+
+		// 8. The following thread splices the chunks from the sender
+		// (on chunksToReceiver) into the recipe stream copy
+		// (rsCopy) to generate a full recipe stream (rsFull) in
+		// which chunks are actual data, rather than just hashes.
+		rsFull := make(chan localblobstore.RecipeStep) // A recipe stream containing chunk data, not just hashes.
+		go func(ctx *context.T, rsCopy <-chan localblobstore.RecipeStep, chunksToReceiver <-chan []byte, rsFull chan<- localblobstore.RecipeStep) {
+			var ok bool
+			for step := range rsCopy {
+				if step.Chunk != nil { // Data must be fetched from sender.
+					if step.Chunk, ok = <-chunksToReceiver; !ok {
+						break
+					}
+				}
+				rsFull <- step
+			}
+			close(rsFull)
+		}(ctx, rsCopy, chunksToReceiver, rsFull)
+
+		// 9. Write the blob using the recipe.
+		var chunksTransferred int
+		const fragmentThreshold = 1024 * 1024 // Try to write on-disc fragments at least this big.
+		var ignoreBytes int64
+		var bw localblobstore.BlobWriter
+		if bw, err = bsR.ResumeBlobWriter(ctx, blobInfo.name); err != nil {
+			bw, err = bsR.NewBlobWriter(ctx, blobInfo.name)
+		} else {
+			ignoreBytes = bw.Size()
+		}
+		if err == nil {
+			var fragment []localblobstore.BlockOrFile
+			var fragmentSize int64
+			for step := range rsFull {
+				if step.Chunk == nil { // Data can be obtained from local blob.
+					if ignoreBytes >= step.Size { // Ignore chunks we already have.
+						ignoreBytes -= step.Size
+					} else {
+						err = bw.AppendBlob(step.Blob, step.Size-ignoreBytes, step.Offset+ignoreBytes)
+						ignoreBytes = 0
+					}
+				} else if ignoreBytes >= int64(len(step.Chunk)) { // Ignore chunks we already have.
+					ignoreBytes -= int64(len(step.Chunk))
+				} else { // Data is from a chunk sent by the sender.
+					chunksTransferred++
+					fragment = append(fragment, localblobstore.BlockOrFile{Block: step.Chunk[ignoreBytes:]})
+					fragmentSize += int64(len(step.Chunk)) - ignoreBytes
+					ignoreBytes = 0
+					if fragmentSize > fragmentThreshold {
+						err = bw.AppendFragment(fragment...)
+						fragment = fragment[:0]
+						fragmentSize = 0
+					}
+				}
+				if err != nil {
+					break
+				}
+			}
+			if err == nil && len(fragment) != 0 {
+				err = bw.AppendFragment(fragment...)
+			}
+			if err2 := bw.Close(); err == nil {
+				err = err2
+			}
+			if err != nil {
+				panic(err)
+			}
+		}
+
+		// 10. Verify that the blob was written correctly.
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err != nil {
+			panic(err)
+		}
+		if br.Size() != blobInfo.size {
+			panic("transferred blob has wrong size")
+		}
+		if !bytes.Equal(br.Hash(), blobInfo.checksum) {
+			panic("transferred blob has wrong checksum")
+		}
+		if err = br.Close(); err != nil {
+			panic(err)
+		}
+		fmt.Printf("%d chunks transferred\n", chunksTransferred)
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, rDone)
+
+	// ----------------------------------------------
+	// Wait for sender and receiver to finish.
+	<-sDone
+	<-rDone
+
+	// Output: 635 chunks transferred
+}
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
index b1a4565..16af4cf 100644
--- a/x/ref/services/syncbase/localblobstore/model.go
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -4,6 +4,50 @@
// Package localblobstore is the interface to a local blob store.
// Implementations include fs_cablobstore.
+//
+// Expected use
+// ============
+// These examples assume that bs, bsS (sender) and bsR (receiver) are blobstores.
+//
+// Writing blobs
+// bw, err := bs.NewBlobWriter(ctx, "") // For a new blob, implementation picks blob name.
+// if err == nil {
+// blobName := bw.Name() // Get name the implementation picked.
+// ... use bw.AppendFragment() to append data to the blob...
+// ... and/or bw.AppendBlob() to append data that's in another existing blob...
+// err = bw.Close()
+// }
+//
+// Resume writing a blob that was partially written due to a crash (not yet finalized).
+// bw, err := bs.ResumeBlobWriter(ctx, name)
+// if err == nil {
+// size := bw.Size() // The store has this many bytes from the blob.
+// ... write the remaining data using bw.AppendFragment() and/or bw.AppendBlob()...
+// err = bw.Close()
+// }
+//
+// Reading blobs
+// br, err := bs.NewBlobReader(ctx, name)
+// if err == nil {
+// ... read bytes with br.ReadAt() or br.Read(), perhaps with br.Seek()...
+// err = br.Close()
+// }
+//
+// Transferring blobs from one store to another:
+// See example in localblobstore_transfer_test.go
+// Summary:
+// - The sender sends the checksum of the blob from BlobReader's Hash().
+// - The receiver checks whether it already has the blob, with the same
+// checksum.
+// - If the receiver does not have the blob, the sender sends the list of chunk
+// hashes in the blob using BlobChunkStream().
+// - The receiver uses RecipeStreamFromChunkStream() with the chunk hash stream
+// from the sender, and tells the sender the chunk hashes of the chunks it
+// needs.
+// - The sender uses LookupChunk() to find the data for each chunk the receiver
+// needs, and sends it to the receiver.
+// - The receiver applies the recipe steps, with the actual chunk data from
+// the sender and its own local data.
package localblobstore
import "v.io/v23/context"
@@ -17,12 +61,16 @@
NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
- // a newly created blob name, which can be found using the Name()
- // method. BlobWriters should not be used concurrently by multiple
- // threads. The returned handle should be closed with either the
- // Close() or CloseWithoutFinalize() method to avoid leaking file
- // handles.
- NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+ // a newly created blob. If "name" is non-empty, it is used to name
+ // the blob, and it must be in the format of a name returned by this
+ // interface (probably by another instance on another device).
+ // Otherwise, a new name is created, which can be found using
+ // the Name() method. It is an error to attempt to overwrite a blob
+ // that already exists in this blob store. BlobWriters should not be
+ // used concurrently by multiple threads. The returned handle should
+ // be closed with either the Close() or CloseWithoutFinalize() method
+ // to avoid leaking file handles.
+ NewBlobWriter(ctx *context.T, name string) (bw BlobWriter, err error)
// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
// an old, but unfinalized blob name.
@@ -36,6 +84,31 @@
// other calls to GC(), and with uses of BlobReaders and BlobWriters.
GC(ctx *context.T) error
+ // BlobChunkStream() returns a ChunkStream that can be used to read the
+ // ordered list of content hashes of chunks in blob blobName. It is
+ // expected that this list will be presented to
+ // RecipeStreamFromChunkStream() on another device, to create a recipe
+ // for transmitting the blob efficiently to that other device.
+ BlobChunkStream(ctx *context.T, blobName string) ChunkStream
+
+ // RecipeStreamFromChunkStream() returns a pointer to a RecipeStream
+ // that allows the client to iterate over each RecipeStep needed to
+ // create the blob formed by the chunks in chunkStream. It is expected
+ // that this will be called on a receiving device, and be given a
+ // ChunkStream from a sending device, to yield a recipe for efficient
+ // chunk transfer. RecipeStep values with non-nil Chunk fields need
+ // the chunk from the sender; once the data is returned it can be
+ // written with BlobWriter.AppendFragment(). Those with blob
+ // references can be written locally with BlobWriter.AppendBlob().
+ RecipeStreamFromChunkStream(ctx *context.T, chunkStream ChunkStream) RecipeStream
+
+ // LookupChunk() returns the location of a chunk with the specified chunk
+ // hash within the store. It is expected that chunk hashes from
+ // RecipeStep entries from RecipeStreamFromChunkStream() will be mapped
+ // to blob Location values on the sender for transmission to the
+ // receiver.
+ LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error)
+
// ListBlobIds() returns an iterator that can be used to enumerate the
// blobs in a BlobStore. Expected use is:
//
@@ -46,7 +119,7 @@
// if iter.Err() != nil {
// // The loop terminated early due to an error.
// }
- ListBlobIds(ctx *context.T) (iter Iter)
+ ListBlobIds(ctx *context.T) (iter Stream)
// ListCAIds() returns an iterator that can be used to enumerate the
// content-addressable fragments in a BlobStore. Expected use is:
@@ -58,12 +131,20 @@
// if iter.Err() != nil {
// // The loop terminated early due to an error.
// }
- ListCAIds(ctx *context.T) (iter Iter)
+ ListCAIds(ctx *context.T) (iter Stream)
// Root() returns the name of the root directory where the BlobStore is stored.
Root() string
}
+// A Location describes a chunk's location within a blob. It is returned by
+// BlobStore.LookupChunk().
+type Location struct {
+ BlobName string // name of blob
+ Offset int64 // byte offset of chunk within blob
+ Size int64 // size of chunk, in bytes
+}
+
// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
// and Seek() calls. A BlobReader can be created with NewBlobReader(), and
// should be closed with the Close() method to avoid leaking file handles.
@@ -153,18 +234,67 @@
Hash() []byte
}
-// A Iter represents an iterator that allows the client to enumerate
-// all the blobs of fragments in a BlobStore.
-type Iter interface {
- // Advance() stages an item so that it may be retrieved via Value.
- // Returns true iff there is an item to retrieve. Advance must be
- // called before Value is called.
- Advance() (advanced bool)
+// A Stream represents an iterator that allows the client to enumerate
+// all the blobs or fragments in a BlobStore.
+//
+// The interfaces Stream, ChunkStream, and RecipeStream all have four methods,
+// and differ only in the Value() method.
+type Stream interface {
+ // Advance() stages an item so that it may be retrieved via Value().
+ // Returns true iff there is an item to retrieve. Advance() must be
+ // called before Value() is called. The caller is expected to read
+ // until Advance() returns false, or to call Cancel().
+ Advance() bool
- // Value() returns the item that was staged by Advance. May panic if
- // Advance returned false or was not called. Never blocks.
+ // Value() returns the item that was staged by Advance(). May panic if
+ // Advance() returned false or was not called. Never blocks.
Value() (name string)
// Err() returns any error encountered by Advance. Never blocks.
Err() error
+
+ // Cancel() indicates that the client wishes to cease reading from the stream.
+ // It causes the next call to Advance() to return false. Never blocks.
+ // It may be called concurrently with other calls on the stream.
+ Cancel()
+}
+
+// A ChunkStream represents an iterator that allows the client to enumerate
+// the content hashes of the chunks in a blob. See the comments for Stream for usage.
+type ChunkStream interface {
+ Advance() bool
+
+ // Value() returns the chunkHash that was staged by Advance(). May
+ // panic if Advance() returned false or was not called. Never blocks.
+ // The result may share storage with buf[] if it is large enough;
+ // otherwise, a new buffer is allocated. It is legal to call with
+ // buf==nil.
+ Value(buf []byte) (chunkHash []byte)
+
+ Err() error
+ Cancel()
+}
+
+// A RecipeStep describes one piece of a recipe for making a blob.
+// The step consists either of appending the chunk with content hash Chunk and size Size,
+// or (if Chunk==nil) of appending the Size bytes from Blob, starting at Offset.
+type RecipeStep struct {
+ Chunk []byte // content hash of the chunk to append; nil when the data comes from Blob
+ Blob string // name of the blob supplying the data (used when Chunk==nil)
+ Size int64 // number of bytes this step contributes
+ Offset int64 // byte offset within Blob where the data starts (used when Chunk==nil)
+}
+
+// A RecipeStream represents an iterator that allows the client to obtain the
+// steps needed to construct the blob described by a given ChunkStream, attempting to
+// reuse data in existing blobs. See the comments for Stream for usage.
+type RecipeStream interface {
+ Advance() bool
+
+ // Value() returns the RecipeStep that was staged by Advance(). May panic if
+ // Advance() returned false or was not called. Never blocks.
+ Value() RecipeStep
+
+ Err() error
+ Cancel()
+}