v.io/syncbase/x/ref/services/syncbase/localblobstore: Integrate chunk handling into main blob store code.

This change integrates into the main blob store interface the existing code
that breaks byte streams into chunks and that maintains a map between chunks
and blob locations.

It also fixes a few annoyances with the blob store interface:
	- Iter is now called Stream
	- Streams now have Cancel calls
	- Various calls now take a *context.T
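
All the streams now share the standard Vanadium pattern
(Advance/Value/Err/Cancel).  As a sketch, here is a variant of the test
helper chunkSums() with an illustrative early exit; only the chunker API
shown is from this change (imports: crypto/md5, io, v.io/v23/context, and
the chunker package):

	func chunkSums(ctx *context.T, rd io.Reader, limit int) ([][md5.Size]byte, error) {
		var sums [][md5.Size]byte
		s := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
		for s.Advance() {
			sums = append(sums, md5.Sum(s.Value()))
			if len(sums) == limit {
				s.Cancel() // stop early; the next Advance() returns false
			}
		}
		// After Cancel(), Err() reports the "stream cancelled" error,
		// which a caller that cancelled deliberately may ignore.
		return sums, s.Err()
	}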

chunker/chunker.go
        - add Cancel() method to the streams so the Streams have standard
          Vanadium stream signatures.

        - add a context.T argument to the stream creations
          (I should have had one before.  It's now needed for verror.New()
          when generating the "stream cancelled" error.)

chunker/chunker_test.go
        - move the NewRandReader code to localblobstore_testlib/randreader.go
          so that it may be used by localblobstore_testlib/localblobstore_testlib.go.

chunkmap/chunkmap.go
        - rename the Blob field of chunkmap.Location to BlobID to reduce
          confusion with blob names.
          (Blob names are strings seen by clients, while blob IDs are more
          compact byte vectors for internal use.)

        - rename the chunk parameter to ChunkMap.LookupChunk to chunkHash,
          to make it clear that it's the hash of a chunk, rather than the chunk
          contents.  (Both are byte vectors, so it's unclear from the type.)

        - rename BlobStream to ChunkStream, to distinguish it from the new
          BlobStream (see below).  Now a BlobStream generates a sequence of
          blobs in a ChunkMap, while a ChunkStream generates a sequence of
          Chunks in a Blob.

        - add new BlobStream, which allows a client to generate the list of
          blobs in the ChunkMap.  This is used primarily by the blobstore
          garbage collector, to eliminate blobs that have been deleted from the
          blob store, but left in the ChunkMap.
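
          A sketch of that GC use, following the stream pattern in the new
          doc comment; blobExists() is a stand-in for the blob store's own
          existence check:

                bs := cm.NewBlobStream(ctx)
                for bs.Advance() {
                        blobID := bs.Value(nil)
                        if !blobExists(blobID) { // stale entry: blob was deleted
                                cm.DeleteBlob(ctx, blobID) // error handling elided
                        }
                }
                if bs.Err() != nil {
                        // ...there was an error...
                }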

chunkmap/chunkmap_test.go
        - add test code for the new BlobStream
        - rename the old BlobStream uses to ChunkStream.

	- fix a bug in verifyChunksInBlob() which was testing that loc.Blob equalled itself.
	  It should not have been checking this at all, since (after renaming) loc.BlobID
	  and loc2.BlobID can differ legitimately.  The comment now explains this.

fs_cablobstore/fs_cablobstore.go
        - add chunk support to the blob store (see model.go for the public API changes)

          A ChunkMap is added to the store in a new "chunk" subdirectory.
          Chunks are written to the ChunkMap when a blob is written
          by forking a thread that reads the blob as it is being written;
          this reuses the BlobReader code for reading the data.
          Some synchronization in the blob descriptor keeps the BlobReader from trying
          to read parts of the blob that have not yet been written.
          See the comments on the fields of blobDesc for the way the mutex is used.

        - The call NewBlobWriter() has an extra argument to allow the caller
          to specify the name of the blob.  This is used when a blob is transferred from
          one store to another---the name of the blob is unchanged on the other side.

          For new blobs, created on the current device, an empty name can be specified
          and the system will pick a random one.
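
          Both cases, as a sketch (remoteBlobName is illustrative):

                // New blob on this device: pass an empty name.
                bw, err := fscabs.NewBlobWriter(ctx, "")

                // Blob arriving from another device: keep its original name.
                // (It is an error if that blob already exists in this store.)
                bw2, err2 := fscabs.NewBlobWriter(ctx, remoteBlobName)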

                        --------------

        - add comments to distinguish fragments (the on-disc unit of storage)
          from chunks (the on-network unit of transfer)
        - add a "chunk" subdirectory to the blob store, which keeps the chunkmap.
        - add code to maintain the ChunkMap as blobs are written and deleted.

        - fix format of comments on FsCaBlobStore fields.

        - add fileNameToHash() as the inverse of hashToFileName(),
          and make both general by allowing them to use an arbitrary directory prefix.
          These are used to convert between ID and names.
          (The reason for using names where possible is that they are human
          readable.  The reason for using IDs for chunks in the ChunkStream is
          that when transferring a large blob, the amount of data transmitted
          over the wire is smaller if raw bytes are used.)
          fileNameToHash() now checks that the input string has the specified directory prefix,
          which subsumes some of the checking that was done in (say) DeleteBlob().
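
          The round trip, as a sketch (the example name shown is what the
          format produces for the md5 of an empty byte string):

                hash := md5.Sum(data) // a 16-byte ID
                name := hashToFileName(casDir, hash[:])
                // e.g. name == "cas/d4/1d/8c/d98f00b204e9800998ecf8427e"
                id := fileNameToHash(casDir, name) // recovers the 16 bytes
                // id is nil if name lacks the prefix or is malformed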

        - NewBlobWriter() and ResumeBlobWriter() now fork the thread that
          writes the chunks to the ChunkMap.

	- fix ResumeBlobWriter(), which would crash if given the name of a non-existent blob.

        - the new internal calls forkWriteChunkMap(), joinWriteChunkMap()
          create and then wait for a thread that runs writeChunkMap() to write
          the chunks into the ChunkMap.  Each insertion is performed by insertChunk().

        - BlobWriter.Close() and BlobWriter.CloseWithoutFinalize() now tell the
          chunk-writer thread that writing has finished, and call joinWriteChunkMap().

        - AppendBlob() notifies the chunk-writing thread when there is more
          data to look at.

        - BlobReader has a new field waitForWriter that tells the reader
          whether it is synchronized with a concurrent BlobWriter.  This is
          set by the blob-writing thread.

        - waitUntilAvailable() encapsulates the condition that allows
          the reader used by the chunk-writer thread to continue.
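
          A hedged sketch of the likely shape (the fields are from this
          change, but the exact predicate and signature are assumptions,
          not the actual implementation):

                func (br *BlobReader) waitUntilAvailable(offset int64) {
                        br.fscabs.mu.Lock()
                        // Sleep until the wanted bytes exist, or no open
                        // BlobWriter remains to produce them.
                        for br.waitForWriter && br.desc.openWriter && br.desc.size < offset {
                                br.desc.cv.Wait() // woken by the writer's Broadcast()
                        }
                        br.fscabs.mu.Unlock()
                }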

        - ReadAt() has some reorganization to avoid holding the lock over I/O operations.
          The main thing is that a copy of fragment[i] is put in fragmenti;
          accesses to fragmenti don't need the lock.

        - Seek() needs to use the lock, and waitUntilAvailable() if the offset
          is relative to the end of the file.

        - The FsCasIter gets a context.T field.

        - Add Cancel() to FsCasIter.

        - Add BlobChunkStream() to allow the client to iterate over the chunks
          in a blob.  Unless the client passes a bad parameter, this is
          implemented by returning a localblobstore.ChunkStream.
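
          Sender-side use, as a sketch (the argument and the Value(nil)
          buffer convention are assumed from the surrounding description):

                cs := fscabs.BlobChunkStream(ctx, blobName)
                for cs.Advance() {
                        chunkHash := cs.Value(nil)
                        // ...send chunkHash to the receiving device...
                }
                if cs.Err() != nil {
                        // ...there was an error...
                }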

	- GC() now garbage collects the ChunkMap also.
	  It removes from the ChunkMap any blob present there, but not present in the blob store.

fs_cablobstore/fs_cablobstore_test.go
	- TestWritingViaChunks() mainly calls localblobstore_testlib.WriteViaChunks()
	  which tests that blobs can be written by incremental file transfer.

localblobstore_test.go
	- TestWritingViaChunks() mainly calls localblobstore_testlib.WriteViaChunks()
	  which tests that blobs can be written by incremental file transfer.

localblobstore_testlib/localblobstore_testlib.go
	- accommodate extra argument to NewBlobWriter()

	- add a test that creates two blob stores, stores a blob in one,
	  stores a somewhat similar blob (sharing many chunks) in the other,
	  and then simulates transferring the second blob to the first store.

	  The moments when data is expected to be sent over the wire are called out in the code.

	  In the test, I use Fatalf() instead of Errorf(), because a failure anywhere
	  leads to many later failures.

          The test currently assumes that no blobs involved in the transfer are
          deleted in the stores.

localblobstore_testlib/randreader.go
	- This is copied from chunker/chunker_test.go so that it may be used by
	  localblobstore_testlib.go

localblobstore_transfer_test.go
        - A full example of transferring a blob incrementally from a sender to
          a receiver.

model.go
	- Comments at the top to explain expected use.

        - NewBlobWriter() has an extra argument to allow a blob to be written
          with a specified name.  This is expected to happen when a blob is
          transferred between devices, to preserve the original name.

	- Expose
                BlobChunkStream(), which allows a sending device to list
                the chunks in a blob.

		RecipeStreamFromBlobStream(), which allows a receiving device
		to determine which chunks it needs, and which it has locally.

                LookupChunk(), which allows a sending device to find where
                within its store a chunk appears, so that it may be read and sent
                to a receiving device.

          The chunks and recipes are treated as streams rather than arrays in
          case a large blob must be transferred---one so large that it would be
          inconvenient to hold all the chunk hashes or recipe steps in memory
          at one time.
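
          A hedged sketch of that flow (the call names are from this change;
          the argument to RecipeStreamFromBlobStream() and the RecipeStep
          fields shown are assumptions):

                // Receiver: derive a recipe from the sender's stream of chunk hashes.
                rs := recvStore.RecipeStreamFromBlobStream(chunkHashesFromSender)
                bw, _ := recvStore.NewBlobWriter(ctx, blobName) // keep the sender's name
                for rs.Advance() {
                        step := rs.Value()
                        if step.Chunk != nil { // assumed field: hash of a chunk to fetch
                                // Sender: loc, _ := sendStore.LookupChunk(ctx, step.Chunk),
                                // then read loc.Size bytes at loc.Offset in blob loc.BlobID
                                // and send them; the receiver appends those bytes to bw.
                        } else {
                                // The receiver already holds these bytes locally
                                // (assumed fields step.Blob/step.Offset/step.Size)
                                // and appends them to bw from its own store.
                        }
                }
                bw.Close()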

	- The Iter type is now called Stream, and now has a Cancel() method.

	- Expose the ChunkStream interface, which is implemented by chunkmap,
	  and passed through by the blob store.

	- Expose the RecipeStream interface which allows a client to
	  find the RecipeStep values needed to reconstruct a blob from
	  local and remote data.

Change-Id: I79f9ceefd0c03989949ffc580d0f1277bb30190c
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker.go b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
index 84abf6f..cb07533 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
@@ -30,8 +30,17 @@
 // http://www.hpl.hp.com/techreports/2005/HPL-2005-30R1.pdf
 
 import "io"
+import "sync"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/crc64window"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+
+var (
+	errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
+)
 
 // A Param contains the parameters for chunking.
 //
@@ -69,10 +78,13 @@
 // stream.
 type Stream struct {
 	param        Param               // chunking parameters
+	ctx          *context.T          // context of creator
 	window       *crc64window.Window // sliding window for computing the hash
 	buf          []byte              // buffer of data
 	rd           io.Reader           // source of data
 	err          error               // error from rd
+	mu           sync.Mutex          // protects cancelled
+	cancelled    bool                // whether the stream has been cancelled
 	bufferChunks bool                // whether to buffer entire chunks
 	// Invariant:  bufStart <= chunkStart <= chunkEnd <= bufEnd
 	bufStart   int64  // offset in rd of first byte in buf[]
@@ -86,9 +98,10 @@
 // newStream() returns a pointer to a new Stream instance, with the
 // parameters in *param.  This internal version of NewStream() allows the caller
 // to specify via bufferChunks whether entire chunks should be buffered.
-func newStream(param *Param, rd io.Reader, bufferChunks bool) *Stream {
+func newStream(ctx *context.T, param *Param, rd io.Reader, bufferChunks bool) *Stream {
 	s := new(Stream)
 	s.param = *param
+	s.ctx = ctx
 	s.window = crc64window.New(crc64window.ECMA, s.param.WindowWidth)
 	bufSize := int64(8192)
 	if bufferChunks {
@@ -107,8 +120,16 @@
 
 // NewStream() returns a pointer to a new Stream instance, with the
 // parameters in *param.
-func NewStream(param *Param, rd io.Reader) *Stream {
-	return newStream(param, rd, true)
+func NewStream(ctx *context.T, param *Param, rd io.Reader) *Stream {
+	return newStream(ctx, param, rd, true)
+}
+
+// isCancelled() returns whether s.Cancel() has been called.
+func (s *Stream) isCancelled() (cancelled bool) {
+	s.mu.Lock()
+	cancelled = s.cancelled
+	s.mu.Unlock()
+	return cancelled
 }
 
 // Advance() stages the next chunk so that it may be retrieved via Value().
@@ -132,7 +153,7 @@
 			s.bufStart = s.chunkEnd
 		}
 		// Fill buffer with data, unless error/EOF.
-		for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) {
+		for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) && !s.isCancelled() {
 			var n int
 			n, s.err = s.rd.Read(s.buf[s.bufEnd-s.bufStart:])
 			s.bufEnd += int64(n)
@@ -148,7 +169,7 @@
 	// While not end of chunk...
 	for s.windowEnd != maxChunk &&
 		(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
-		(s.windowEnd != s.bufEnd || s.err == nil) {
+		(s.windowEnd != s.bufEnd || s.err == nil) && !s.isCancelled() {
 
 		// Fill the buffer if empty, and there's more data to read.
 		if s.windowEnd == s.bufEnd && s.err == nil {
@@ -167,7 +188,10 @@
 			bufLimit = s.bufEnd
 		}
 		// Advance window until both MinChunk reached and primary boundary found.
-		for s.windowEnd != bufLimit && (s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) {
+		for s.windowEnd != bufLimit &&
+			(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
+			!s.isCancelled() {
+
 			// Advance the window by one byte.
 			s.hash = s.window.Advance(s.buf[s.windowEnd-s.bufStart])
 			s.windowEnd++
@@ -185,7 +209,7 @@
 		s.chunkEnd = s.windowEnd
 	}
 
-	return s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
+	return !s.isCancelled() && s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
 }
 
 // Value() returns the chunk that was staged by Advance().  May panic if
@@ -196,12 +220,26 @@
 
 // Err() returns any error encountered by Advance().  Never blocks.
 func (s *Stream) Err() (err error) {
+	s.mu.Lock()
+	if s.cancelled && (s.err == nil || s.err == io.EOF) {
+		s.err = verror.New(errStreamCancelled, s.ctx)
+	}
+	s.mu.Unlock()
 	if s.err != io.EOF { // Do not consider EOF to be an error.
 		err = s.err
 	}
 	return err
 }
 
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// Never blocks.  May be called concurrently with other method calls on s.
+func (s *Stream) Cancel() {
+	s.mu.Lock()
+	s.cancelled = true
+	s.mu.Unlock()
+}
+
 // ----------------------------------
 
 // A PosStream is just like a Stream, except that the Value() method returns only
@@ -214,9 +252,9 @@
 
 // NewPosStream() returns a pointer to a new PosStream instance, with the
 // parameters in *param.
-func NewPosStream(param *Param, rd io.Reader) *PosStream {
+func NewPosStream(ctx *context.T, param *Param, rd io.Reader) *PosStream {
 	ps := new(PosStream)
-	ps.s = newStream(param, rd, false)
+	ps.s = newStream(ctx, param, rd, false)
 	return ps
 }
 
@@ -237,3 +275,10 @@
 func (ps *PosStream) Err() error {
 	return ps.s.Err()
 }
+
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// Never blocks.  May be called concurrently with other method calls on ps.
+func (ps *PosStream) Cancel() {
+	ps.s.Cancel()
+}
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
index 6de3304..57c6fed 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
@@ -9,68 +9,28 @@
 import "crypto/md5"
 import "fmt"
 import "io"
-import "math/rand"
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
-
-// A RandReader contains a pointer to a rand.Read, and a size limit.  Its
-// pointers implement the Read() method from io.Reader, which yields bytes
-// obtained from the random number generator.
-type RandReader struct {
-	rand           *rand.Rand // Source of random bytes.
-	pos            int        // Number of bytes read.
-	limit          int        // Max number of bytes that may be read.
-	insertInterval int        // If non-zero, number of bytes between insertions of zero bytes.
-	eofErr         error      // error to be returned at the end of the stream
-}
-
-// NewRandReader() returns a new RandReader with the specified seed and size limit.
-// It yields eofErr when the end of the stream is reached.
-// If insertInterval is non-zero, a zero byte is inserted into the stream every
-// insertInterval bytes, before resuming getting bytes from the random number
-// generator.
-func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
-	r := new(RandReader)
-	r.rand = rand.New(rand.NewSource(seed))
-	r.limit = limit
-	r.insertInterval = insertInterval
-	r.eofErr = eofErr
-	return r
-}
-
-// Read() implements the io.Reader Read() method for *RandReader.
-func (r *RandReader) Read(buf []byte) (n int, err error) {
-	// Generate bytes up to the end of the stream, or the end of the buffer.
-	max := r.limit - r.pos
-	if len(buf) < max {
-		max = len(buf)
-	}
-	for ; n != max; n++ {
-		if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
-			buf[n] = byte(r.rand.Int31n(256))
-		} else {
-			buf[n] = 0
-		}
-		r.pos++
-	}
-	if r.pos == r.limit {
-		err = r.eofErr
-	}
-	return n, err
-}
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
 
 // TestChunksPartitionStream() tests that the chunker partitions its input
 // stream into reasonable sized chunks, which when concatenated form the
 // original stream.
 func TestChunksPartitionStream(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	var err error
 	totalLength := 1024 * 1024
 
 	// Compute the md5 of an arbiotrary stream.  We will later compare this
 	// with the md5 of the concanenation of chunks from an equivalent
 	// stream.
-	r := NewRandReader(1, totalLength, 0, io.EOF)
+	r := localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
 	hStream := md5.New()
 	buf := make([]byte, 8192)
 	for err == nil {
@@ -81,12 +41,12 @@
 	checksumStream := hStream.Sum(nil)
 
 	// Using an equivalent stream, break it into chunks.
-	r = NewRandReader(1, totalLength, 0, io.EOF)
+	r = localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
 	param := &chunker.DefaultParam
 	hChunked := md5.New()
 
 	length := 0
-	s := chunker.NewStream(param, r)
+	s := chunker.NewStream(ctx, param, r)
 	for s.Advance() {
 		chunk := s.Value()
 		length += len(chunk)
@@ -116,10 +76,15 @@
 
 // TestPosStream() tests that a PosStream leads to the same chunks as an Stream.
 func TestPosStream(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	totalLength := 1024 * 1024
 
-	s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
-	ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
+	s := chunker.NewStream(ctx, &chunker.DefaultParam,
+		localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+	ps := chunker.NewPosStream(ctx, &chunker.DefaultParam,
+		localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
 
 	itReady := s.Advance()
 	pitReady := ps.Advance()
@@ -150,8 +115,8 @@
 
 // chunkSums() returns a vector of md5 checksums for the chunks of the
 // specified Reader, using the default chunking parameters.
-func chunkSums(r io.Reader) (sums [][md5.Size]byte) {
-	s := chunker.NewStream(&chunker.DefaultParam, r)
+func chunkSums(ctx *context.T, r io.Reader) (sums [][md5.Size]byte) {
+	s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
 	for s.Advance() {
 		sums = append(sums, md5.Sum(s.Value()))
 	}
@@ -161,14 +126,17 @@
 // TestInsertions() tests the how chunk sequences differ when bytes are
 // periodically inserted into a stream.
 func TestInsertions(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	totalLength := 1024 * 1024
 	insertionInterval := 20 * 1024
 	bytesInserted := totalLength / insertionInterval
 
 	// Get the md5 sums of the chunks of two similar streams, where the
 	// second has an extra bytes every 20k bytes.
-	sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF))
-	sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF))
+	sums0 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+	sums1 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, insertionInterval, io.EOF))
 
 	// Iterate over chunks of second stream, counting which are in common
 	// with first stream.  We expect to find common chunks within 10 of the
@@ -208,10 +176,13 @@
 // TestError() tests the behaviour of a chunker when given an error by its
 // reader.
 func TestError(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	notEOF := fmt.Errorf("not EOF")
 	totalLength := 50 * 1024
-	r := NewRandReader(1, totalLength, 0, notEOF)
-	s := chunker.NewStream(&chunker.DefaultParam, r)
+	r := localblobstore_testlib.NewRandReader(1, totalLength, 0, notEOF)
+	s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
 	length := 0
 	for s.Advance() {
 		chunk := s.Value()
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
index f08b796..a13cf9f 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
@@ -8,6 +8,7 @@
 package chunkmap
 
 import "encoding/binary"
+import "sync"
 
 import "v.io/syncbase/x/ref/services/syncbase/store"
 import "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
@@ -75,7 +76,7 @@
 
 // A Location describes chunk's location within a blob.
 type Location struct {
-	Blob   []byte // ID of blob
+	BlobID []byte // ID of blob
 	Offset int64  // byte offset of chunk within blob
 	Size   int64  // size of chunk
 }
@@ -104,8 +105,8 @@
 // associated with the specified Location.
 func (cm *ChunkMap) AssociateChunkWithLocation(ctx *context.T, chunk []byte, loc Location) (err error) {
 	// Check of expected lengths explicitly in routines that modify the database.
-	if len(loc.Blob) != blobIDLen {
-		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.Blob), blobIDLen)
+	if len(loc.BlobID) != blobIDLen {
+		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.BlobID), blobIDLen)
 	} else if len(chunk) != chunkHashLen {
 		err = verror.New(errBadChunkHashLen, ctx, cm.dir, len(chunk), chunkHashLen)
 	} else {
@@ -115,7 +116,7 @@
 		// Put the blob-to-chunk entry first, since it's used
 		// to garbage collect the other.
 		keyLen := copy(key[:], blobPrefix)
-		keyLen += copy(key[keyLen:], loc.Blob)
+		keyLen += copy(key[keyLen:], loc.BlobID)
 		binary.BigEndian.PutUint64(key[keyLen:], uint64(loc.Offset))
 		keyLen += offsetLen
 
@@ -126,7 +127,7 @@
 		if err == nil {
 			keyLen = copy(key[:], chunkPrefix)
 			keyLen += copy(key[keyLen:], chunk)
-			keyLen += copy(key[keyLen:], loc.Blob)
+			keyLen += copy(key[keyLen:], loc.BlobID)
 
 			valLen = binary.PutVarint(val[:], loc.Offset)
 			valLen += binary.PutVarint(val[valLen:], loc.Size)
@@ -202,12 +203,12 @@
 // been deleted, the client should remove the blob from the ChunkMap using
 // DeleteBlob(loc.Blob), and try again.  (The client may also wish to
 // arrange at some point to call GC() on the blob store.)
-func (cm *ChunkMap) LookupChunk(ctx *context.T, chunk []byte) (loc Location, err error) {
+func (cm *ChunkMap) LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error) {
 	var start [maxKeyLen]byte
 	var limit [maxKeyLen]byte
 
 	startLen := copy(start[:], chunkPrefix)
-	startLen += copy(start[startLen:], chunk)
+	startLen += copy(start[startLen:], chunkHash)
 
 	limitLen := copy(limit[:], start[:startLen])
 	limitLen += copy(limit[limitLen:], blobLimit)
@@ -220,13 +221,13 @@
 		var n int
 		key := s.Key(keyBuf[:])
 		value := s.Value(valBuf[:])
-		loc.Blob = key[len(chunkPrefix)+chunkHashLen:]
+		loc.BlobID = key[len(chunkPrefix)+chunkHashLen:]
 		loc.Offset, n = binary.Varint(value)
 		if n > 0 {
 			loc.Size, n = binary.Varint(value[n:])
 		}
 		if n <= 0 {
-			err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunk, key, value)
+			err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunkHash, key, value)
 		}
 		s.Cancel()
 	} else {
@@ -234,23 +235,23 @@
 			err = s.Err()
 		}
 		if err == nil {
-			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunk)
+			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunkHash)
 		}
 	}
 
 	return loc, err
 }
 
-// A BlobStream allows the client to iterate over the chunks in a blob:
-//	bs := cm.NewBlobStream(ctx, blob)
-//	for bs.Advance() {
-//		chunkHash := bs.Value()
+// A ChunkStream allows the client to iterate over the chunks in a blob:
+//	cs := cm.NewChunkStream(ctx, blob)
+//	for cs.Advance() {
+//		chunkHash := cs.Value()
 //		...process chunkHash...
 //	}
-//	if bs.Err() != nil {
+//	if cs.Err() != nil {
 //		...there was an error...
 //	}
-type BlobStream struct {
+type ChunkStream struct {
 	cm     *ChunkMap
 	ctx    *context.T
 	stream store.Stream
@@ -264,9 +265,9 @@
 	more   bool            // whether stream may be consulted again
 }
 
-// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// NewChunkStream() returns a pointer to a new ChunkStream that allows the client
 // to enumerate the chunk hashes in a blob, in order.
-func (cm *ChunkMap) NewBlobStream(ctx *context.T, blob []byte) *BlobStream {
+func (cm *ChunkMap) NewChunkStream(ctx *context.T, blob []byte) *ChunkStream {
 	var start [maxKeyLen]byte
 	var limit [maxKeyLen]byte
 
@@ -276,13 +277,13 @@
 	limitLen := copy(limit[:], start[:startLen])
 	limitLen += copy(limit[limitLen:], offsetLimit)
 
-	bs := new(BlobStream)
-	bs.cm = cm
-	bs.ctx = ctx
-	bs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
-	bs.more = true
+	cs := new(ChunkStream)
+	cs.cm = cm
+	cs.ctx = ctx
+	cs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
+	cs.more = true
 
-	return bs
+	return cs
 }
 
 // Advance() stages an element so the client can retrieve the chunk hash with
@@ -291,27 +292,27 @@
 // Value() or Location() The client must call Cancel if it does not iterate
 // through all elements (i.e. until Advance() returns false).  Advance() may
 // block if an element is not immediately available.
-func (bs *BlobStream) Advance() (ok bool) {
-	if bs.more && bs.err == nil {
-		if !bs.stream.Advance() {
-			bs.err = bs.stream.Err()
-			bs.more = false // no more stream, even if no error
+func (cs *ChunkStream) Advance() (ok bool) {
+	if cs.more && cs.err == nil {
+		if !cs.stream.Advance() {
+			cs.err = cs.stream.Err()
+			cs.more = false // no more stream, even if no error
 		} else {
-			bs.key = bs.stream.Key(bs.keyBuf[:])
-			bs.value = bs.stream.Value(bs.valBuf[:])
-			ok = (len(bs.value) >= chunkHashLen) &&
-				(len(bs.key) == len(blobPrefix)+blobIDLen+offsetLen)
+			cs.key = cs.stream.Key(cs.keyBuf[:])
+			cs.value = cs.stream.Value(cs.valBuf[:])
+			ok = (len(cs.value) >= chunkHashLen) &&
+				(len(cs.key) == len(blobPrefix)+blobIDLen+offsetLen)
 			if ok {
 				var n int
-				bs.loc.Blob = make([]byte, blobIDLen)
-				copy(bs.loc.Blob, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
-				bs.loc.Offset = int64(binary.BigEndian.Uint64(bs.key[len(blobPrefix)+blobIDLen:]))
-				bs.loc.Size, n = binary.Varint(bs.value[chunkHashLen:])
+				cs.loc.BlobID = make([]byte, blobIDLen)
+				copy(cs.loc.BlobID, cs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+				cs.loc.Offset = int64(binary.BigEndian.Uint64(cs.key[len(blobPrefix)+blobIDLen:]))
+				cs.loc.Size, n = binary.Varint(cs.value[chunkHashLen:])
 				ok = (n > 0)
 			}
 			if !ok {
-				bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, bs.value)
-				bs.stream.Cancel()
+				cs.err = verror.New(errMalformedBlobEntry, cs.ctx, cs.cm.dir, cs.key, cs.value)
+				cs.stream.Cancel()
 			}
 		}
 	}
@@ -323,19 +324,141 @@
 // enough to hold the entire value.  Otherwise, a newly allocated slice will be
 // returned.  It is valid to pass a nil buf.  Value() may panic if Advance()
 // returned false or was not called at all.  Value() does not block.
-func (bs *BlobStream) Value(buf []byte) (result []byte) {
+func (cs *ChunkStream) Value(buf []byte) (result []byte) {
 	if len(buf) < chunkHashLen {
 		buf = make([]byte, chunkHashLen)
 	}
-	copy(buf, bs.value[:chunkHashLen])
+	copy(buf, cs.value[:chunkHashLen])
 	return buf[:chunkHashLen]
 }
 
 // Location() returns the Location associated with the chunk staged by
 // Advance().  Location() may panic if Advance() returned false or was not
 // called at all.  Location() does not block.
-func (bs *BlobStream) Location() Location {
-	return bs.loc
+func (cs *ChunkStream) Location() Location {
+	return cs.loc
+}
+
+// Err() returns a non-nil error iff the stream encountered any errors.  Err()
+// does not block.
+func (cs *ChunkStream) Err() error {
+	return cs.err
+}
+
+// Cancel() notifies the stream provider that it can stop producing elements.
+// The client must call Cancel() if it does not iterate through all elements
+// (i.e. until Advance() returns false).  Cancel() is idempotent and can be
+// called concurrently with a goroutine that is iterating via Advance() and
+// Value().  Cancel() causes Advance() to subsequently return false.
+// Cancel() does not block.
+func (cs *ChunkStream) Cancel() {
+	cs.stream.Cancel()
+}
+
+// A BlobStream allows the client to iterate over the blobs in ChunkMap:
+//	bs := cm.NewBlobStream(ctx)
+//	for bs.Advance() {
+//		blobID := bs.Value()
+//		...process blobID...
+//	}
+//	if bs.Err() != nil {
+//		...there was an error...
+//	}
+type BlobStream struct {
+	cm  *ChunkMap
+	ctx *context.T
+
+	key    []byte          // key for current element
+	keyBuf [maxKeyLen]byte // buffer for keys
+	err    error           // error encountered.
+	mu     sync.Mutex      // protects "more", which may be written in Cancel()
+	more   bool            // whether stream may be consulted again
+}
+
+// keyLimit is the key for limit in store.Scan() calls within a BlobStream.
+var keyLimit []byte
+
+func init() {
+	// The limit key is the maximum length key, all ones after the blobPrefix.
+	keyLimit = make([]byte, maxKeyLen)
+	for i := copy(keyLimit, blobPrefix); i != len(keyLimit); i++ {
+		keyLimit[i] = 0xff
+	}
+}
+
+// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// to enumerate the blobs in the ChunkMap, in lexicographic order.
+func (cm *ChunkMap) NewBlobStream(ctx *context.T) *BlobStream {
+	bs := new(BlobStream)
+	bs.cm = cm
+	bs.ctx = ctx
+	bs.more = true
+	return bs
+}
+
+// Advance() stages an element so the client can retrieve the next blob ID with
+// Value().  Advance() returns true iff there is an element to retrieve.  The
+// client must call Advance() before calling Value().  The client must call
+// Cancel if it does not iterate through all elements (i.e. until Advance()
+// returns false).  Advance() may block if an element is not immediately
+// available.
+func (bs *BlobStream) Advance() (ok bool) {
+	bs.mu.Lock()
+	ok = bs.more
+	bs.mu.Unlock()
+	if ok {
+		prefixAndKeyLen := len(blobPrefix) + blobIDLen
+		// Compute the next key to search for.
+		if len(bs.key) == 0 { // First time through: anything starting with blobPrefix.
+			n := copy(bs.keyBuf[:], blobPrefix)
+			bs.key = bs.keyBuf[:n]
+		} else {
+			// Increment the blobID to form the next possible key.
+			i := prefixAndKeyLen - 1
+			for ; i != len(blobPrefix)-1 && bs.keyBuf[i] == 0xff; i-- {
+				bs.keyBuf[i] = 0
+			}
+			if i == len(blobPrefix)-1 { // End of database
+				ok = false
+			} else {
+				bs.keyBuf[i]++
+			}
+			bs.key = bs.keyBuf[:prefixAndKeyLen]
+		}
+		if ok {
+			stream := bs.cm.st.Scan(bs.key, keyLimit)
+			if !stream.Advance() {
+				bs.err = stream.Err()
+				ok = false // no more stream, even if no error
+			} else {
+				bs.key = stream.Key(bs.keyBuf[:])
+				if len(bs.key) < prefixAndKeyLen {
+					bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, stream.Value(nil))
+					ok = false
+				}
+				stream.Cancel() // We get at most one element from each stream.
+			}
+		}
+		if !ok {
+			bs.mu.Lock()
+			bs.more = false
+			bs.mu.Unlock()
+		}
+	}
+	return ok
+}
+
+// Value() returns the blob ID staged by Advance().  The returned slice may be
+// a sub-slice of buf if buf is large enough to hold the entire value.
+// Otherwise, a newly allocated slice will be returned.  It is valid to pass a
+// nil buf.  Value() may panic if Advance() returned false or was not called at
+// all.  Value() does not block.
+func (bs *BlobStream) Value(buf []byte) (result []byte) {
+	if len(buf) < blobIDLen {
+		buf = make([]byte, blobIDLen)
+	}
+	copy(buf, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+	return buf[:blobIDLen]
 }
 
 // Err() returns a non-nil error iff the stream encountered any errors.  Err()
@@ -351,5 +474,7 @@
 // Value().  Cancel() causes Advance() to subsequently return false.
 // Cancel() does not block.
 func (bs *BlobStream) Cancel() {
-	bs.stream.Cancel()
+	bs.mu.Lock()
+	bs.more = false
+	bs.mu.Unlock()
 }
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
index ab961a7..b7ab2df 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
@@ -9,12 +9,12 @@
 import "io/ioutil"
 import "math/rand"
 import "os"
+import "runtime"
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
 import "v.io/v23/context"
 
-// import "v.io/v23/verror"
 import "v.io/x/ref/test"
 import _ "v.io/x/ref/runtime/factories/generic"
 
@@ -27,78 +27,125 @@
 	return v
 }
 
-// verifyNoBlob() tests that blob b[blobi] is not present in *cm.
-// callSite is a callsite identifier, output in all error messages.
-func verifyNoBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, callSite int) {
-	bs := cm.NewBlobStream(ctx, b[blobi])
-	for i := 0; bs.Advance(); i++ {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: %v",
-			callSite, blobi, i, bs.Value(nil))
+// verifyBlobs() tests that the blobs in *cm are those in b[], as revealed via
+// the BlobStream() interface.
+func verifyBlobs(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, b [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
+	seen := make([]bool, len(b)) // seen[i] == whether b[i] seen in *cm
+	bs := cm.NewBlobStream(ctx)
+	var i int
+	for i = 0; bs.Advance(); i++ {
+		blob := bs.Value(nil)
+		var j int
+		for j = 0; j != len(b) && bytes.Compare(b[j], blob) != 0; j++ {
+		}
+		if j == len(b) {
+			t.Errorf("chunkmap_test: line %d: unexpected blob %v present in ChunkMap",
+				callerLine, blob)
+		} else if seen[j] {
+			t.Errorf("chunkmap_test: line %d: blob %v seen twice in ChunkMap",
+				callerLine, blob)
+		} else {
+			seen[j] = true
+		}
+	}
+	if i != len(b) {
+		t.Errorf("chunkmap_test: line %d: found %d blobs in ChunkMap, but expected %d",
+			callerLine, i, len(b))
+	}
+	for j := range seen {
+		if !seen[j] {
+			t.Errorf("chunkmap_test: line %d: blob %v not seen un ChunkMap",
+				callerLine, b[j])
+		}
 	}
 	if bs.Err() != nil {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance: unexpected error %v",
-			callSite, blobi, bs.Err())
+		t.Errorf("chunkmap_test: line %d: BlobStream.Advance: unexpected error %v",
+			callerLine, bs.Err())
 	}
 }
 
-// verifyBlob() tests that blob b[blobi] in *cm contains the expected chunks from c[].
-// Each blob is expected to have 8 chunks, 0...7, except that b[1] has c[8] instead of c[4] for chunk 4.
-// callSite is a callsite identifier, output in all error messages.
-func verifyBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte, callSite int) {
+// verifyNoChunksInBlob() tests that blob b[blobi] has no chunks in *cm, as
+// revealed by the ChunkStream interface.
+func verifyNoChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
+	cs := cm.NewChunkStream(ctx, b[blobi])
+	for i := 0; cs.Advance(); i++ {
+		t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: %v",
+			callerLine, blobi, i, cs.Value(nil))
+	}
+	if cs.Err() != nil {
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance: unexpected error %v",
+			callerLine, blobi, cs.Err())
+	}
+}
+
+// verifyChunksInBlob() tests that blob b[blobi] in *cm contains the expected
+// chunks from c[].  Each blob is expected to have 8 chunks, 0...7, except that
+// b[1] has c[8] instead of c[4] for chunk 4.
+func verifyChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
 	var err error
 	var i int
-	bs := cm.NewBlobStream(ctx, b[blobi])
-	for i = 0; bs.Advance(); i++ {
-		chunk := bs.Value(nil)
+	cs := cm.NewChunkStream(ctx, b[blobi])
+	for i = 0; cs.Advance(); i++ {
+		chunk := cs.Value(nil)
 		chunki := i
 		if blobi == 1 && i == 4 { // In blob 1, c[4] is replaced by c[8]
 			chunki = 8
 		}
 		if bytes.Compare(c[chunki], chunk) != 0 {
-			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: got %v, expected %v",
-				callSite, blobi, i, chunk, c[chunki])
+			t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: got %v, expected %v",
+				callerLine, blobi, i, chunk, c[chunki])
 		}
 
 		var loc chunkmap.Location
 		loc, err = cm.LookupChunk(ctx, chunk)
 		if err != nil {
-			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
-				callSite, blobi, i, err)
+			t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
+				callerLine, blobi, i, err)
 		} else {
 			if i == 4 {
-				if bytes.Compare(loc.Blob, b[blobi]) != 0 {
-					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
-						callSite, blobi, i, loc.Blob, b[blobi])
+				if bytes.Compare(loc.BlobID, b[blobi]) != 0 {
+					t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+						callerLine, blobi, i, loc.BlobID, b[blobi])
 				}
 			} else {
-				if bytes.Compare(loc.Blob, b[0]) != 0 && bytes.Compare(loc.Blob, b[1]) != 0 {
-					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
-						callSite, blobi, i, loc.Blob, b[blobi])
+				if bytes.Compare(loc.BlobID, b[0]) != 0 && bytes.Compare(loc.BlobID, b[1]) != 0 {
+					t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+						callerLine, blobi, i, loc.BlobID, b[blobi])
 				}
 			}
 			if loc.Offset != int64(i) {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
-					callSite, blobi, i, loc.Offset, i)
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
+					callerLine, blobi, i, loc.Offset, i)
 			}
 			if loc.Size != 1 {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Size got %d, expected 1",
-					callSite, blobi, i, loc.Size)
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Size got %d, expected 1",
+					callerLine, blobi, i, loc.Size)
 			}
 
-			loc2 := bs.Location()
-			if bytes.Compare(loc.Blob, loc.Blob) != 0 || loc.Offset != loc2.Offset || loc.Size != loc2.Size {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs BlobStream %v",
-					callSite, blobi, i, loc, loc2)
+			// The offsets and sizes will match, between the result
+			// from the stream and the result from LookupChunk(),
+			// because for all chunks written to both, they are
+			// written to the same places.  However, the blob need
+			// not match, since LookupChunk() will return an
+			// arbitrary Location in the store that contains the
+			// chunk.
+			loc2 := cs.Location()
+			if loc.Offset != loc2.Offset || loc.Size != loc2.Size {
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs ChunkStream %v",
+					callerLine, blobi, i, loc, loc2)
 			}
 		}
 	}
-	if bs.Err() != nil {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Err() unepxected error %v",
-			callSite, blobi, bs.Err())
+	if cs.Err() != nil {
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Err() unepxected error %v",
+			callerLine, blobi, cs.Err())
 	}
 	if i != 8 {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance unexpectedly saw %d chunks, expected 8",
-			callSite, blobi, i)
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance unexpectedly saw %d chunks, expected 8",
+			callerLine, blobi, i)
 	}
 }
 
@@ -131,9 +178,10 @@
 	// Nine chunks: c[0 .. 8]
 	c := [][]byte{id(), id(), id(), id(), id(), id(), id(), id(), id()}
 
-	// Verify that there are no chunks in blobs initially.
-	verifyNoBlob(t, ctx, cm, 0, b, 0)
-	verifyNoBlob(t, ctx, cm, 1, b, 1)
+	// Verify that there are no blobs, or chunks in blobs initially.
+	verifyBlobs(t, ctx, cm, nil)
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyNoChunksInBlob(t, ctx, cm, 1, b)
 
 	// Verify that all chunks have no locations initially.
 	for chunki := range c {
@@ -152,7 +200,7 @@
 				chunki = 8
 			}
 			err = cm.AssociateChunkWithLocation(ctx, c[chunki],
-				chunkmap.Location{Blob: b[blobi], Offset: int64(i), Size: 1})
+				chunkmap.Location{BlobID: b[blobi], Offset: int64(i), Size: 1})
 			if err != nil {
 				t.Errorf("chunkmap_test: blob %d: AssociateChunkWithLocation: unexpected error: %v",
 					blobi, err)
@@ -160,9 +208,10 @@
 		}
 	}
 
-	// Verify that the blobs contain the chunks specified.
-	verifyBlob(t, ctx, cm, 0, b, c, 2)
-	verifyBlob(t, ctx, cm, 1, b, c, 3)
+	// Verify that the blobs are present, with the chunks specified.
+	verifyBlobs(t, ctx, cm, b)
+	verifyChunksInBlob(t, ctx, cm, 0, b, c)
+	verifyChunksInBlob(t, ctx, cm, 1, b, c)
 
 	// Verify that all chunks now have locations.
 	for chunki := range c {
@@ -197,8 +246,9 @@
 	}
 
 	// Verify that blob 0 is gone, but blob 1 remains.
-	verifyNoBlob(t, ctx, cm, 0, b, 4)
-	verifyBlob(t, ctx, cm, 1, b, c, 5)
+	verifyBlobs(t, ctx, cm, b[1:])
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyChunksInBlob(t, ctx, cm, 1, b, c)
 
 	// Delete b[1].
 	err = cm.DeleteBlob(ctx, b[1])
@@ -207,9 +257,10 @@
 			err)
 	}
 
-	// Verify that there are no chunks in blobs initially.
-	verifyNoBlob(t, ctx, cm, 0, b, 6)
-	verifyNoBlob(t, ctx, cm, 1, b, 7)
+	// Verify that there are no blobs, or chunks in blobs once more.
+	verifyBlobs(t, ctx, cm, nil)
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyNoChunksInBlob(t, ctx, cm, 1, b)
 
 	// Verify that all chunks have no locations once more.
 	for chunki := range c {
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 43cc542..cb8bca8 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -8,13 +8,17 @@
 package fs_cablobstore
 
 // Internals:
-//   The blobstore consists of a directory with "blob", "cas", and "tmp"
-//   subdirectories.
+// Blobs are partitioned into two types of unit: "fragments" and "chunks".
+// A fragment is stored in a single file on disc.  A chunk is a unit of network
+// transmission.
+//
+//   The blobstore consists of a directory with "blob", "cas", "chunk", and
+//   "tmp" subdirectories.
 //   - "tmp" is used for temporary files that are moved into place via
 //     link()/unlink() or rename(), depending on what's available.
 //   - "cas" contains files whose names are content hashes of the files being
 //     named.  A few slashes are thrown into the name near the front so that no
-//     single directory gets too large.
+//     single directory gets too large.  These files are called "fragments".
 //   - "blob" contains files whose names are random numbers.  These names are
 //     visible externally as "blob names".  Again, a few slashes are thrown
 //     into the name near the front so that no single directory gets too large.
@@ -26,13 +30,17 @@
 //     <offset> bytes into <cas-fragment>, which is in the "cas" subtree.  The
 //     "f" line indicates that the blob is "finalized" and gives its complete
 //     md5 hash.  No fragments may be appended to a finalized blob.
+//   - "chunk" contains a store (currently implemented with leveldb) that
+//     maps chunks of blobs to content hashes and vice versa.
 
 import "bufio"
+import "bytes"
 import "crypto/md5"
 import "fmt"
 import "hash"
 import "io"
 import "io/ioutil"
+import "math"
 import "math/rand"
 import "os"
 import "path/filepath"
@@ -42,6 +50,8 @@
 import "time"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -62,35 +72,37 @@
 	errCantDeleteBlob         = verror.Register(pkgPath+".errCantDeleteBlob", verror.NoRetry, "{1:}{2:} Can't delete blob {3}{:_}")
 	errBlobDeleted            = verror.Register(pkgPath+".errBlobDeleted", verror.NoRetry, "{1:}{2:} Blob is deleted{:_}")
 	errSizeTooBigForFragment  = verror.Register(pkgPath+".errSizeTooBigForFragment", verror.NoRetry, "{1:}{2:} writing blob {1}, size too big for fragment{:1}")
+	errStreamCancelled        = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
 )
 
 // For the moment, we disallow others from accessing the tree where blobs are
-// stored.  We could in the future relax this to 0711 or 0755.
+// stored.  We could in the future relax this to 0711/0755, and 0644.
 const dirPermissions = 0700
+const filePermissions = 0600
 
 // Subdirectories of the blobstore's tree
 const (
-	blobDir = "blob" // Subdirectory where blobs are indexed by blob id.
-	casDir  = "cas"  // Subdirectory where blobs are indexed by content hash.
-	tmpDir  = "tmp"  // Subdirectory where temporary files are created.
+	blobDir  = "blob"  // Subdirectory where blobs are indexed by blob id.
+	casDir   = "cas"   // Subdirectory where fragments are indexed by content hash.
+	chunkDir = "chunk" // Subdirectory where chunks are indexed by content hash.
+	tmpDir   = "tmp"   // Subdirectory where temporary files are created.
 )
 
 // An FsCaBlobStore represents a simple, content-addressable store.
 type FsCaBlobStore struct {
-	rootName string // The name of the root of the store.
+	rootName string             // The name of the root of the store.
+	cm       *chunkmap.ChunkMap // Mapping from chunks to blob locations and vice versa.
 
-	mu sync.Mutex // Protects fields below, plus
-	// blobDesc.fragment, blobDesc.activeDescIndex,
-	// and blobDesc.refCount.
+	// mu protects fields below, plus most fields in each blobDesc when used from a BlobWriter.
+	mu         sync.Mutex
 	activeDesc []*blobDesc        // The blob descriptors in use by active BlobReaders and BlobWriters.
 	toDelete   []*map[string]bool // Sets of items that active GC threads are about to delete. (Pointers to maps, to allow pointer comparison.)
 }
 
-// hashToFileName() returns the content-addressed name of the data with the
-// specified hash, given its content hash.  Requires len(hash)==16.  An md5
-// hash is suitable.
-func hashToFileName(hash []byte) string {
-	return filepath.Join(casDir,
+// hashToFileName() returns the name of the binary ID with the specified
+// prefix.  Requires len(id)==16.  An md5 hash is suitable.
+func hashToFileName(prefix string, hash []byte) string {
+	return filepath.Join(prefix,
 		fmt.Sprintf("%02x", hash[0]),
 		fmt.Sprintf("%02x", hash[1]),
 		fmt.Sprintf("%02x", hash[2]),
@@ -101,6 +113,23 @@
 			hash[12], hash[13], hash[14], hash[15]))
 }
 
+// fileNameToHash() converts a file name in the format generated by
+// hashToFileName(prefix, ...) to a vector of 16 bytes.  If the string is
+// malformed, the nil slice is returned.
+func fileNameToHash(prefix string, s string) []byte {
+	idStr := strings.TrimPrefix(filepath.ToSlash(s), prefix+"/")
+	hash := make([]byte, 16, 16)
+	n, err := fmt.Sscanf(idStr, "%02x/%02x/%02x/%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+		&hash[0], &hash[1], &hash[2], &hash[3],
+		&hash[4], &hash[5], &hash[6], &hash[7],
+		&hash[8], &hash[9], &hash[10], &hash[11],
+		&hash[12], &hash[13], &hash[14], &hash[15])
+	if n != 16 || err != nil {
+		hash = nil
+	}
+	return hash
+}
+
 // newBlobName() returns a new random name for a blob.
 func newBlobName() string {
 	return filepath.Join(blobDir,
@@ -143,7 +172,7 @@
 // Create() returns a pointer to an FsCaBlobStore stored in the file system at
 // "rootName".  If the directory rootName does not exist, it is created.
 func Create(ctx *context.T, rootName string) (fscabs *FsCaBlobStore, err error) {
-	dir := []string{tmpDir, casDir, blobDir}
+	dir := []string{tmpDir, casDir, chunkDir, blobDir}
 	for i := 0; i != len(dir) && err == nil; i++ {
 		fullName := filepath.Join(rootName, dir[i])
 		os.MkdirAll(fullName, dirPermissions)
@@ -153,9 +182,14 @@
 			err = verror.New(errNotADir, ctx, fullName)
 		}
 	}
+	var cm *chunkmap.ChunkMap
+	if err == nil {
+		cm, err = chunkmap.New(ctx, filepath.Join(rootName, chunkDir))
+	}
 	if err == nil {
 		fscabs = new(FsCaBlobStore)
 		fscabs.rootName = rootName
+		fscabs.cm = cm
 	}
 	return fscabs, err
 }
@@ -169,12 +203,15 @@
 func (fscabs *FsCaBlobStore) DeleteBlob(ctx *context.T, blobName string) (err error) {
 	// Disallow deletions of things outside the blob tree, or that may contain "..".
 	// For simplicity, the code currently disallows '.'.
-	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+	blobID := fileNameToHash(blobDir, blobName)
+	if blobID == nil || strings.IndexByte(blobName, '.') != -1 {
 		err = verror.New(errInvalidBlobName, ctx, blobName)
 	} else {
 		err = os.Remove(filepath.Join(fscabs.rootName, blobName))
 		if err != nil {
 			err = verror.New(errCantDeleteBlob, ctx, blobName, err)
+		} else {
+			err = fscabs.cm.DeleteBlob(ctx, blobID)
 		}
 	}
 	return err
@@ -250,7 +287,7 @@
 
 // addFragment() ensures that the store *fscabs contains a fragment comprising
 // the catenation of the byte vectors named by item[..].block and the contents
-// of the files named by item[..].filename.  The block field is ignored if
+// of the files named by item[..].fileName.  The block field is ignored if
 // fileName!="".  The fragment is not physically added if already present.
 // The fragment is added to the fragment list of the descriptor *desc.
 func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
@@ -305,7 +342,7 @@
 
 	// Compute the hash, and form the file name in the respository.
 	hash := hasher.Sum(nil)
-	relFileName := hashToFileName(hash)
+	relFileName := hashToFileName(casDir, hash)
 	absFileName := filepath.Join(fscabs.rootName, relFileName)
 
 	// Add the fragment's name to *desc's fragments so the garbage
@@ -316,7 +353,6 @@
 		size:     size,
 		offset:   0,
 		fileName: relFileName})
-	desc.size += size
 	fscabs.mu.Unlock()
 
 	// If the file does not already exist, ...
@@ -365,7 +401,11 @@
 		// Remove the entry added to fragment list above.
 		fscabs.mu.Lock()
 		desc.fragment = desc.fragment[0 : len(desc.fragment)-1]
-		desc.size -= size
+		fscabs.mu.Unlock()
+	} else { // commit the change by updating the size
+		fscabs.mu.Lock()
+		desc.size += size
+		desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
 		fscabs.mu.Unlock()
 	}
 
@@ -385,16 +425,24 @@
 	activeDescIndex int // Index into fscabs.activeDesc if refCount>0; under fscabs.mu.
 	refCount        int // Reference count; under fscabs.mu.
 
-	name     string         // Name of the blob.
-	fragment []blobFragment // All the fragements in this blob;
-	// modified under fscabs.mu and in BlobWriter
-	// owner's thread; may be read by GC under
-	// fscabs.mu.
-	size      int64 // Total size of the blob.
-	finalized bool  // Whether the blob has been finalized.
+	name string // Name of the blob.
+
+	// The following fields are modified under fscabs.mu and in BlobWriter
+	// owner's thread; they may be read by GC (when obtained from
+	// fscabs.activeDesc) and the chunk writer under fscabs.mu.  In the
+	// BlobWriter owner's thread, reading does not require a lock, but
+	// writing does.  In other contexts (BlobReader, or a desc that has
+	// just been allocated by getBlob()), no locking is needed.
+
+	fragment  []blobFragment // All the fragments in this blob
+	size      int64          // Total size of the blob.
+	finalized bool           // Whether the blob has been finalized.
 	// A finalized blob has a valid hash field, and no new bytes may be added
 	// to it.  A well-formed hash has 16 bytes.
 	hash []byte
+
+	openWriter bool       // Whether this descriptor is being written by an open BlobWriter.
+	cv         *sync.Cond // signalled when a BlobWriter writes or closes.
 }
 
 // isBeingDeleted() returns whether fragment fragName is about to be deleted
@@ -456,7 +504,8 @@
 
 // getBlob() returns the in-memory blob descriptor for the named blob.
 func (fscabs *FsCaBlobStore) getBlob(ctx *context.T, blobName string) (desc *blobDesc, err error) {
-	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+	slashBlobName := filepath.ToSlash(blobName)
+	if !strings.HasPrefix(slashBlobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
 		err = verror.New(errInvalidBlobName, ctx, blobName)
 	} else {
 		absBlobName := filepath.Join(fscabs.rootName, blobName)
@@ -467,6 +516,7 @@
 			desc = new(blobDesc)
 			desc.activeDescIndex = -1
 			desc.name = blobName
+			desc.cv = sync.NewCond(&fscabs.mu)
 			scanner := bufio.NewScanner(fh)
 			for scanner.Scan() {
 				field := strings.Split(scanner.Text(), " ")
@@ -528,32 +578,49 @@
 	desc   *blobDesc // Description of the blob being written.
 	f      *file     // The file being written.
 	hasher hash.Hash // Running hash of blob.
+
+	// Fields to allow the ChunkMap to be written.
+	csBr  *BlobReader     // Reader over the blob that's currently being written.
+	cs    *chunker.Stream // Stream of chunks derived from csBr
+	csErr chan error      // writeChunkMap() sends its result here; Close/CloseWithoutFinalize receives it.
 }
 
-// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on a newly
-// created blob name, which can be found using the Name() method.  BlobWriters
-// should not be used concurrently by multiple threads.  The returned handle
-// should be closed with either the Close() or CloseWithoutFinalize() method to
-// avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+// a newly created blob.  If "name" is non-empty, it is used to name
+// the blob, and it must be in the format of a name returned by this
+// interface (probably by another instance on another device).
+// Otherwise, a new name is created, which can be found using
+// the Name() method.  It is an error to attempt to overwrite a blob
+// that already exists in this blob store.  BlobWriters should not be
+// used concurrently by multiple threads.  The returned handle should
+// be closed with either the Close() or CloseWithoutFinalize() method
+// to avoid leaking file handles.
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T, name string) (localblobstore.BlobWriter, error) {
 	var bw *BlobWriter
-	newName := newBlobName()
-	fileName := filepath.Join(fscabs.rootName, newName)
+	if name == "" {
+		name = newBlobName()
+	}
+	fileName := filepath.Join(fscabs.rootName, name)
 	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
-	f, err := newFile(os.Create(fileName))
+	f, err := newFile(os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, filePermissions))
 	if err == nil {
 		bw = new(BlobWriter)
 		bw.fscabs = fscabs
 		bw.ctx = ctx
 		bw.desc = new(blobDesc)
 		bw.desc.activeDescIndex = -1
-		bw.desc.name = newName
+		bw.desc.name = name
+		bw.desc.cv = sync.NewCond(&fscabs.mu)
+		bw.desc.openWriter = true
 		bw.f = f
 		bw.hasher = md5.New()
 		if !fscabs.descRef(bw.desc) {
 			// Can't happen; descriptor refers to no fragments.
 			panic(verror.New(errBlobDeleted, ctx, bw.desc.name))
 		}
+		// Write the chunks of this blob into the ChunkMap, as they are
+		// written by this writer.
+		bw.forkWriteChunkMap()
 	}
 	return bw, err
 }
@@ -562,14 +629,17 @@
 // old, but unfinalized blob name.
 func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
 	var err error
-	bw := new(BlobWriter)
-	bw.fscabs = fscabs
-	bw.ctx = ctx
-	bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
-	if err == nil && bw.desc.finalized {
+	var bw *BlobWriter
+	var desc *blobDesc
+	desc, err = fscabs.getBlob(ctx, blobName)
+	if err == nil && desc.finalized {
 		err = verror.New(errBlobAlreadyFinalized, ctx, blobName)
-		bw = nil
-	} else {
+	} else if err == nil {
+		bw = new(BlobWriter)
+		bw.fscabs = fscabs
+		bw.ctx = ctx
+		bw.desc = desc
+		bw.desc.openWriter = true
 		fileName := filepath.Join(fscabs.rootName, bw.desc.name)
 		bw.f, err = newFile(os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND, 0666))
 		bw.hasher = md5.New()
@@ -581,7 +651,7 @@
 			// non-zero.
 			panic(verror.New(errBlobDeleted, ctx, fileName))
 		}
-		br := fscabs.blobReaderFromDesc(ctx, bw.desc)
+		br := fscabs.blobReaderFromDesc(ctx, bw.desc, dontWaitForWriter)
 		buf := make([]byte, 8192, 8192)
 		for err == nil {
 			var n int
@@ -592,10 +662,89 @@
 		if err == io.EOF { // EOF is expected.
 			err = nil
 		}
+		if err == nil {
+			// Write the chunks of this blob into the ChunkMap, as
+			// they are written by this writer.
+			bw.forkWriteChunkMap()
+		}
 	}
 	return bw, err
 }
 
+// forkWriteChunkMap() creates a new thread to run writeChunkMap().  It adds
+// the chunks written to *bw to the blob store's ChunkMap.  The caller is
+// expected to call joinWriteChunkMap() at some later point.
+func (bw *BlobWriter) forkWriteChunkMap() {
+	// The descRef's ref count is incremented here to compensate
+	// for the decrement it will receive in br.Close() in joinWriteChunkMap.
+	if !bw.fscabs.descRef(bw.desc) {
+		// Can't happen; descriptor's ref count was already non-zero.
+		panic(verror.New(errBlobDeleted, bw.ctx, bw.desc.name))
+	}
+	bw.csBr = bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, waitForWriter)
+	bw.cs = chunker.NewStream(bw.ctx, &chunker.DefaultParam, bw.csBr)
+	bw.csErr = make(chan error)
+	go bw.writeChunkMap()
+}
+
+// insertChunk() inserts chunk into the blob store's ChunkMap, associating it
+// with the specified byte offset in the blob blobID being written by *bw.  The byte
+// offset of the next chunk is returned.
+func (bw *BlobWriter) insertChunk(blobID []byte, chunkHash []byte, offset int64, size int64) (int64, error) {
+	err := bw.fscabs.cm.AssociateChunkWithLocation(bw.ctx, chunkHash[:],
+		chunkmap.Location{BlobID: blobID, Offset: offset, Size: size})
+	if err != nil {
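+		// Cancel the chunk stream so that the loop in
+		// writeChunkMap() terminates promptly after this error.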
+		bw.cs.Cancel()
+	}
+	return offset + size, err
+}
+
+// writeChunkMap() iterates over the chunks in stream bw.cs, and associates each
+// one with the blob being written.
+func (bw *BlobWriter) writeChunkMap() {
+	var err error
+	var offset int64
+	blobID := fileNameToHash(blobDir, bw.desc.name)
+	// Associate each chunk only after the next chunk has been seen (or
+	// the blob finalized), to avoid recording an artificially short chunk
+	// at the end of a partial transfer.
+	var chunkHash [md5.Size]byte
+	var chunkLen int64
+	if bw.cs.Advance() {
+		chunk := bw.cs.Value()
+		// Record the hash and size, since chunk's underlying buffer
+		// may be reused by the next call to Advance().
+		chunkHash = md5.Sum(chunk)
+		chunkLen = int64(len(chunk))
+		for bw.cs.Advance() {
+			offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+			chunk = bw.cs.Value()
+			chunkHash = md5.Sum(chunk)
+			chunkLen = int64(len(chunk))
+		}
+	}
+	if err == nil {
+		err = bw.cs.Err()
+	}
+	bw.fscabs.mu.Lock()
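+	// Record the final chunk only if the blob was finalized; for an
+	// unfinalized (partial) blob the last chunk may be artificially short.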
+	if err == nil && chunkLen != 0 && bw.desc.finalized {
+		offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+	}
+	bw.fscabs.mu.Unlock()
+	bw.csErr <- err // wake joinWriteChunkMap()
+}
+
+// joinWriteChunkMap waits for the completion of the thread forked by forkWriteChunkMap().
+// It returns when the chunks in the blob have been written to the blob store's ChunkMap.
+func (bw *BlobWriter) joinWriteChunkMap(err error) error {
+	err2 := <-bw.csErr // read error from end of writeChunkMap()
+	if err == nil {
+		err = err2
+	}
+	bw.csBr.Close()
+	return err
+}
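+
+// The fork/join pair above is used by the writer lifecycle roughly as follows
+// (a sketch; see NewBlobWriter(), ResumeBlobWriter(), Close(), and
+// CloseWithoutFinalize() for the real call sites):
+//	bw.forkWriteChunkMap()          // start recording chunks as data arrives
+//	... AppendFragment()/AppendBlob() calls ...
+//	err = bw.joinWriteChunkMap(err) // at close, wait for the recording thread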
+
 // Close() finalizes *bw, and indicates that the client will perform no further
 // append operations on *bw.  Any internal open file handles are closed.
 func (bw *BlobWriter) Close() (err error) {
@@ -608,7 +757,12 @@
 		_, err = fmt.Fprintf(bw.f.writer, "f %s\n", hashToString(h)) // finalize
 		_, err = bw.f.close(bw.ctx, err)
 		bw.f = nil
+		bw.fscabs.mu.Lock()
 		bw.desc.finalized = true
+		bw.desc.openWriter = false
+		bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+		bw.fscabs.mu.Unlock()
+		err = bw.joinWriteChunkMap(err)
 		bw.fscabs.descUnref(bw.desc)
 	}
 	return err
@@ -622,8 +776,13 @@
 	if bw.f == nil {
 		err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name)
 	} else {
+		bw.fscabs.mu.Lock()
+		bw.desc.openWriter = false
+		bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+		bw.fscabs.mu.Unlock()
 		_, err = bw.f.close(bw.ctx, err)
 		bw.f = nil
+		err = bw.joinWriteChunkMap(err)
 		bw.fscabs.descUnref(bw.desc)
 	}
 	return err
@@ -688,6 +847,7 @@
 						offset:   offset + desc.fragment[i].offset,
 						fileName: desc.fragment[i].fileName})
 					bw.desc.size += consume
+					bw.desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
 					bw.fscabs.mu.Unlock()
 				}
 				offset = 0
@@ -701,7 +861,7 @@
 			// non-zero.
 			panic(verror.New(errBlobDeleted, bw.ctx, blobName))
 		}
-		br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc)
+		br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, dontWaitForWriter)
 		if err == nil {
 			_, err = br.Seek(origSize, 0)
 		}
@@ -752,7 +912,8 @@
 	fscabs *FsCaBlobStore
 	ctx    *context.T
 
-	desc *blobDesc // A description of the blob being read.
+	desc          *blobDesc // A description of the blob being read.
+	waitForWriter bool      // whether this reader should wait for a concurrent BlobWriter
 
 	pos int64 // The next position we will read from (used by Read/Seek, not ReadAt).
 
@@ -761,14 +922,23 @@
 	fh            *os.File // non-nil iff fragmentIndex != -1.
 }
 
+// Constants to make calls to blobReaderFromDesc() more readable.
+const (
+	dontWaitForWriter = false
+	waitForWriter     = true
+)
+
 // blobReaderFromDesc() returns a pointer to a newly allocated BlobReader given
-// a pre-existing blobDesc.
-func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc) *BlobReader {
+// a pre-existing blobDesc.  If waitForWriter is true, the reader will wait for
+// any BlobWriter to finish writing the part of the blob the reader is trying
+// to read.
+func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc, waitForWriter bool) *BlobReader {
 	br := new(BlobReader)
 	br.fscabs = fscabs
 	br.ctx = ctx
 	br.fragmentIndex = -1
 	br.desc = desc
+	br.waitForWriter = waitForWriter
 	return br
 }
 
@@ -779,7 +949,7 @@
 	var desc *blobDesc
 	desc, err = fscabs.getBlob(ctx, blobName)
 	if err == nil {
-		br = fscabs.blobReaderFromDesc(ctx, desc)
+		br = fscabs.blobReaderFromDesc(ctx, desc, dontWaitForWriter)
 	}
 	return br, err
 }
@@ -821,24 +991,41 @@
 	return lo
 }
 
+// waitUntilAvailable() waits until position pos within *br is available for
+// reading, if this reader is waiting for writers.  The wait ends when:
+//  - *br is on an already written blob, or
+//  - *br is on a blob being written whose writer has been closed, or whose
+//    writes have passed position pos.
+// The writer signals br.desc.cv (see AppendFragment() and Close()) whenever
+// more data arrives or writing ceases.
+// The value pos==math.MaxInt64 can be used to mean "until the writer is closed".
+// Requires br.fscabs.mu held.
+func (br *BlobReader) waitUntilAvailable(pos int64) {
+	for br.waitForWriter && br.desc.openWriter && br.desc.size < pos {
+		br.desc.cv.Wait()
+	}
+}
+
 // ReadAt() fills b[] with up to len(b) bytes of data starting at position "at"
 // within the blob that *br indicates, and returns the number of bytes read.
 func (br *BlobReader) ReadAt(b []byte, at int64) (n int, err error) {
+	br.fscabs.mu.Lock()
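+	// Wait until the writer (if any) has produced the bytes this read needs.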
+	br.waitUntilAvailable(at + int64(len(b)))
 	i := findFragment(br.desc.fragment, at)
 	if i < len(br.desc.fragment) && at <= br.desc.size {
+		fragmenti := br.desc.fragment[i] // copy fragment data to allow releasing lock
+		br.fscabs.mu.Unlock()
 		if i != br.fragmentIndex {
 			br.closeInternal()
 		}
 		if br.fragmentIndex == -1 {
-			br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, br.desc.fragment[i].fileName))
+			br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, fragmenti.fileName))
 			if err == nil {
 				br.fragmentIndex = i
 			} else {
 				br.closeInternal()
 			}
 		}
-		var offset int64 = at - br.desc.fragment[i].pos + br.desc.fragment[i].offset
-		consume := br.desc.fragment[i].size - (at - br.desc.fragment[i].pos)
+		var offset int64 = at - fragmenti.pos + fragmenti.offset
+		consume := fragmenti.size - (at - fragmenti.pos)
 		if int64(len(b)) < consume {
 			consume = int64(len(b))
 		}
@@ -847,10 +1034,11 @@
 		} else if err == nil {
 			panic("failed to open blob fragment")
 		}
+		br.fscabs.mu.Lock()
 		// Return io.EOF if the Read reached the end of the last
 		// fragment, but not if it's merely the end of some interior
-		// fragment.
-		if int64(n)+at >= br.desc.size {
+		// fragment or the blob is still being extended.
+		if int64(n)+at >= br.desc.size && !(br.waitForWriter && br.desc.openWriter) {
 			if err == nil {
 				err = io.EOF
 			}
@@ -860,6 +1048,7 @@
 	} else {
 		err = verror.New(errIllegalPositionForRead, br.ctx, br.pos, br.desc.size)
 	}
+	br.fscabs.mu.Unlock()
 	return n, err
 }
 
@@ -879,11 +1068,13 @@
 // offset+current_seek_position if whence==1, and offset+end_of_blob if
 // whence==2, and then returns the current seek position.
 func (br *BlobReader) Seek(offset int64, whence int) (result int64, err error) {
+	br.fscabs.mu.Lock()
 	if whence == 0 {
 		result = offset
 	} else if whence == 1 {
 		result = offset + br.pos
 	} else if whence == 2 {
+		br.waitUntilAvailable(math.MaxInt64)
 		result = offset + br.desc.size
 	} else {
 		err = verror.New(errBadSeekWhence, br.ctx, whence)
@@ -898,17 +1089,26 @@
 	} else if err == nil {
 		br.pos = result
 	}
+	br.fscabs.mu.Unlock()
 	return result, err
 }
 
 // IsFinalized() returns whether *br has been finalized.
 func (br *BlobReader) IsFinalized() bool {
-	return br.desc.finalized
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	finalized := br.desc.finalized
+	br.fscabs.mu.Unlock()
+	return finalized
 }
 
 // Size() returns *br's size.
 func (br *BlobReader) Size() int64 {
-	return br.desc.size
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	size := br.desc.size
+	br.fscabs.mu.Unlock()
+	return size
 }
 
 // Name() returns *br's name.
@@ -918,7 +1118,11 @@
 
 // Hash() returns *br's hash.  It may be nil if the blob is not finalized.
 func (br *BlobReader) Hash() []byte {
-	return br.desc.hash
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	hash := br.desc.hash
+	br.fscabs.mu.Unlock()
+	return hash
 }
 
 // -----------------------------------------------------------
@@ -936,6 +1140,10 @@
 	fscabs *FsCaBlobStore // The parent FsCaBlobStore.
 	err    error          // If non-nil, the error that terminated iteration.
 	stack  []dirListing   // The stack of dirListings leading to the current entry.
+	ctx    *context.T     // context passed to ListBlobIds() or ListCAIds()
+
+	mu        sync.Mutex // Protects cancelled.
+	cancelled bool       // Whether Cancel() has been called.
 }
 
 // ListBlobIds() returns an iterator that can be used to enumerate the blobs in
@@ -947,10 +1155,10 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Stream {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
-	return &FsCasIter{fscabs: fscabs, stack: stack}
+	return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
 }
 
 // ListCAIds() returns an iterator that can be used to enumerate the
@@ -963,10 +1171,18 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Stream {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
-	return &FsCasIter{fscabs: fscabs, stack: stack}
+	return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
+}
+
+// isCancelled() returns whether Cancel() has been called.
+func (fscabsi *FsCasIter) isCancelled() bool {
+	fscabsi.mu.Lock()
+	cancelled := fscabsi.cancelled
+	fscabsi.mu.Unlock()
+	return cancelled
 }
 
 // Advance() stages an item so that it may be retrieved via Value.  Returns
@@ -976,7 +1192,7 @@
 	stack := fscabsi.stack
 	err := fscabsi.err
 
-	for err == nil && !advanced && len(stack) != 0 {
+	for err == nil && !advanced && len(stack) != 0 && !fscabsi.isCancelled() {
 		last := len(stack) - 1
 		stack[last].pos++
 		if stack[last].pos == len(stack[last].nameList) {
@@ -1005,6 +1221,13 @@
 		}
 	}
 
+	if fscabsi.isCancelled() {
+		if err == nil {
+			fscabsi.err = verror.New(errStreamCancelled, fscabsi.ctx)
+		}
+		advanced = false
+	}
+
 	fscabsi.err = err
 	return advanced
 }
@@ -1027,6 +1250,163 @@
 	return fscabsi.err
 }
 
+// Cancel() indicates that the iteration stream should terminate early.
+// Never blocks.  May be called concurrently with other methods on fscabsi.
+func (fscabsi *FsCasIter) Cancel() {
+	fscabsi.mu.Lock()
+	fscabsi.cancelled = true
+	fscabsi.mu.Unlock()
+}
+
+// -----------------------------------------------------------
+
+// An errorChunkStream is a localblobstore.ChunkStream that yields an error.
+type errorChunkStream struct {
+	err error
+}
+
+func (*errorChunkStream) Advance() bool       { return false }
+func (*errorChunkStream) Value([]byte) []byte { return nil }
+func (ecs *errorChunkStream) Err() error      { return ecs.err }
+func (*errorChunkStream) Cancel()             {}
+
+// BlobChunkStream() returns a ChunkStream that can be used to read the ordered
+// list of content hashes of chunks in blob blobName.  It is expected that this
+// list will be presented to RecipeFromChunks() on another device, to create a
+// recipe for transmitting the blob efficiently to that other device.
+func (fscabs *FsCaBlobStore) BlobChunkStream(ctx *context.T, blobName string) (cs localblobstore.ChunkStream) {
+	blobID := fileNameToHash(blobDir, blobName)
+	if blobID == nil {
+		cs = &errorChunkStream{err: verror.New(errInvalidBlobName, ctx, blobName)}
+	} else {
+		cs = fscabs.cm.NewChunkStream(ctx, blobID)
+	}
+	return cs
+}
+
+// -----------------------------------------------------------
+
+// LookupChunk returns the location of a chunk with the specified chunk hash
+// within the store.
+func (fscabs *FsCaBlobStore) LookupChunk(ctx *context.T, chunkHash []byte) (loc localblobstore.Location, err error) {
+	var chunkMapLoc chunkmap.Location
+	chunkMapLoc, err = fscabs.cm.LookupChunk(ctx, chunkHash)
+	if err == nil {
+		loc.BlobName = hashToFileName(blobDir, chunkMapLoc.BlobID)
+		loc.Size = chunkMapLoc.Size
+		loc.Offset = chunkMapLoc.Offset
+	}
+	return loc, err
+}
+
+// -----------------------------------------------------------
+
+// A RecipeStream implements localblobstore.RecipeStream.  It allows the client
+// to iterate over the recipe steps to recreate a blob identified by a stream
+// of chunk hashes (from chunkStream), but using parts of blobs in the current
+// blob store where possible.
+type RecipeStream struct {
+	fscabs *FsCaBlobStore
+	ctx    *context.T
+
+	chunkStream     localblobstore.ChunkStream // the stream of chunks in the blob
+	pendingChunkBuf [16]byte                   // a buffer for pendingChunk
+	pendingChunk    []byte                     // the last unprocessed chunk hash read chunkStream, or nil if none
+	step            localblobstore.RecipeStep  // the recipe step to be returned by Value()
+	mu              sync.Mutex                 // protects cancelled
+	cancelled       bool                       // whether Cancel() has been called
+}
+
+// RecipeStreamFromChunkStream() returns a pointer to a RecipeStream that allows
+// the client to iterate over each RecipeStep needed to create the blob formed
+// by the chunks in chunkStream.
+func (fscabs *FsCaBlobStore) RecipeStreamFromChunkStream(ctx *context.T, chunkStream localblobstore.ChunkStream) localblobstore.RecipeStream {
+	rs := new(RecipeStream)
+	rs.fscabs = fscabs
+	rs.ctx = ctx
+	rs.chunkStream = chunkStream
+	return rs
+}
+
+// isCancelled() returns whether rs.Cancel() has been called.
+func (rs *RecipeStream) isCancelled() bool {
+	rs.mu.Lock()
+	cancelled := rs.cancelled
+	rs.mu.Unlock()
+	return cancelled
+}
+
+// Advance() stages an item so that it may be retrieved via Value().
+// Returns true iff there is an item to retrieve.  Advance() must be
+// called before Value() is called.  The caller is expected to read
+// until Advance() returns false, or to call Cancel().
+func (rs *RecipeStream) Advance() (ok bool) {
+	if rs.pendingChunk == nil && rs.chunkStream.Advance() {
+		rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+	}
+	for !ok && rs.pendingChunk != nil && !rs.isCancelled() {
+		var err error
+		var loc0 chunkmap.Location
+		loc0, err = rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
+		if err == nil {
+			blobName := hashToFileName(blobDir, loc0.BlobID)
+			var blobDesc *blobDesc
+			if blobDesc, err = rs.fscabs.getBlob(rs.ctx, blobName); err != nil {
+				// The ChunkMap contained a reference to a
+				// deleted blob.  Delete the reference in the
+				// ChunkMap; the next loop iteration will
+				// consider the chunk again.
+				rs.fscabs.cm.DeleteBlob(rs.ctx, loc0.BlobID)
+			} else {
+				rs.fscabs.descUnref(blobDesc)
+				// The chunk is in a known blob.  Combine
+				// contiguous chunks into a single recipe
+				// entry.
+				rs.pendingChunk = nil // consumed
+				for rs.pendingChunk == nil && rs.chunkStream.Advance() {
+					rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+					var loc chunkmap.Location
+					loc, err = rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
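+					// Extend the current step if the next
+					// chunk is adjacent in the same blob.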
+					if err == nil && bytes.Compare(loc0.BlobID, loc.BlobID) == 0 && loc.Offset == loc0.Offset+loc0.Size {
+						loc0.Size += loc.Size
+						rs.pendingChunk = nil // consumed
+					}
+				}
+				rs.step = localblobstore.RecipeStep{Blob: blobName, Offset: loc0.Offset, Size: loc0.Size}
+				ok = true
+			}
+		} else { // The chunk is not in the ChunkMap; yield a single chunk hash.
+			rs.step = localblobstore.RecipeStep{Chunk: rs.pendingChunk}
+			rs.pendingChunk = nil // consumed
+			ok = true
+		}
+	}
+	return ok && !rs.isCancelled()
+}
+
+// Value() returns the item that was staged by Advance().  May panic if
+// Advance() returned false or was not called.  Never blocks.
+func (rs *RecipeStream) Value() localblobstore.RecipeStep {
+	return rs.step
+}
+
+// Err() returns any error encountered by Advance.  Never blocks.
+func (rs *RecipeStream) Err() error {
+	// There are no errors to return here.  The errors encountered in
+	// Advance() are expected and recoverable.
+	return nil
+}
+
+// Cancel() indicates that the client wishes to cease reading from the stream.
+// It causes the next call to Advance() to return false.  Never blocks.
+// It may be called concurrently with other calls on the stream.
+func (rs *RecipeStream) Cancel() {
+	rs.mu.Lock()
+	rs.cancelled = true
+	rs.mu.Unlock()
+	rs.chunkStream.Cancel()
+}
+
 // -----------------------------------------------------------
 
 // gcTemp() attempts to delete files in dirName older than threshold.
@@ -1057,16 +1437,46 @@
 	for caIter.Advance() {
 		caSet[caIter.Value()] = true
 	}
+	err = caIter.Err()
 
-	// Remove from caSet all fragments referenced by extant blobs.
-	blobIter := fscabs.ListBlobIds(ctx)
-	for blobIter.Advance() {
-		var blobDesc *blobDesc
-		if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
-			for i := range blobDesc.fragment {
-				delete(caSet, blobDesc.fragment[i].fileName)
+	// cmBlobs maps the names of blobs found in the ChunkMap to their IDs.
+	// (The IDs can be derived from the names; the map is really being used
+	// to record which blobs exist, and the value merely avoids repeated
+	// conversions.)
+	cmBlobs := make(map[string][]byte)
+	if err == nil {
+		// Record all the blobs known to the ChunkMap.
+		bs := fscabs.cm.NewBlobStream(ctx)
+		for bs.Advance() {
+			blobID := bs.Value(nil)
+			cmBlobs[hashToFileName(blobDir, blobID)] = blobID
+		}
+	}
+
+	if err == nil {
+		// Remove from cmBlobs all extant blobs, and remove from
+		// caSet all their fragments.
+		blobIter := fscabs.ListBlobIds(ctx)
+		for blobIter.Advance() {
+			var blobDesc *blobDesc
+			if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
+				delete(cmBlobs, blobDesc.name)
+				for i := range blobDesc.fragment {
+					delete(caSet, blobDesc.fragment[i].fileName)
+				}
+				fscabs.descUnref(blobDesc)
 			}
-			fscabs.descUnref(blobDesc)
+		}
+	}
+
+	if err == nil {
+		// Remove all blobs still mentioned in cmBlobs from the ChunkMap;
+		// these are the ones that no longer exist in the blobs directory.
+		for _, blobID := range cmBlobs {
+			err = fscabs.cm.DeleteBlob(ctx, blobID)
+			if err != nil {
+				break
+			}
 		}
 	}
 
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index 886f5c4..75bc54e 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -41,3 +41,31 @@
 	// Test it.
 	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
+
+// This test case tests the incremental transfer of blobs via chunks.
+func TestWritingViaChunks(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	var err error
+
+	// Make a pair of blobstores, each in its own temporary directory.
+	const nBlobStores = 2
+	var testDirName [nBlobStores]string
+	var bs [nBlobStores]localblobstore.BlobStore
+	for i := 0; i != nBlobStores; i++ {
+		testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+		if err != nil {
+			t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+		}
+		defer os.RemoveAll(testDirName[i])
+
+		bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+		if err != nil {
+			t.Fatalf("fs_cablobstore.Create failed: %v", err)
+		}
+	}
+
+	// Test it.
+	localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
index 973a9c8..49beec8 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
@@ -41,3 +41,31 @@
 	// Test it.
 	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
+
+// This test case tests the incremental transfer of blobs via chunks.
+func TestWritingViaChunks(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	var err error
+
+	// Make a pair of blobstores, each in its own temporary directory.
+	const nBlobStores = 2
+	var testDirName [nBlobStores]string
+	var bs [nBlobStores]localblobstore.BlobStore
+	for i := 0; i != nBlobStores; i++ {
+		testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+		if err != nil {
+			t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+		}
+		defer os.RemoveAll(testDirName[i])
+
+		bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+		if err != nil {
+			t.Fatalf("fs_cablobstore.Create failed: %v", err)
+		}
+	}
+
+	// Test it.
+	localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
index 662e964..481bea1 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -14,6 +14,7 @@
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -59,7 +60,7 @@
 	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
 	var bw localblobstore.BlobWriter
 	var err error
-	bw, err = bs.NewBlobWriter(ctx)
+	bw, err = bs.NewBlobWriter(ctx, "")
 	if err != nil {
 		t.Errorf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
 	}
@@ -546,3 +547,276 @@
 	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
 	checkFragments(t, ctx, bs, fragmentMap, testDirName)
 }
+
+// writeBlobFromReader() writes the contents of rd to blobstore bs, as blob
+// "name", or picks a name name if "name" is empty.  It returns the name of the
+// blob.  Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func writeBlobFromReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, name string, rd io.Reader, callSite int) string {
+	var err error
+	var bw localblobstore.BlobWriter
+	if bw, err = bs.NewBlobWriter(ctx, name); err != nil {
+		t.Fatalf("callSite %d: NewBlobWriter failed: %v", callSite, err)
+	}
+	blobName := bw.Name()
+	buf := make([]byte, 8192) // buffer for data read from rd.
+	for i := 0; err == nil; i++ {
+		var n int
+		if n, err = rd.Read(buf); err != nil && err != io.EOF {
+			t.Fatalf("callSite %d: unexpected error from reader: %v", callSite, err)
+		}
+		if n > 0 {
+			if err = bw.AppendFragment(localblobstore.BlockOrFile{Block: buf[:n]}); err != nil {
+				t.Fatalf("callSite %d: BlobWriter.AppendFragment failed: %v", callSite, err)
+			}
+			// Every so often, close without finalizing, and reopen.
+			if (i % 7) == 0 {
+				if err = bw.CloseWithoutFinalize(); err != nil {
+					t.Fatalf("callSite %d: BlobWriter.CloseWithoutFinalize failed: %v", callSite, err)
+				}
+				if bw, err = bs.ResumeBlobWriter(ctx, blobName); err != nil {
+					t.Fatalf("callSite %d: ResumeBlobWriter %q failed: %v", callSite, blobName, err)
+				}
+			}
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("callSite %d: BlobWriter.Close failed: %v", callSite, err)
+	}
+	return blobName
+}
+
+// checkBlobAgainstReader() verifies that the blob blobName has the same bytes as the reader rd.
+// Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	// Open a reader on the blob.
+	var blob_rd io.Reader
+	var blob_err error
+	if blob_rd, blob_err = bs.NewBlobReader(ctx, blobName); blob_err != nil {
+		t.Fatalf("callSite %d: NewBlobReader on %q failed: %v", callSite, blobName, blob_err)
+	}
+
+	// Variables for reading the two streams, indexed by "reader" and "blob".
+	type stream struct {
+		name string
+		rd   io.Reader // Reader for this stream
+		buf  []byte    // buffer for data
+		i    int       // bytes processed within current buffer
+		n    int       // valid bytes in current buffer
+		err  error     // error, or nil
+	}
+
+	s := [2]stream{
+		{name: "reader", rd: rd, buf: make([]byte, 8192)},
+		{name: blobName, rd: blob_rd, buf: make([]byte, 8192)},
+	}
+
+	// Descriptive names for the two elements of s, when we aren't treating them the same.
+	reader := &s[0]
+	blob := &s[1]
+
+	var pos int // position within file, for error reporting.
+
+	for x := 0; x != 2; x++ {
+		s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+		s[x].i = 0
+	}
+	for blob.n != 0 && reader.n != 0 {
+		for reader.i != reader.n && blob.i != blob.n && reader.buf[reader.i] == blob.buf[blob.i] {
+			pos++
+			blob.i++
+			reader.i++
+		}
+		if reader.i != reader.n && blob.i != blob.n {
+			t.Fatalf("callSite %d: BlobStore %q: BlobReader on blob %q and rd reader generated different bytes at position %d: 0x%x vs 0x%x",
+				callSite, bs.Root(), blobName, pos, reader.buf[reader.i], blob.buf[blob.i])
+		}
+		for x := 0; x != 2; x++ { // read more data from each reader, if needed
+			if s[x].i == s[x].n {
+				s[x].i = 0
+				s[x].n = 0
+				if s[x].err == nil {
+					s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+				}
+			}
+		}
+	}
+	for x := 0; x != 2; x++ {
+		if s[x].err != io.EOF {
+			t.Fatalf("callSite %d: %s got error %v", callSite, s[x].name, s[x].err)
+		}
+		if s[x].n != 0 {
+			t.Fatalf("callSite %d: %s is longer than %s", callSite, s[x].name, s[1-x].name)
+		}
+	}
+}
+
+// checkBlobChunksAgainstReader() verifies that the blob blobName has the same chunks
+// (according to BlobChunkStream) as a chunker applied to the reader rd.
+// Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobChunksAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	buf := make([]byte, 8192) // buffer used to hold data from the chunk stream from rd.
+	rawChunks := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
+	cs := bs.BlobChunkStream(ctx, blobName)
+	pos := 0 // byte position within the blob, to be reported in error messages
+	i := 0   // chunk index, to be reported in error messages
+	rawMore, more := rawChunks.Advance(), cs.Advance()
+	for rawMore && more {
+		c := rawChunks.Value()
+		rawChunk := md5.Sum(c)
+		chunk := cs.Value(buf)
+		if bytes.Compare(rawChunk[:], chunk) != 0 {
+			t.Errorf("raw random stream and chunk record for blob %q have different chunk %d:\n\t%v\nvs\n\t%v\n\tpos %d\n\tlen %d\n\tc %v",
+				blobName, i, rawChunk, chunk, pos, len(c), c)
+		}
+		pos += len(c)
+		i++
+		rawMore, more = rawChunks.Advance(), cs.Advance()
+	}
+	if rawMore {
+		t.Fatalf("callSite %d: blob %q has fewer chunks than raw stream", callSite, blobName)
+	}
+	if more {
+		t.Fatalf("callSite %d: blob %q has more chunks than raw stream", callSite, blobName)
+	}
+	if rawChunks.Err() != nil {
+		t.Fatalf("callSite %d: error reading raw chunk stream: %v", callSite, rawChunks.Err())
+	}
+	if cs.Err() != nil {
+		t.Fatalf("callSite %d: error reading chunk stream for blob %q; %v", callSite, blobName, cs.Err())
+	}
+}
+
+// WriteViaChunks() tests that a large blob in one blob store can be transmitted
+// to another incrementally, without transferring chunks already in the other blob store.
+func WriteViaChunks(t *testing.T, ctx *context.T, bs [2]localblobstore.BlobStore) {
+	// The original blob will be a megabyte.
+	totalLength := 1024 * 1024
+
+	// Write a random blob to bs[0], using seed 1, then check that the
+	// bytes and chunk we get from the blob just written are the same as
+	// those obtained from an identical byte stream.
+	blob0 := writeBlobFromReader(t, ctx, bs[0], "", NewRandReader(1, totalLength, 0, io.EOF), 0)
+	checkBlobAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 1)
+	checkBlobChunksAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 2)
+
+	// ---------------------------------------------------------------------
+	// Write into bs[1] a blob that is similar to blob0, but not identical, and check it as above.
+	insertionInterval := 20 * 1024
+	blob1 := writeBlobFromReader(t, ctx, bs[1], "", NewRandReader(1, totalLength, insertionInterval, io.EOF), 3)
+	checkBlobAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 4)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 5)
+
+	// ---------------------------------------------------------------------
+	// Count the number of chunks, and the number of steps in the recipe
+	// for copying blob0 from bs[0] to bs[1].  We expect the former to be
+	// significantly bigger than the latter, because the
+	// insertionInterval is significantly larger than the expected chunk
+	// size.
+	cs := bs[0].BlobChunkStream(ctx, blob0)          // Stream of chunks in blob0
+	rs := bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+
+	recipeLen := 0
+	chunkCount := 0
+	for rs.Advance() {
+		step := rs.Value()
+		if step.Chunk != nil {
+			chunkCount++
+		}
+		recipeLen++
+	}
+	if rs.Err() != nil {
+		t.Fatalf("RecipeStream got error: %v", rs.Err())
+	}
+
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Get the original chunk count.
+	origChunkCount := 0
+	for cs.Advance() {
+		origChunkCount++
+	}
+	if cs.Err() != nil {
+		t.Fatalf("ChunkStream got error: %v", cs.Err())
+	}
+	if origChunkCount < chunkCount*5 {
+		t.Errorf("expected fewer chunks in repipe: recipeLen %d  chunkCount %d  origChunkCount %d\n",
+			recipeLen, chunkCount, origChunkCount)
+	}
+
+	// Copy blob0 from bs[0] to bs[1], using chunks from blob1 (already in bs[1]) where possible.
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Stream of chunks in blob0
+	// In a real application, at this point the stream cs would be sent to the device with bs[1].
+	rs = bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+	// Write blob with known blob name.
+	var bw localblobstore.BlobWriter
+	var err error
+	if bw, err = bs[1].NewBlobWriter(ctx, blob0); err != nil {
+		t.Fatalf("bs[1].NewBlobWriter yields error: %v", err)
+	}
+	var br localblobstore.BlobReader
+	const maxFragment = 1024 * 1024
+	blocks := make([]localblobstore.BlockOrFile, maxFragment/chunker.DefaultParam.MinChunk)
+	for gotStep := rs.Advance(); gotStep; {
+		step := rs.Value()
+		if step.Chunk == nil {
+			// This part of the blob can be read from an existing blob locally (at bs[1]).
+			if err = bw.AppendBlob(step.Blob, step.Size, step.Offset); err != nil {
+				t.Fatalf("AppendBlob(%v) yields error: %v", step, err)
+			}
+			gotStep = rs.Advance()
+		} else {
+			var fragmentSize int64
+			// In a real application, the sequence of chunk hashes
+			// in recipe steps would be communicated back to bs[0],
+			// which then finds the associated chunks.
+			var b int
+			for b = 0; gotStep && step.Chunk != nil && fragmentSize+chunker.DefaultParam.MaxChunk < maxFragment; b++ {
+				var loc localblobstore.Location
+				if loc, err = bs[0].LookupChunk(ctx, step.Chunk); err != nil {
+					t.Fatalf("bs[0] unexpectedly does not have chunk %v", step.Chunk)
+				}
+				if br != nil && br.Name() != loc.BlobName { // Close blob if we need a different one.
+					if err = br.Close(); err != nil {
+						t.Fatalf("unexpected error in BlobReader.Close(): %v", err)
+					}
+					br = nil
+				}
+				if br == nil { // Open blob if needed.
+					if br, err = bs[0].NewBlobReader(ctx, loc.BlobName); err != nil {
+						t.Fatalf("unexpected failure to create BlobReader on %q: %v", loc.BlobName, err)
+					}
+				}
+				if loc.Size > chunker.DefaultParam.MaxChunk {
+					t.Fatalf("chunk exceeds max chunk size: %d vs %d", loc.Size, chunker.DefaultParam.MaxChunk)
+				}
+				fragmentSize += loc.Size
+				if blocks[b].Block == nil {
+					blocks[b].Block = make([]byte, chunker.DefaultParam.MaxChunk)
+				}
+				blocks[b].Block = blocks[b].Block[:loc.Size]
+				var i int
+				var n int64
+				for n = int64(0); n != loc.Size; n += int64(i) {
+					if i, err = br.ReadAt(blocks[b].Block[n:loc.Size], n+loc.Offset); err != nil && err != io.EOF {
+						t.Fatalf("ReadAt on %q failed: %v", br.Name(), err)
+					}
+				}
+				if gotStep = rs.Advance(); gotStep {
+					step = rs.Value()
+				}
+			}
+			if err = bw.AppendFragment(blocks[:b]...); err != nil {
+				t.Fatalf("AppendFragment on %q failed: %v", bw.Name(), err)
+			}
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("BlobWriter.Close on %q failed: %v", bw.Name(), err)
+	}
+
+	// Check that the transferred blob in bs[1] is the same as the original
+	// stream used to make the blob in bs[0].
+	checkBlobAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 6)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 7)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
new file mode 100644
index 0000000..85e32d3
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package localblobstore_testlib
+
+import "math/rand"
+
+// A RandReader contains a pointer to a rand.Rand, and a size limit.  Its
+// pointers implement the Read() method from io.Reader, which yields bytes
+// obtained from the random number generator.
+type RandReader struct {
+	rand           *rand.Rand // Source of random bytes.
+	pos            int        // Number of bytes read.
+	limit          int        // Max number of bytes that may be read.
+	insertInterval int        // If non-zero, number of bytes between insertions of zero bytes.
+	eofErr         error      // error to be returned at the end of the stream
+}
+
+// NewRandReader() returns a new RandReader with the specified seed and size limit.
+// It yields eofErr when the end of the stream is reached.
+// If insertInterval is non-zero, a zero byte is inserted into the stream every
+// insertInterval bytes, before resuming getting bytes from the random number
+// generator.
+func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
+	r := new(RandReader)
+	r.rand = rand.New(rand.NewSource(seed))
+	r.limit = limit
+	r.insertInterval = insertInterval
+	r.eofErr = eofErr
+	return r
+}
+
+// Read() implements the io.Reader Read() method for *RandReader.
+func (r *RandReader) Read(buf []byte) (n int, err error) {
+	// Generate bytes up to the end of the stream, or the end of the buffer.
+	max := r.limit - r.pos
+	if len(buf) < max {
+		max = len(buf)
+	}
+	for ; n != max; n++ {
+		if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
+			buf[n] = byte(r.rand.Int31n(256))
+		} else {
+			buf[n] = 0
+		}
+		r.pos++
+	}
+	if r.pos == r.limit {
+		err = r.eofErr
+	}
+	return n, err
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
new file mode 100644
index 0000000..c3f11d4
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
@@ -0,0 +1,368 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Example code for transferring a blob from one device to another.
+// See the simulateResumption constant to choose whether to simulate a full
+// transfer or a resumed one.
+package localblobstore_test
+
+import "bytes"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "math/rand"
+import "os"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// simulateResumption tells the receiver whether to simulate having a partial
+// blob before blob transfer.
+const simulateResumption = true
+
+// createBlobStore() returns a new BlobStore, and the name of the directory
+// used to implement it.
+func createBlobStore(ctx *context.T) (bs localblobstore.BlobStore, dirName string) {
+	var err error
+	if dirName, err = ioutil.TempDir("", "localblobstore_transfer_test"); err != nil {
+		panic(err)
+	}
+	if bs, err = fs_cablobstore.Create(ctx, dirName); err != nil {
+		panic(err)
+	}
+	return bs, dirName
+}
+
+// createBlob writes to bs a blob of count 32kByte blocks drawn from a deterministic
+// but arbitrary random stream, starting at block offset within that stream.
+// Returns its name, which is "blob" if non-empty, and chosen arbitrarily otherwise.
+// The blob is finalized iff "complete" is true.
+func createBlob(ctx *context.T, bs localblobstore.BlobStore, blob string, complete bool, offset int, count int) string {
+	var bw localblobstore.BlobWriter
+	var err error
+	if bw, err = bs.NewBlobWriter(ctx, blob); err != nil {
+		panic(err)
+	}
+	blob = bw.Name()
+	var buffer [32 * 1024]byte
+	block := localblobstore.BlockOrFile{Block: buffer[:]}
+	r := rand.New(rand.NewSource(1)) // Always seed with 1 for repeatability.
+	for i := 0; i != offset+count; i++ {
+		for b := 0; b != len(buffer); b++ {
+			buffer[b] = byte(r.Int31n(256))
+		}
+		if i >= offset {
+			if err = bw.AppendFragment(block); err != nil {
+				panic(err)
+			}
+		}
+	}
+	if complete {
+		err = bw.Close()
+	} else {
+		err = bw.CloseWithoutFinalize()
+	}
+	if err != nil {
+		panic(err)
+	}
+	return blob
+}
+
+// A channelChunkStream turns a channel of chunk hashes into a ChunkStream.
+type channelChunkStream struct {
+	channel <-chan []byte
+	ok      bool
+	value   []byte
+}
+
+// newChannelChunkStream returns a ChunkStream, given a channel containing the
+// relevant chunk hashes.
+func newChannelChunkStream(ch <-chan []byte) localblobstore.ChunkStream {
+	return &channelChunkStream{channel: ch, ok: true}
+}
+
+// The following are the standard ChunkStream methods.
+func (cs *channelChunkStream) Advance() bool {
+	if cs.ok {
+		cs.value, cs.ok = <-cs.channel
+	}
+	return cs.ok
+}
+func (cs *channelChunkStream) Value(buf []byte) []byte { return cs.value }
+func (cs *channelChunkStream) Err() error              { return nil }
+func (cs *channelChunkStream) Cancel()                 {}
+
+// Example_blobTransfer() demonstrates how to transfer a blob incrementally
+// from one device's blob store to another.  In this code, the communication
+// between sender and receiver is modelled with Go channels.
+func Example_blobTransfer() {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// ----------------------------------------------
+	// The names of the channels used for communication between sender and
+	// receiver end in ToSender or ToReceiver, indicating direction.
+	type blobData struct {
+		name     string
+		size     int64
+		checksum []byte
+	}
+	blobDataToReceiver := make(chan blobData)  // indicate basic data for blob
+	needChunksToSender := make(chan bool)      // indicate receiver does not have entire blob
+	chunkHashesToReceiver := make(chan []byte) // for initial transfer of chunk hashes
+	chunkHashesToSender := make(chan []byte)   // to report which chunks receiver needs
+	chunksToReceiver := make(chan []byte)      // to send the needed chunk data to receiver
+
+	sDone := make(chan bool) // closed when sender done
+	rDone := make(chan bool) // closed when receiver done
+
+	// ----------------------------------------------
+	// The sender.
+	go func(ctx *context.T,
+		blobDataToReceiver chan<- blobData,
+		needChunksToSender <-chan bool,
+		chunkHashesToReceiver chan<- []byte,
+		chunkHashesToSender <-chan []byte,
+		chunksToReceiver chan<- []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsS, bsSDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsSDir)
+
+		blob := createBlob(ctx, bsS, "", true, 0, 32) // Create a 1M blob at the sender.
+
+		// 1. Send basic blob data to receiver.
+		var br localblobstore.BlobReader
+		if br, err = bsS.NewBlobReader(ctx, blob); err != nil {
+			panic(err)
+		}
+		blobDataToReceiver <- blobData{name: blob, size: br.Size(), checksum: br.Hash()}
+		br.Close()
+		close(blobDataToReceiver)
+
+		// 3. Get indication from receiver of whether it needs blob.
+		needChunks := <-needChunksToSender
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 4. Send the chunk hashes to the receiver.  This proceeds concurrently
+		//    with the step below.
+		go func(ctx *context.T, blob string, chunkHashesToReceiver chan<- []byte) {
+			cs := bsS.BlobChunkStream(ctx, blob)
+			for cs.Advance() {
+				chunkHashesToReceiver <- cs.Value(nil)
+			}
+			if cs.Err() != nil {
+				panic(cs.Err())
+			}
+			close(chunkHashesToReceiver)
+		}(ctx, blob, chunkHashesToReceiver)
+
+		// 7. Get needed chunk hashes from receiver, find the relevant
+		//    data, and send it back to the receiver.
+		var cbr localblobstore.BlobReader // Cached read handle on most-recent-read blob, or nil
+		// Given chunk hash h from chunkHashesToSender, send chunk to chunksToReceiver.
+		for h := range chunkHashesToSender {
+			loc, err := bsS.LookupChunk(ctx, h)
+			for err == nil && (cbr == nil || cbr.Name() != loc.BlobName) {
+				if cbr != nil && cbr.Name() != loc.BlobName {
+					cbr.Close()
+					cbr = nil
+				}
+				if cbr == nil {
+					if cbr, err = bsS.NewBlobReader(ctx, loc.BlobName); err != nil {
+						bsS.GC(ctx) // A partially-deleted blob may be confusing things.
+						loc, err = bsS.LookupChunk(ctx, h)
+					}
+				}
+			}
+			var i int = 1
+			var n int64
+			buffer := make([]byte, loc.Size) // buffer for current chunk
+			for n = int64(0); n != loc.Size && i != 0 && err == nil; n += int64(i) {
+				if i, err = cbr.ReadAt(buffer[n:loc.Size], n+loc.Offset); err == io.EOF {
+					err = nil // EOF is expected
+				}
+			}
+			if n == loc.Size { // Got chunk.
+				chunksToReceiver <- buffer[:loc.Size]
+			}
+			if err != nil {
+				break
+			}
+		}
+		close(chunksToReceiver)
+		if cbr != nil {
+			cbr.Close()
+		}
+
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, sDone)
+
+	// ----------------------------------------------
+	// The receiver.
+	go func(ctx *context.T,
+		blobDataToReceiver <-chan blobData,
+		needChunksToSender chan<- bool,
+		chunkHashesToReceiver <-chan []byte,
+		chunkHashesToSender chan<- []byte,
+		chunksToReceiver <-chan []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsR, bsRDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsRDir)
+
+		// 2. Receive basic blob data from sender.
+		blobInfo := <-blobDataToReceiver
+
+		if simulateResumption {
+			// Write a fraction of the (unfinalized) blob on the receiving side
+			// to check that the transfer process can resume a partial blob.
+			createBlob(ctx, bsR, blobInfo.name, false, 0, 10)
+		}
+
+		// 3. Tell sender whether the receiver already has the complete
+		//    blob.
+		needChunks := true
+		var br localblobstore.BlobReader
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err == nil {
+			if br.IsFinalized() {
+				if len(br.Hash()) == len(blobInfo.checksum) && bytes.Compare(br.Hash(), blobInfo.checksum) != 0 {
+					panic("receiver has a finalized blob with same name but different hash")
+				}
+				needChunks = false // The receiver already has the blob.
+			}
+			br.Close()
+		}
+		needChunksToSender <- needChunks
+		close(needChunksToSender)
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 5. Receive the chunk hashes from the sender, and turn them
+		//    into a recipe.
+		cs := newChannelChunkStream(chunkHashesToReceiver)
+		rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
+
+		// 6. The following thread sends to the sender the chunk hashes
+		//    that the receiver does not have.  It also makes
+		//    a duplicate of the stream on the channel rsCopy.  The
+		//    buffering in rsCopy allows the receiver to put several
+		//    chunks into a fragment.
+		rsCopy := make(chan localblobstore.RecipeStep, 100) // A buffered copy of the rs stream.
+		go func(ctx *context.T, rs localblobstore.RecipeStream, rsCopy chan<- localblobstore.RecipeStep, chunkHashesToSender chan<- []byte) {
+			for rs.Advance() {
+
+				step := rs.Value()
+				if step.Chunk != nil { // Data must be fetched from sender.
+					chunkHashesToSender <- step.Chunk
+				}
+				rsCopy <- step
+			}
+			close(chunkHashesToSender)
+			close(rsCopy)
+		}(ctx, rs, rsCopy, chunkHashesToSender)
+
+		// 8. The following thread splices the chunks from the sender
+		//    (on chunksToReceiver) into the recipe stream copy
+		//    (rsCopy) to generate a full recipe stream (rsFull) in
+		//    which chunks are actual data, rather than just hashes.
+		rsFull := make(chan localblobstore.RecipeStep) // A recipe stream containing chunk data, not just hashes.
+		go func(ctx *context.T, rsCopy <-chan localblobstore.RecipeStep, chunksToReceiver <-chan []byte, rsFull chan<- localblobstore.RecipeStep) {
+			var ok bool
+			for step := range rsCopy {
+				if step.Chunk != nil { // Data must be fetched from sender.
+					if step.Chunk, ok = <-chunksToReceiver; !ok {
+						break
+					}
+				}
+				rsFull <- step
+			}
+			close(rsFull)
+		}(ctx, rsCopy, chunksToReceiver, rsFull)
+
+		// 9. Write the blob using the recipe.
+		var chunksTransferred int
+		const fragmentThreshold = 1024 * 1024 // Try to write on-disc fragments at least this big.
+		var ignoreBytes int64
+		var bw localblobstore.BlobWriter
+		if bw, err = bsR.ResumeBlobWriter(ctx, blobInfo.name); err != nil {
+			bw, err = bsR.NewBlobWriter(ctx, blobInfo.name)
+		} else {
+			ignoreBytes = bw.Size()
+		}
+		if err == nil {
+			var fragment []localblobstore.BlockOrFile
+			var fragmentSize int64
+			for step := range rsFull {
+				if step.Chunk == nil { // Data can be obtained from local blob.
+					if ignoreBytes >= step.Size { // Ignore chunks we already have.
+						ignoreBytes -= step.Size
+					} else {
+						err = bw.AppendBlob(step.Blob, step.Size-ignoreBytes, step.Offset+ignoreBytes)
+						ignoreBytes = 0
+					}
+				} else if ignoreBytes >= int64(len(step.Chunk)) { // Ignoer chunks we already have.
+					ignoreBytes -= int64(len(step.Chunk))
+				} else { // Data is from a chunk send by the sender.
+					chunksTransferred++
+					fragment = append(fragment, localblobstore.BlockOrFile{Block: step.Chunk[ignoreBytes:]})
+					fragmentSize += int64(len(step.Chunk)) - ignoreBytes
+					ignoreBytes = 0
+					if fragmentSize > fragmentThreshold {
+						err = bw.AppendFragment(fragment...)
+						fragment = fragment[:0]
+						fragmentSize = 0
+					}
+				}
+				if err != nil {
+					break
+				}
+			}
+			if err == nil && len(fragment) != 0 {
+				err = bw.AppendFragment(fragment...)
+			}
+			if err2 := bw.Close(); err == nil {
+				err = err2
+			}
+			if err != nil {
+				panic(err)
+			}
+		}
+
+		// 10. Verify that the blob was written correctly.
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err != nil {
+			panic(err)
+		}
+		if br.Size() != blobInfo.size {
+			panic("transferred blob has wrong size")
+		}
+		if len(br.Hash()) != len(blobInfo.checksum) || bytes.Compare(br.Hash(), blobInfo.checksum) != 0 {
+			panic("transferred blob has wrong checksum")
+		}
+		if err = br.Close(); err != nil {
+			panic(err)
+		}
+		fmt.Printf("%d chunks transferred\n", chunksTransferred)
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, rDone)
+
+	// ----------------------------------------------
+	// Wait for sender and receiver to finish.
+	_ = <-sDone
+	_ = <-rDone
+
+	// Output: 635 chunks transferred
+}
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
index b1a4565..16af4cf 100644
--- a/x/ref/services/syncbase/localblobstore/model.go
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -4,6 +4,50 @@
 
 // Package localblobstore is the interface to a local blob store.
 // Implementations include fs_cablobstore.
+//
+// Expected use
+// ============
+// These examples assume that bs, bsS (sender) and bsR (receiver) are blobstores.
+//
+// Writing blobs
+//      bw, err := bs.NewBlobWriter(ctx, "")  // For a new blob, implementation picks blob name.
+//      if err == nil {
+//		blobName := bw.Name()  // Get name the implementation picked.
+//   	  	... use bw.AppendFragment() to append data to the blob...
+//   	  	... and/or bw.AppendBlob() to append data that's in another existing blob...
+//        	err = bw.Close()
+//   	}
+//
+// Resume writing a blob that was partially written due to a crash (not yet finalized).
+//	bw, err := bs.ResumeBlobWriter(ctx, name)
+//	if err == nil {
+//		size := bw.Size() // The store has this many bytes from the blob.
+//		... write the remaining data using bw.AppendFragment() and/or bw.AppendBlob()...
+//		err = bw.Close()
+//	}
+//
+// Reading blobs
+//	br, err := bs.NewBlobReader(ctx, name)
+//	if err == nil {
+//		... read bytes with br.ReadAt() or br.Read(), perhaps with br.Seek()...
+//		err = br.Close()
+//	}
+//
+// Transferring blobs from one store to another:
+// See example in localblobstore_transfer_test.go
+// Summary:
+// - The sender sends the checksum of the blob from BlobReader's Hash().
+// - The receiver checks whether it already has the blob, with the same
+//   checksum.
+// - If the receiver does not have the blob, the sender sends the list of chunk
+//   hashes in the blob using BlobChunkStream().
+// - The receiver uses RecipeStreamFromChunkStream() with the chunk hash stream
+//   from the sender, and tells the sender the chunk hashes of the chunks it
+//   needs.
+// - The sender uses LookupChunk() to find the data for each chunk the receiver
+//   needs, and sends it to the receiver.
+// - The receiver applies the recipe steps, with the actual chunk data from
+//   the sender and its own local data.
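+//
+// A minimal sketch of the receiver-side recipe loop (assuming cs is a
+// ChunkStream of hashes received from the sender, and bw a BlobWriter):
+//	rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
+//	for rs.Advance() {
+//		step := rs.Value()
+//		if step.Chunk != nil {
+//			... fetch the chunk data from the sender, then bw.AppendFragment()...
+//		} else {
+//			err = bw.AppendBlob(step.Blob, step.Size, step.Offset)
+//		}
+//	}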
 package localblobstore
 
 import "v.io/v23/context"
@@ -17,12 +61,16 @@
 	NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
 
 	// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
-	// a newly created blob name, which can be found using the Name()
-	// method.  BlobWriters should not be used concurrently by multiple
-	// threads.  The returned handle should be closed with either the
-	// Close() or CloseWithoutFinalize() method to avoid leaking file
-	// handles.
-	NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+	// a newly created blob.  If "name" is non-empty, its is used to name
+	// the blob, and it must be in the format of a name returned by this
+	// interface (probably by another instance on another device).
+	// Otherwise, a new name is created, which can be found using
+	// the Name() method.  It is an error to attempt to overwrite a blob
+	// that already exists in this blob store.  BlobWriters should not be
+	// used concurrently by multiple threads.  The returned handle should
+	// be closed with either the Close() or CloseWithoutFinalize() method
+	// to avoid leaking file handles.
+	NewBlobWriter(ctx *context.T, name string) (bw BlobWriter, err error)
 
 	// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
 	// an old, but unfinalized blob name.
@@ -36,6 +84,31 @@
 	// other calls to GC(), and with uses of BlobReaders and BlobWriters.
 	GC(ctx *context.T) error
 
+	// BlobChunkStream() returns a ChunkStream that can be used to read the
+	// ordered list of content hashes of chunks in blob blobName.  It is
+	// expected that this list will be presented to
+	// RecipeStreamFromChunkStream() on another device, to create a recipe
+	// for transmitting the blob efficiently to that other device.
+	BlobChunkStream(ctx *context.T, blobName string) ChunkStream
+
+	// RecipeStreamFromChunkStream() returns a pointer to a RecipeStream
+	// that allows the client to iterate over each RecipeStep needed to
+	// create the blob formed by the chunks in chunkStream.  It is expected
+	// that this will be called on a receiving device, and be given a
+	// ChunkStream from a sending device, to yield a recipe for efficient
+	// chunk transfer.  RecipeStep values with non-nil Chunk fields need
+	// the chunk from the sender; once the data is returned is can be
+	// written with BlobWriter.AppendFragment().  Those with blob
+	// references can be written locally with BlobWriter.AppendBlob().
+	RecipeStreamFromChunkStream(ctx *context.T, chunkStream ChunkStream) RecipeStream
+
+	// LookupChunk() returns the location of a chunk with the specified chunk
+	// hash within the store.  It is expected that chunk hashes from
+	// RecipeStep entries from RecipeStreamFromChunkStream() will be mapped
+	// to blob Location values on the sender for transmission to the
+	// receiver.
+	LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error)
+
 	// ListBlobIds() returns an iterator that can be used to enumerate the
 	// blobs in a BlobStore.  Expected use is:
 	//
@@ -46,7 +119,7 @@
 	//	if iter.Err() != nil {
 	//	  // The loop terminated early due to an error.
 	//	}
-	ListBlobIds(ctx *context.T) (iter Iter)
+	ListBlobIds(ctx *context.T) (iter Stream)
 
 	// ListCAIds() returns an iterator that can be used to enumerate the
 	// content-addressable fragments in a BlobStore.  Expected use is:
@@ -58,12 +131,20 @@
 	//	if iter.Err() != nil {
 	//	  // The loop terminated early due to an error.
 	//	}
-	ListCAIds(ctx *context.T) (iter Iter)
+	ListCAIds(ctx *context.T) (iter Stream)
 
 	// Root() returns the name of the root directory where the BlobStore is stored.
 	Root() string
 }
 
+// A Location describes a chunk's location within a blob.  It is returned by
+// BlobStore.LookupChunk().
+type Location struct {
+	BlobName string // name of blob
+	Offset   int64  // byte offset of chunk within blob
+	Size     int64  // size of chunk
+}
+
 // A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
 // and Seek() calls.  A BlobReader can be created with NewBlobReader(), and
 // should be closed with the Close() method to avoid leaking file handles.
@@ -153,18 +234,67 @@
 	Hash() []byte
 }
 
-// A Iter represents an iterator that allows the client to enumerate
-// all the blobs of fragments in a BlobStore.
-type Iter interface {
-	// Advance() stages an item so that it may be retrieved via Value.
-	// Returns true iff there is an item to retrieve.  Advance must be
-	// called before Value is called.
-	Advance() (advanced bool)
+// A Stream represents an iterator that allows the client to enumerate
+// all the blobs or fragments in a BlobStore.
+//
+// The interfaces Stream, ChunkStream, and RecipeStream all have four calls,
+// and differ only in the Value() call.
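+//
+// Expected use (a sketch; done() stands for any early-exit test):
+//
+//	for s.Advance() {
+//		name := s.Value()
+//		if done(name) {
+//			s.Cancel() // the next Advance() will return false
+//		}
+//	}
+//	if s.Err() != nil {
+//		// The stream terminated early due to an error.
+//	}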
+type Stream interface {
+	// Advance() stages an item so that it may be retrieved via Value().
+	// Returns true iff there is an item to retrieve.  Advance() must be
+	// called before Value() is called.  The caller is expected to read
+	// until Advance() returns false, or to call Cancel().
+	Advance() bool
 
-	// Value() returns the item that was staged by Advance.  May panic if
-	// Advance returned false or was not called.  Never blocks.
+	// Value() returns the item that was staged by Advance().  May panic if
+	// Advance() returned false or was not called.  Never blocks.
 	Value() (name string)
 
 	// Err() returns any error encountered by Advance.  Never blocks.
 	Err() error
+
+	// Cancel() indicates that the client wishes to cease reading from the stream.
+	// It causes the next call to Advance() to return false.  Never blocks.
+	// It may be called concurrently with other calls on the stream.
+	Cancel()
+}
+
+// A ChunkStream represents an iterator that allows the client to enumerate
+// the chunks in a blob.  See the comments for Stream for usage.
+type ChunkStream interface {
+	Advance() bool
+
+	// Value() returns the chunkHash that was staged by Advance().  May
+	// panic if Advance() returned false or was not called.  Never blocks.
+	// The result may share storage with buf[] if it is large enough;
+	// otherwise, a new buffer is allocated.  It is legal to call with
+	// buf==nil.
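+	//
+	// Expected use, reusing one buffer across iterations (a sketch;
+	// the buffer size is illustrative):
+	//
+	//	buf := make([]byte, 32)
+	//	for cs.Advance() {
+	//		hash := cs.Value(buf)
+	//		// hash may share storage with buf; copy it if it must
+	//		// outlive this iteration.
+	//	}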
+	Value(buf []byte) (chunkHash []byte)
+
+	Err() error
+	Cancel()
+}
+
+// A RecipeStep describes one piece of a recipe for making a blob.
+// The step consists of appending either the chunk with content hash Chunk
+// and size Size, or (if Chunk==nil) the Size bytes of blob Blob, starting
+// at Offset.
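+// For example, {Chunk: h, Size: 4096} appends the 4096-byte chunk whose
+// content hash is h, while {Chunk: nil, Blob: "b", Offset: 0, Size: 4096}
+// appends the first 4096 bytes of the existing blob "b".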
+type RecipeStep struct {
+	Chunk  []byte
+	Blob   string
+	Size   int64
+	Offset int64
+}
+
+// A RecipeStream represents an iterator that allows the client to obtain the
+// steps needed to construct a blob from a given ChunkStream, attempting to
+// reuse data in existing blobs.  See the comments for Stream for usage.
+type RecipeStream interface {
+	Advance() bool
+
+	// Value() returns the RecipeStep that was staged by Advance().  May panic if
+	// Advance() returned false or was not called.  Never blocks.
+	Value() RecipeStep
+
+	Err() error
+	Cancel()
 }