Merge "syncbase/vsync: * Integration test for bi-directional syncing. * Made client/servers independent principals in integration tests."
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker.go b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
index 84abf6f..cb07533 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker.go
@@ -30,8 +30,17 @@
 // http://www.hpl.hp.com/techreports/2005/HPL-2005-30R1.pdf
 
 import "io"
+import "sync"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/crc64window"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+
+var (
+	errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
+)
 
 // A Param contains the parameters for chunking.
 //
@@ -69,10 +78,13 @@
 // stream.
 type Stream struct {
 	param        Param               // chunking parameters
+	ctx          *context.T          // context of creator
 	window       *crc64window.Window // sliding window for computing the hash
 	buf          []byte              // buffer of data
 	rd           io.Reader           // source of data
 	err          error               // error from rd
+	mu           sync.Mutex          // protects cancelled
+	cancelled    bool                // whether the stream has been cancelled
 	bufferChunks bool                // whether to buffer entire chunks
 	// Invariant:  bufStart <= chunkStart <= chunkEnd <= bufEnd
 	bufStart   int64  // offset in rd of first byte in buf[]
@@ -86,9 +98,10 @@
 // newStream() returns a pointer to a new Stream instance, with the
 // parameters in *param.  This internal version of NewStream() allows the caller
 // to specify via bufferChunks whether entire chunks should be buffered.
-func newStream(param *Param, rd io.Reader, bufferChunks bool) *Stream {
+func newStream(ctx *context.T, param *Param, rd io.Reader, bufferChunks bool) *Stream {
 	s := new(Stream)
 	s.param = *param
+	s.ctx = ctx
 	s.window = crc64window.New(crc64window.ECMA, s.param.WindowWidth)
 	bufSize := int64(8192)
 	if bufferChunks {
@@ -107,8 +120,16 @@
 
 // NewStream() returns a pointer to a new Stream instance, with the
 // parameters in *param.
-func NewStream(param *Param, rd io.Reader) *Stream {
-	return newStream(param, rd, true)
+func NewStream(ctx *context.T, param *Param, rd io.Reader) *Stream {
+	return newStream(ctx, param, rd, true)
+}
+
+// isCancelled() returns whether s.Cancel() has been called.
+func (s *Stream) isCancelled() (cancelled bool) {
+	s.mu.Lock()
+	cancelled = s.cancelled
+	s.mu.Unlock()
+	return cancelled
 }
 
 // Advance() stages the next chunk so that it may be retrieved via Value().
@@ -132,7 +153,7 @@
 			s.bufStart = s.chunkEnd
 		}
 		// Fill buffer with data, unless error/EOF.
-		for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) {
+		for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) && !s.isCancelled() {
 			var n int
 			n, s.err = s.rd.Read(s.buf[s.bufEnd-s.bufStart:])
 			s.bufEnd += int64(n)
@@ -148,7 +169,7 @@
 	// While not end of chunk...
 	for s.windowEnd != maxChunk &&
 		(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
-		(s.windowEnd != s.bufEnd || s.err == nil) {
+		(s.windowEnd != s.bufEnd || s.err == nil) && !s.isCancelled() {
 
 		// Fill the buffer if empty, and there's more data to read.
 		if s.windowEnd == s.bufEnd && s.err == nil {
@@ -167,7 +188,10 @@
 			bufLimit = s.bufEnd
 		}
 		// Advance window until both MinChunk reached and primary boundary found.
-		for s.windowEnd != bufLimit && (s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) {
+		for s.windowEnd != bufLimit &&
+			(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
+			!s.isCancelled() {
+
 			// Advance the window by one byte.
 			s.hash = s.window.Advance(s.buf[s.windowEnd-s.bufStart])
 			s.windowEnd++
@@ -185,7 +209,7 @@
 		s.chunkEnd = s.windowEnd
 	}
 
-	return s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
+	return !s.isCancelled() && s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
 }
 
 // Value() returns the chunk that was staged by Advance().  May panic if
@@ -196,12 +220,26 @@
 
 // Err() returns any error encountered by Advance().  Never blocks.
 func (s *Stream) Err() (err error) {
+	s.mu.Lock()
+	if s.cancelled && (s.err == nil || s.err == io.EOF) {
+		s.err = verror.New(errStreamCancelled, s.ctx)
+	}
+	s.mu.Unlock()
 	if s.err != io.EOF { // Do not consider EOF to be an error.
 		err = s.err
 	}
 	return err
 }
 
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// Never blocks.  May be called concurrently with other method calls on s.
+func (s *Stream) Cancel() {
+	s.mu.Lock()
+	s.cancelled = true
+	s.mu.Unlock()
+}
+
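
For context, the intended client-side pattern for the extended Stream API is sketched below (a hedged example, not part of this change; process() is a hypothetical callback):

	s := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
	for s.Advance() {
		chunk := s.Value() // valid only until the next call to Advance()
		if !process(chunk) {
			s.Cancel() // the next Advance() will return false
		}
	}
	if err := s.Err(); err != nil {
		// Err() reports errStreamCancelled if Cancel() ended the
		// iteration early; io.EOF is never reported as an error.
	}
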
 // ----------------------------------
 
 // A PosStream is just like a Stream, except that the Value() method returns only
@@ -214,9 +252,9 @@
 
 // NewPosStream() returns a pointer to a new PosStream instance, with the
 // parameters in *param.
-func NewPosStream(param *Param, rd io.Reader) *PosStream {
+func NewPosStream(ctx *context.T, param *Param, rd io.Reader) *PosStream {
 	ps := new(PosStream)
-	ps.s = newStream(param, rd, false)
+	ps.s = newStream(ctx, param, rd, false)
 	return ps
 }
 
@@ -237,3 +275,10 @@
 func (ps *PosStream) Err() error {
 	return ps.s.Err()
 }
+
+// Cancel() causes the next call to Advance() to return false.
+// It should be used when the client does not wish to iterate to the end of the stream.
+// Never blocks.  May be called concurrently with other method calls on ps.
+func (ps *PosStream) Cancel() {
+	ps.s.Cancel()
+}
diff --git a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
index 6de3304..57c6fed 100644
--- a/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunker/chunker_test.go
@@ -9,68 +9,28 @@
 import "crypto/md5"
 import "fmt"
 import "io"
-import "math/rand"
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
-
-// A RandReader contains a pointer to a rand.Read, and a size limit.  Its
-// pointers implement the Read() method from io.Reader, which yields bytes
-// obtained from the random number generator.
-type RandReader struct {
-	rand           *rand.Rand // Source of random bytes.
-	pos            int        // Number of bytes read.
-	limit          int        // Max number of bytes that may be read.
-	insertInterval int        // If non-zero, number of bytes between insertions of zero bytes.
-	eofErr         error      // error to be returned at the end of the stream
-}
-
-// NewRandReader() returns a new RandReader with the specified seed and size limit.
-// It yields eofErr when the end of the stream is reached.
-// If insertInterval is non-zero, a zero byte is inserted into the stream every
-// insertInterval bytes, before resuming getting bytes from the random number
-// generator.
-func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
-	r := new(RandReader)
-	r.rand = rand.New(rand.NewSource(seed))
-	r.limit = limit
-	r.insertInterval = insertInterval
-	r.eofErr = eofErr
-	return r
-}
-
-// Read() implements the io.Reader Read() method for *RandReader.
-func (r *RandReader) Read(buf []byte) (n int, err error) {
-	// Generate bytes up to the end of the stream, or the end of the buffer.
-	max := r.limit - r.pos
-	if len(buf) < max {
-		max = len(buf)
-	}
-	for ; n != max; n++ {
-		if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
-			buf[n] = byte(r.rand.Int31n(256))
-		} else {
-			buf[n] = 0
-		}
-		r.pos++
-	}
-	if r.pos == r.limit {
-		err = r.eofErr
-	}
-	return n, err
-}
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
 
 // TestChunksPartitionStream() tests that the chunker partitions its input
 // stream into reasonably sized chunks, which when concatenated form the
 // original stream.
 func TestChunksPartitionStream(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	var err error
 	totalLength := 1024 * 1024
 
 	// Compute the md5 of an arbitrary stream.  We will later compare this
 	// with the md5 of the concatenation of chunks from an equivalent
 	// stream.
-	r := NewRandReader(1, totalLength, 0, io.EOF)
+	r := localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
 	hStream := md5.New()
 	buf := make([]byte, 8192)
 	for err == nil {
@@ -81,12 +41,12 @@
 	checksumStream := hStream.Sum(nil)
 
 	// Using an equivalent stream, break it into chunks.
-	r = NewRandReader(1, totalLength, 0, io.EOF)
+	r = localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF)
 	param := &chunker.DefaultParam
 	hChunked := md5.New()
 
 	length := 0
-	s := chunker.NewStream(param, r)
+	s := chunker.NewStream(ctx, param, r)
 	for s.Advance() {
 		chunk := s.Value()
 		length += len(chunk)
@@ -116,10 +76,15 @@
 
 // TestPosStream() tests that a PosStream leads to the same chunks as a Stream.
 func TestPosStream(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	totalLength := 1024 * 1024
 
-	s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
-	ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
+	s := chunker.NewStream(ctx, &chunker.DefaultParam,
+		localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+	ps := chunker.NewPosStream(ctx, &chunker.DefaultParam,
+		localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
 
 	itReady := s.Advance()
 	pitReady := ps.Advance()
@@ -150,8 +115,8 @@
 
 // chunkSums() returns a vector of md5 checksums for the chunks of the
 // specified Reader, using the default chunking parameters.
-func chunkSums(r io.Reader) (sums [][md5.Size]byte) {
-	s := chunker.NewStream(&chunker.DefaultParam, r)
+func chunkSums(ctx *context.T, r io.Reader) (sums [][md5.Size]byte) {
+	s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
 	for s.Advance() {
 		sums = append(sums, md5.Sum(s.Value()))
 	}
@@ -161,14 +126,17 @@
 // TestInsertions() tests how chunk sequences differ when bytes are
 // periodically inserted into a stream.
 func TestInsertions(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	totalLength := 1024 * 1024
 	insertionInterval := 20 * 1024
 	bytesInserted := totalLength / insertionInterval
 
 	// Get the md5 sums of the chunks of two similar streams, where the
 	// second has an extra byte every 20k bytes.
-	sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF))
-	sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF))
+	sums0 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, 0, io.EOF))
+	sums1 := chunkSums(ctx, localblobstore_testlib.NewRandReader(1, totalLength, insertionInterval, io.EOF))
 
 	// Iterate over chunks of second stream, counting which are in common
 	// with the first stream.  We expect to find common chunks within 10 of the
@@ -208,10 +176,13 @@
 // TestError() tests the behaviour of a chunker when given an error by its
 // reader.
 func TestError(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
 	notEOF := fmt.Errorf("not EOF")
 	totalLength := 50 * 1024
-	r := NewRandReader(1, totalLength, 0, notEOF)
-	s := chunker.NewStream(&chunker.DefaultParam, r)
+	r := localblobstore_testlib.NewRandReader(1, totalLength, 0, notEOF)
+	s := chunker.NewStream(ctx, &chunker.DefaultParam, r)
 	length := 0
 	for s.Advance() {
 		chunk := s.Value()
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
index f08b796..a13cf9f 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
@@ -8,6 +8,7 @@
 package chunkmap
 
 import "encoding/binary"
+import "sync"
 
 import "v.io/syncbase/x/ref/services/syncbase/store"
 import "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
@@ -75,7 +76,7 @@
 
 // A Location describes a chunk's location within a blob.
 type Location struct {
-	Blob   []byte // ID of blob
+	BlobID []byte // ID of blob
 	Offset int64  // byte offset of chunk within blob
 	Size   int64  // size of chunk
 }
@@ -104,8 +105,8 @@
 // associated with the specified Location.
 func (cm *ChunkMap) AssociateChunkWithLocation(ctx *context.T, chunk []byte, loc Location) (err error) {
 	// Check expected lengths explicitly in routines that modify the database.
-	if len(loc.Blob) != blobIDLen {
-		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.Blob), blobIDLen)
+	if len(loc.BlobID) != blobIDLen {
+		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.BlobID), blobIDLen)
 	} else if len(chunk) != chunkHashLen {
 		err = verror.New(errBadChunkHashLen, ctx, cm.dir, len(chunk), chunkHashLen)
 	} else {
@@ -115,7 +116,7 @@
 		// Put the blob-to-chunk entry first, since it's used
 		// to garbage collect the other.
 		keyLen := copy(key[:], blobPrefix)
-		keyLen += copy(key[keyLen:], loc.Blob)
+		keyLen += copy(key[keyLen:], loc.BlobID)
 		binary.BigEndian.PutUint64(key[keyLen:], uint64(loc.Offset))
 		keyLen += offsetLen
 
@@ -126,7 +127,7 @@
 		if err == nil {
 			keyLen = copy(key[:], chunkPrefix)
 			keyLen += copy(key[keyLen:], chunk)
-			keyLen += copy(key[keyLen:], loc.Blob)
+			keyLen += copy(key[keyLen:], loc.BlobID)
 
 			valLen = binary.PutVarint(val[:], loc.Offset)
 			valLen += binary.PutVarint(val[valLen:], loc.Size)
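
For reference, the two index entries maintained here, as can be read off the encoding above and the decoding in LookupChunk() and ChunkStream below, are:

	blob-to-chunk:  key = blobPrefix . blobID . bigEndian64(offset)   value = chunkHash . varint(size)
	chunk-to-blob:  key = chunkPrefix . chunkHash . blobID            value = varint(offset) . varint(size)
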
@@ -202,12 +203,12 @@
 // been deleted, the client should remove the blob from the ChunkMap using
 // DeleteBlob(loc.BlobID), and try again.  (The client may also wish to
 // arrange at some point to call GC() on the blob store.)
-func (cm *ChunkMap) LookupChunk(ctx *context.T, chunk []byte) (loc Location, err error) {
+func (cm *ChunkMap) LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error) {
 	var start [maxKeyLen]byte
 	var limit [maxKeyLen]byte
 
 	startLen := copy(start[:], chunkPrefix)
-	startLen += copy(start[startLen:], chunk)
+	startLen += copy(start[startLen:], chunkHash)
 
 	limitLen := copy(limit[:], start[:startLen])
 	limitLen += copy(limit[limitLen:], blobLimit)
@@ -220,13 +221,13 @@
 		var n int
 		key := s.Key(keyBuf[:])
 		value := s.Value(valBuf[:])
-		loc.Blob = key[len(chunkPrefix)+chunkHashLen:]
+		loc.BlobID = key[len(chunkPrefix)+chunkHashLen:]
 		loc.Offset, n = binary.Varint(value)
 		if n > 0 {
 			loc.Size, n = binary.Varint(value[n:])
 		}
 		if n <= 0 {
-			err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunk, key, value)
+			err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunkHash, key, value)
 		}
 		s.Cancel()
 	} else {
@@ -234,23 +235,23 @@
 			err = s.Err()
 		}
 		if err == nil {
-			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunk)
+			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunkHash)
 		}
 	}
 
 	return loc, err
 }
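
The recovery path described in the comment above amounts to a retry loop along these lines (a sketch; blobExists() is a hypothetical check against the blob store):

	loc, err := cm.LookupChunk(ctx, chunkHash)
	for err == nil && !blobExists(loc.BlobID) {
		// The blob is gone from the blob store but not yet from the
		// ChunkMap; drop its entries, then look for another holder.
		if err = cm.DeleteBlob(ctx, loc.BlobID); err == nil {
			loc, err = cm.LookupChunk(ctx, chunkHash)
		}
	}
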
 
-// A BlobStream allows the client to iterate over the chunks in a blob:
-//	bs := cm.NewBlobStream(ctx, blob)
-//	for bs.Advance() {
-//		chunkHash := bs.Value()
+// A ChunkStream allows the client to iterate over the chunks in a blob:
+//	cs := cm.NewChunkStream(ctx, blob)
+//	for cs.Advance() {
+//		chunkHash := cs.Value()
 //		...process chunkHash...
 //	}
-//	if bs.Err() != nil {
+//	if cs.Err() != nil {
 //		...there was an error...
 //	}
-type BlobStream struct {
+type ChunkStream struct {
 	cm     *ChunkMap
 	ctx    *context.T
 	stream store.Stream
@@ -264,9 +265,9 @@
 	more   bool            // whether stream may be consulted again
 }
 
-// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// NewChunkStream() returns a pointer to a new ChunkStream that allows the client
 // to enumerate the chunk hashes in a blob, in order.
-func (cm *ChunkMap) NewBlobStream(ctx *context.T, blob []byte) *BlobStream {
+func (cm *ChunkMap) NewChunkStream(ctx *context.T, blob []byte) *ChunkStream {
 	var start [maxKeyLen]byte
 	var limit [maxKeyLen]byte
 
@@ -276,13 +277,13 @@
 	limitLen := copy(limit[:], start[:startLen])
 	limitLen += copy(limit[limitLen:], offsetLimit)
 
-	bs := new(BlobStream)
-	bs.cm = cm
-	bs.ctx = ctx
-	bs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
-	bs.more = true
+	cs := new(ChunkStream)
+	cs.cm = cm
+	cs.ctx = ctx
+	cs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
+	cs.more = true
 
-	return bs
+	return cs
 }
 
 // Advance() stages an element so the client can retrieve the chunk hash with
@@ -291,27 +292,27 @@
 // Value() or Location().  The client must call Cancel() if it does not iterate
 // through all elements (i.e. until Advance() returns false).  Advance() may
 // block if an element is not immediately available.
-func (bs *BlobStream) Advance() (ok bool) {
-	if bs.more && bs.err == nil {
-		if !bs.stream.Advance() {
-			bs.err = bs.stream.Err()
-			bs.more = false // no more stream, even if no error
+func (cs *ChunkStream) Advance() (ok bool) {
+	if cs.more && cs.err == nil {
+		if !cs.stream.Advance() {
+			cs.err = cs.stream.Err()
+			cs.more = false // no more stream, even if no error
 		} else {
-			bs.key = bs.stream.Key(bs.keyBuf[:])
-			bs.value = bs.stream.Value(bs.valBuf[:])
-			ok = (len(bs.value) >= chunkHashLen) &&
-				(len(bs.key) == len(blobPrefix)+blobIDLen+offsetLen)
+			cs.key = cs.stream.Key(cs.keyBuf[:])
+			cs.value = cs.stream.Value(cs.valBuf[:])
+			ok = (len(cs.value) >= chunkHashLen) &&
+				(len(cs.key) == len(blobPrefix)+blobIDLen+offsetLen)
 			if ok {
 				var n int
-				bs.loc.Blob = make([]byte, blobIDLen)
-				copy(bs.loc.Blob, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
-				bs.loc.Offset = int64(binary.BigEndian.Uint64(bs.key[len(blobPrefix)+blobIDLen:]))
-				bs.loc.Size, n = binary.Varint(bs.value[chunkHashLen:])
+				cs.loc.BlobID = make([]byte, blobIDLen)
+				copy(cs.loc.BlobID, cs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+				cs.loc.Offset = int64(binary.BigEndian.Uint64(cs.key[len(blobPrefix)+blobIDLen:]))
+				cs.loc.Size, n = binary.Varint(cs.value[chunkHashLen:])
 				ok = (n > 0)
 			}
 			if !ok {
-				bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, bs.value)
-				bs.stream.Cancel()
+				cs.err = verror.New(errMalformedBlobEntry, cs.ctx, cs.cm.dir, cs.key, cs.value)
+				cs.stream.Cancel()
 			}
 		}
 	}
@@ -323,19 +324,141 @@
 // enough to hold the entire value.  Otherwise, a newly allocated slice will be
 // returned.  It is valid to pass a nil buf.  Value() may panic if Advance()
 // returned false or was not called at all.  Value() does not block.
-func (bs *BlobStream) Value(buf []byte) (result []byte) {
+func (cs *ChunkStream) Value(buf []byte) (result []byte) {
 	if len(buf) < chunkHashLen {
 		buf = make([]byte, chunkHashLen)
 	}
-	copy(buf, bs.value[:chunkHashLen])
+	copy(buf, cs.value[:chunkHashLen])
 	return buf[:chunkHashLen]
 }
 
 // Location() returns the Location associated with the chunk staged by
 // Advance().  Location() may panic if Advance() returned false or was not
 // called at all.  Location() does not block.
-func (bs *BlobStream) Location() Location {
-	return bs.loc
+func (cs *ChunkStream) Location() Location {
+	return cs.loc
+}
+
+// Err() returns a non-nil error iff the stream encountered any errors.  Err()
+// does not block.
+func (cs *ChunkStream) Err() error {
+	return cs.err
+}
+
+// Cancel() notifies the stream provider that it can stop producing elements.
+// The client must call Cancel() if it does not iterate through all elements
+// (i.e. until Advance() returns false).  Cancel() is idempotent and can be
+// called concurrently with a goroutine that is iterating via Advance() and
+// Value().  Cancel() causes Advance() to subsequently return false.
+// Cancel() does not block.
+func (cs *ChunkStream) Cancel() {
+	cs.stream.Cancel()
+}
+
+// A BlobStream allows the client to iterate over the blobs in a ChunkMap:
+//	bs := cm.NewBlobStream(ctx)
+//	for bs.Advance() {
+//		blobID := bs.Value()
+//		...process blobID...
+//	}
+//	if bs.Err() != nil {
+//		...there was an error...
+//	}
+type BlobStream struct {
+	cm  *ChunkMap
+	ctx *context.T
+
+	key    []byte          // key for current element
+	keyBuf [maxKeyLen]byte // buffer for keys
+	err    error           // error encountered.
+	mu     sync.Mutex      // protects "more", which may be written in Cancel()
+	more   bool            // whether stream may be consulted again
+}
+
+// keyLimit is the limit key for store.Scan() calls within a BlobStream.
+var keyLimit []byte
+
+func init() {
+	// The limit key is the maximum length key, all ones after the blobPrefix.
+	keyLimit = make([]byte, maxKeyLen)
+	for i := copy(keyLimit, blobPrefix); i != len(keyLimit); i++ {
+		keyLimit[i] = 0xff
+	}
+}
+
+// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// to enumerate the blobs in the ChunkMap, in lexicographic order.
+func (cm *ChunkMap) NewBlobStream(ctx *context.T) *BlobStream {
+	bs := new(BlobStream)
+	bs.cm = cm
+	bs.ctx = ctx
+	bs.more = true
+	return bs
+}
+
+// Advance() stages an element so the client can retrieve the next blob ID with
+// Value().  Advance() returns true iff there is an element to retrieve.  The
+// client must call Advance() before calling Value().  The client must call
+// Cancel() if it does not iterate through all elements (i.e. until Advance()
+// returns false).  Advance() may block if an element is not immediately
+// available.
+func (bs *BlobStream) Advance() (ok bool) {
+	bs.mu.Lock()
+	ok = bs.more
+	bs.mu.Unlock()
+	if ok {
+		prefixAndKeyLen := len(blobPrefix) + blobIDLen
+		// Compute the next key to search for.
+		if len(bs.key) == 0 { // First time through: anything starting with blobPrefix.
+			n := copy(bs.keyBuf[:], blobPrefix)
+			bs.key = bs.keyBuf[:n]
+		} else {
+			// Increment the blobID to form the next possible key.
+			i := prefixAndKeyLen - 1
+			for ; i != len(blobPrefix)-1 && bs.keyBuf[i] == 0xff; i-- {
+				bs.keyBuf[i] = 0
+			}
+			if i == len(blobPrefix)-1 { // End of database
+				ok = false
+			} else {
+				bs.keyBuf[i]++
+			}
+			bs.key = bs.keyBuf[:prefixAndKeyLen]
+		}
+		if ok {
+			stream := bs.cm.st.Scan(bs.key, keyLimit)
+			if !stream.Advance() {
+				bs.err = stream.Err()
+				ok = false // no more stream, even if no error
+			} else {
+				bs.key = stream.Key(bs.keyBuf[:])
+				if len(bs.key) < prefixAndKeyLen {
+					bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, stream.Value(nil))
+					ok = false
+				}
+				stream.Cancel() // We get at most one element from each stream.
+			}
+		}
+		if !ok {
+			bs.mu.Lock()
+			bs.more = false
+			bs.mu.Unlock()
+		}
+	}
+	return ok
+}
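
The key-increment step above is an ordinary byte-string successor computation; a minimal standalone sketch of the same carry loop (illustrative only):

	// nextID increments id in place as a big-endian integer, returning
	// false if id consisted entirely of 0xff bytes (no successor exists).
	func nextID(id []byte) bool {
		i := len(id) - 1
		for ; i >= 0 && id[i] == 0xff; i-- {
			id[i] = 0
		}
		if i < 0 {
			return false
		}
		id[i]++
		return true
	}
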
+
+// Value() returns the blob ID staged by Advance().  The returned slice may be
+// a sub-slice of buf if buf is large enough to hold the entire value.
+// Otherwise, a newly allocated slice will be returned.  It is valid to pass a
+// nil buf.  Value() may panic if Advance() returned false or was not called at
+// all.  Value() does not block.
+func (bs *BlobStream) Value(buf []byte) (result []byte) {
+	if len(buf) < blobIDLen {
+		buf = make([]byte, blobIDLen)
+	}
+	copy(buf, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+	return buf[:blobIDLen]
 }
 
 // Err() returns a non-nil error iff the stream encountered any errors.  Err()
@@ -351,5 +474,7 @@
 // Value().  Cancel() causes Advance() to subsequently return false.
 // Cancel() does not block.
 func (bs *BlobStream) Cancel() {
-	bs.stream.Cancel()
+	bs.mu.Lock()
+	bs.more = false
+	bs.mu.Unlock()
 }
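
Note the contrast with ChunkStream.Cancel(): a BlobStream holds no long-lived store stream to cancel, because each Advance() opens its own one-element Scan() and cancels it immediately, so Cancel() need only clear the "more" flag under the mutex.
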
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
index ab961a7..b7ab2df 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
@@ -9,12 +9,12 @@
 import "io/ioutil"
 import "math/rand"
 import "os"
+import "runtime"
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
 import "v.io/v23/context"
 
-// import "v.io/v23/verror"
 import "v.io/x/ref/test"
 import _ "v.io/x/ref/runtime/factories/generic"
 
@@ -27,78 +27,125 @@
 	return v
 }
 
-// verifyNoBlob() tests that blob b[blobi] is not present in *cm.
-// callSite is a callsite identifier, output in all error messages.
-func verifyNoBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, callSite int) {
-	bs := cm.NewBlobStream(ctx, b[blobi])
-	for i := 0; bs.Advance(); i++ {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: %v",
-			callSite, blobi, i, bs.Value(nil))
+// verifyBlobs() tests that the blobs in *cm are those in b[], as revealed via
+// the BlobStream() interface.
+func verifyBlobs(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, b [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
+	seen := make([]bool, len(b)) // seen[i] == whether b[i] seen in *cm
+	bs := cm.NewBlobStream(ctx)
+	var i int
+	for i = 0; bs.Advance(); i++ {
+		blob := bs.Value(nil)
+		var j int
+		for j = 0; j != len(b) && bytes.Compare(b[j], blob) != 0; j++ {
+		}
+		if j == len(b) {
+			t.Errorf("chunkmap_test: line %d: unexpected blob %v present in ChunkMap",
+				callerLine, blob)
+		} else if seen[j] {
+			t.Errorf("chunkmap_test: line %d: blob %v seen twice in ChunkMap",
+				callerLine, blob)
+		} else {
+			seen[j] = true
+		}
+	}
+	if i != len(b) {
+		t.Errorf("chunkmap_test: line %d: found %d blobs in ChunkMap, but expected %d",
+			callerLine, i, len(b))
+	}
+	for j := range seen {
+		if !seen[j] {
+			t.Errorf("chunkmap_test: line %d: blob %v not seen in ChunkMap",
+				callerLine, b[j])
+		}
 	}
 	if bs.Err() != nil {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance: unexpected error %v",
-			callSite, blobi, bs.Err())
+		t.Errorf("chunkmap_test: line %d: BlobStream.Advance: unexpected error %v",
+			callerLine, bs.Err())
 	}
 }
 
-// verifyBlob() tests that blob b[blobi] in *cm contains the expected chunks from c[].
-// Each blob is expected to have 8 chunks, 0...7, except that b[1] has c[8] instead of c[4] for chunk 4.
-// callSite is a callsite identifier, output in all error messages.
-func verifyBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte, callSite int) {
+// verifyNoChunksInBlob() tests that blob b[blobi] has no chunks in *cm, as
+// revealed by the ChunkStream interface.
+func verifyNoChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
+	cs := cm.NewChunkStream(ctx, b[blobi])
+	for i := 0; cs.Advance(); i++ {
+		t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: %v",
+			callerLine, blobi, i, cs.Value(nil))
+	}
+	if cs.Err() != nil {
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance: unexpected error %v",
+			callerLine, blobi, cs.Err())
+	}
+}
+
+// verifyChunksInBlob() tests that blob b[blobi] in *cm contains the expected
+// chunks from c[].  Each blob is expected to have 8 chunks, 0...7, except that
+// b[1] has c[8] instead of c[4] for chunk 4.
+func verifyChunksInBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte) {
+	_, _, callerLine, _ := runtime.Caller(1)
 	var err error
 	var i int
-	bs := cm.NewBlobStream(ctx, b[blobi])
-	for i = 0; bs.Advance(); i++ {
-		chunk := bs.Value(nil)
+	cs := cm.NewChunkStream(ctx, b[blobi])
+	for i = 0; cs.Advance(); i++ {
+		chunk := cs.Value(nil)
 		chunki := i
 		if blobi == 1 && i == 4 { // In blob 1, c[4] is replaced by c[8]
 			chunki = 8
 		}
 		if bytes.Compare(c[chunki], chunk) != 0 {
-			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: got %v, expected %v",
-				callSite, blobi, i, chunk, c[chunki])
+			t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: got %v, expected %v",
+				callerLine, blobi, i, chunk, c[chunki])
 		}
 
 		var loc chunkmap.Location
 		loc, err = cm.LookupChunk(ctx, chunk)
 		if err != nil {
-			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
-				callSite, blobi, i, err)
+			t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
+				callerLine, blobi, i, err)
 		} else {
 			if i == 4 {
-				if bytes.Compare(loc.Blob, b[blobi]) != 0 {
-					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
-						callSite, blobi, i, loc.Blob, b[blobi])
+				if bytes.Compare(loc.BlobID, b[blobi]) != 0 {
+					t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+						callerLine, blobi, i, loc.BlobID, b[blobi])
 				}
 			} else {
-				if bytes.Compare(loc.Blob, b[0]) != 0 && bytes.Compare(loc.Blob, b[1]) != 0 {
-					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
-						callSite, blobi, i, loc.Blob, b[blobi])
+				if bytes.Compare(loc.BlobID, b[0]) != 0 && bytes.Compare(loc.BlobID, b[1]) != 0 {
+					t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.BlobID got %v, expected %v",
+						callerLine, blobi, i, loc.BlobID, b[blobi])
 				}
 			}
 			if loc.Offset != int64(i) {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
-					callSite, blobi, i, loc.Offset, i)
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
+					callerLine, blobi, i, loc.Offset, i)
 			}
 			if loc.Size != 1 {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Size got %d, expected 1",
-					callSite, blobi, i, loc.Size)
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: Location.Size got %d, expected 1",
+					callerLine, blobi, i, loc.Size)
 			}
 
-			loc2 := bs.Location()
-			if bytes.Compare(loc.Blob, loc.Blob) != 0 || loc.Offset != loc2.Offset || loc.Size != loc2.Size {
-				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs BlobStream %v",
-					callSite, blobi, i, loc, loc2)
+			// The offsets and sizes will match, between the result
+			// from the stream and the result from LookupChunk(),
+			// because for all chunks written to both, they are
+			// written to the same places.  However, the blob need
+			// not match, since LookupChunk() will return an
+			// arbitrary Location in the store that contains the
+			// chunk.
+			loc2 := cs.Location()
+			if loc.Offset != loc2.Offset || loc.Size != loc2.Size {
+				t.Errorf("chunkmap_test: line %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs ChunkStream %v",
+					callerLine, blobi, i, loc, loc2)
 			}
 		}
 	}
-	if bs.Err() != nil {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Err() unepxected error %v",
-			callSite, blobi, bs.Err())
+	if cs.Err() != nil {
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Err() unexpected error %v",
+			callerLine, blobi, cs.Err())
 	}
 	if i != 8 {
-		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance unexpectedly saw %d chunks, expected 8",
-			callSite, blobi, i)
+		t.Errorf("chunkmap_test: line %d: blob %d: ChunkStream.Advance unexpectedly saw %d chunks, expected 8",
+			callerLine, blobi, i)
 	}
 }
 
@@ -131,9 +178,10 @@
 	// Nine chunks: c[0 .. 8]
 	c := [][]byte{id(), id(), id(), id(), id(), id(), id(), id(), id()}
 
-	// Verify that there are no chunks in blobs initially.
-	verifyNoBlob(t, ctx, cm, 0, b, 0)
-	verifyNoBlob(t, ctx, cm, 1, b, 1)
+	// Verify that there are no blobs, or chunks in blobs initially.
+	verifyBlobs(t, ctx, cm, nil)
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyNoChunksInBlob(t, ctx, cm, 1, b)
 
 	// Verify that all chunks have no locations initially.
 	for chunki := range c {
@@ -152,7 +200,7 @@
 				chunki = 8
 			}
 			err = cm.AssociateChunkWithLocation(ctx, c[chunki],
-				chunkmap.Location{Blob: b[blobi], Offset: int64(i), Size: 1})
+				chunkmap.Location{BlobID: b[blobi], Offset: int64(i), Size: 1})
 			if err != nil {
 				t.Errorf("chunkmap_test: blob %d: AssociateChunkWithLocation: unexpected error: %v",
 					blobi, err)
@@ -160,9 +208,10 @@
 		}
 	}
 
-	// Verify that the blobs contain the chunks specified.
-	verifyBlob(t, ctx, cm, 0, b, c, 2)
-	verifyBlob(t, ctx, cm, 1, b, c, 3)
+	// Verify that the blobs are present, with the chunks specified.
+	verifyBlobs(t, ctx, cm, b)
+	verifyChunksInBlob(t, ctx, cm, 0, b, c)
+	verifyChunksInBlob(t, ctx, cm, 1, b, c)
 
 	// Verify that all chunks now have locations.
 	for chunki := range c {
@@ -197,8 +246,9 @@
 	}
 
 	// Verify that blob 0 is gone, but blob 1 remains.
-	verifyNoBlob(t, ctx, cm, 0, b, 4)
-	verifyBlob(t, ctx, cm, 1, b, c, 5)
+	verifyBlobs(t, ctx, cm, b[1:])
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyChunksInBlob(t, ctx, cm, 1, b, c)
 
 	// Delete b[1].
 	err = cm.DeleteBlob(ctx, b[1])
@@ -207,9 +257,10 @@
 			err)
 	}
 
-	// Verify that there are no chunks in blobs initially.
-	verifyNoBlob(t, ctx, cm, 0, b, 6)
-	verifyNoBlob(t, ctx, cm, 1, b, 7)
+	// Verify that there are no blobs, or chunks in blobs once more.
+	verifyBlobs(t, ctx, cm, nil)
+	verifyNoChunksInBlob(t, ctx, cm, 0, b)
+	verifyNoChunksInBlob(t, ctx, cm, 1, b)
 
 	// Verify that all chunks have no locations once more.
 	for chunki := range c {
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 43cc542..cb8bca8 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -8,13 +8,17 @@
 package fs_cablobstore
 
 // Internals:
-//   The blobstore consists of a directory with "blob", "cas", and "tmp"
-//   subdirectories.
+// Blobs are partitioned into two types of unit: "fragments" and "chunks".
+// A fragment is stored in a single file on disc.  A chunk is a unit of network
+// transmission.
+//
+//   The blobstore consists of a directory with "blob", "cas", "chunk", and
+//   "tmp" subdirectories.
 //   - "tmp" is used for temporary files that are moved into place via
 //     link()/unlink() or rename(), depending on what's available.
 //   - "cas" contains files whose names are content hashes of the files being
 //     named.  A few slashes are thrown into the name near the front so that no
-//     single directory gets too large.
+//     single directory gets too large.  These files are called "fragments".
 //   - "blob" contains files whose names are random numbers.  These names are
 //     visible externally as "blob names".  Again, a few slashes are thrown
 //     into the name near the front so that no single directory gets too large.
@@ -26,13 +30,17 @@
 //     <offset> bytes into <cas-fragment>, which is in the "cas" subtree.  The
 //     "f" line indicates that the blob is "finalized" and gives its complete
 //     md5 hash.  No fragments may be appended to a finalized blob.
+//   - "chunk" contains a store (currently implemented with leveldb) that
+//     maps chunks of blobs to content hashes and vice versa.
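
Summarizing the layout described above as a sketch of the on-disc tree:

	<rootName>/
		blob/   blob descriptors, named by random blob IDs
		cas/    content-addressed fragment files, named by md5 hash
		chunk/  leveldb store mapping chunks <-> blob locations (the ChunkMap)
		tmp/    staging area for files moved into place atomically
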
 
 import "bufio"
+import "bytes"
 import "crypto/md5"
 import "fmt"
 import "hash"
 import "io"
 import "io/ioutil"
+import "math"
 import "math/rand"
 import "os"
 import "path/filepath"
@@ -42,6 +50,8 @@
 import "time"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -62,35 +72,37 @@
 	errCantDeleteBlob         = verror.Register(pkgPath+".errCantDeleteBlob", verror.NoRetry, "{1:}{2:} Can't delete blob {3}{:_}")
 	errBlobDeleted            = verror.Register(pkgPath+".errBlobDeleted", verror.NoRetry, "{1:}{2:} Blob is deleted{:_}")
 	errSizeTooBigForFragment  = verror.Register(pkgPath+".errSizeTooBigForFragment", verror.NoRetry, "{1:}{2:} writing blob {1}, size too big for fragment{:1}")
+	errStreamCancelled        = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
 )
 
 // For the moment, we disallow others from accessing the tree where blobs are
-// stored.  We could in the future relax this to 0711 or 0755.
+// stored.  We could in the future relax this to 0711/0755, and 0644.
 const dirPermissions = 0700
+const filePermissions = 0600
 
 // Subdirectories of the blobstore's tree
 const (
-	blobDir = "blob" // Subdirectory where blobs are indexed by blob id.
-	casDir  = "cas"  // Subdirectory where blobs are indexed by content hash.
-	tmpDir  = "tmp"  // Subdirectory where temporary files are created.
+	blobDir  = "blob"  // Subdirectory where blobs are indexed by blob id.
+	casDir   = "cas"   // Subdirectory where fragments are indexed by content hash.
+	chunkDir = "chunk" // Subdirectory where chunks are indexed by content hash.
+	tmpDir   = "tmp"   // Subdirectory where temporary files are created.
 )
 
 // An FsCaBlobStore represents a simple, content-addressable store.
 type FsCaBlobStore struct {
-	rootName string // The name of the root of the store.
+	rootName string             // The name of the root of the store.
+	cm       *chunkmap.ChunkMap // Mapping from chunks to blob locations and vice versa.
 
-	mu sync.Mutex // Protects fields below, plus
-	// blobDesc.fragment, blobDesc.activeDescIndex,
-	// and blobDesc.refCount.
+	// mu protects fields below, plus most fields in each blobDesc when used from a BlobWriter.
+	mu         sync.Mutex
 	activeDesc []*blobDesc        // The blob descriptors in use by active BlobReaders and BlobWriters.
 	toDelete   []*map[string]bool // Sets of items that active GC threads are about to delete. (Pointers to maps, to allow pointer comparison.)
 }
 
-// hashToFileName() returns the content-addressed name of the data with the
-// specified hash, given its content hash.  Requires len(hash)==16.  An md5
-// hash is suitable.
-func hashToFileName(hash []byte) string {
-	return filepath.Join(casDir,
+// hashToFileName() returns the file name for the given binary hash, placed
+// under the specified prefix.  Requires len(hash)==16.  An md5 hash is suitable.
+func hashToFileName(prefix string, hash []byte) string {
+	return filepath.Join(prefix,
 		fmt.Sprintf("%02x", hash[0]),
 		fmt.Sprintf("%02x", hash[1]),
 		fmt.Sprintf("%02x", hash[2]),
@@ -101,6 +113,23 @@
 			hash[12], hash[13], hash[14], hash[15]))
 }
 
+// fileNameToHash() converts a file name in the format generated by
+// hashToFileName(prefix, ...) to a vector of 16 bytes.  If the string is
+// malformed, the nil slice is returned.
+func fileNameToHash(prefix string, s string) []byte {
+	idStr := strings.TrimPrefix(filepath.ToSlash(s), prefix+"/")
+	hash := make([]byte, 16, 16)
+	n, err := fmt.Sscanf(idStr, "%02x/%02x/%02x/%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+		&hash[0], &hash[1], &hash[2], &hash[3],
+		&hash[4], &hash[5], &hash[6], &hash[7],
+		&hash[8], &hash[9], &hash[10], &hash[11],
+		&hash[12], &hash[13], &hash[14], &hash[15])
+	if n != 16 || err != nil {
+		hash = nil
+	}
+	return hash
+}
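
The two helpers are intended to be inverses on well-formed names; a hypothetical in-package test sketch (not part of this change):

	func TestHashFileNameRoundTrip(t *testing.T) {
		hash := md5.Sum([]byte("some fragment"))
		name := hashToFileName(casDir, hash[:])
		if got := fileNameToHash(casDir, name); !bytes.Equal(got, hash[:]) {
			t.Errorf("round trip failed: got %v, want %v", got, hash[:])
		}
	}
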
+
 // newBlobName() returns a new random name for a blob.
 func newBlobName() string {
 	return filepath.Join(blobDir,
@@ -143,7 +172,7 @@
 // Create() returns a pointer to an FsCaBlobStore stored in the file system at
 // "rootName".  If the directory rootName does not exist, it is created.
 func Create(ctx *context.T, rootName string) (fscabs *FsCaBlobStore, err error) {
-	dir := []string{tmpDir, casDir, blobDir}
+	dir := []string{tmpDir, casDir, chunkDir, blobDir}
 	for i := 0; i != len(dir) && err == nil; i++ {
 		fullName := filepath.Join(rootName, dir[i])
 		os.MkdirAll(fullName, dirPermissions)
@@ -153,9 +182,14 @@
 			err = verror.New(errNotADir, ctx, fullName)
 		}
 	}
+	var cm *chunkmap.ChunkMap
+	if err == nil {
+		cm, err = chunkmap.New(ctx, filepath.Join(rootName, chunkDir))
+	}
 	if err == nil {
 		fscabs = new(FsCaBlobStore)
 		fscabs.rootName = rootName
+		fscabs.cm = cm
 	}
 	return fscabs, err
 }
@@ -169,12 +203,15 @@
 func (fscabs *FsCaBlobStore) DeleteBlob(ctx *context.T, blobName string) (err error) {
 	// Disallow deletions of things outside the blob tree, or that may contain "..".
 	// For simplicity, the code currently disallows '.'.
-	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+	blobID := fileNameToHash(blobDir, blobName)
+	if blobID == nil || strings.IndexByte(blobName, '.') != -1 {
 		err = verror.New(errInvalidBlobName, ctx, blobName)
 	} else {
 		err = os.Remove(filepath.Join(fscabs.rootName, blobName))
 		if err != nil {
 			err = verror.New(errCantDeleteBlob, ctx, blobName, err)
+		} else {
+			err = fscabs.cm.DeleteBlob(ctx, blobID)
 		}
 	}
 	return err
@@ -250,7 +287,7 @@
 
 // addFragment() ensures that the store *fscabs contains a fragment comprising
 // the catenation of the byte vectors named by item[..].block and the contents
-// of the files named by item[..].filename.  The block field is ignored if
+// of the files named by item[..].fileName.  The block field is ignored if
 // fileName!="".  The fragment is not physically added if already present.
 // The fragment is added to the fragment list of the descriptor *desc.
 func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
@@ -305,7 +342,7 @@
 
 	// Compute the hash, and form the file name in the repository.
 	hash := hasher.Sum(nil)
-	relFileName := hashToFileName(hash)
+	relFileName := hashToFileName(casDir, hash)
 	absFileName := filepath.Join(fscabs.rootName, relFileName)
 
 	// Add the fragment's name to *desc's fragments so the garbage
@@ -316,7 +353,6 @@
 		size:     size,
 		offset:   0,
 		fileName: relFileName})
-	desc.size += size
 	fscabs.mu.Unlock()
 
 	// If the file does not already exist, ...
@@ -365,7 +401,11 @@
 		// Remove the entry added to fragment list above.
 		fscabs.mu.Lock()
 		desc.fragment = desc.fragment[0 : len(desc.fragment)-1]
-		desc.size -= size
+		fscabs.mu.Unlock()
+	} else { // commit the change by updating the size
+		fscabs.mu.Lock()
+		desc.size += size
+		desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
 		fscabs.mu.Unlock()
 	}
 
@@ -385,16 +425,24 @@
 	activeDescIndex int // Index into fscabs.activeDesc if refCount>0; under fscabs.mu.
 	refCount        int // Reference count; under fscabs.mu.
 
-	name     string         // Name of the blob.
-	fragment []blobFragment // All the fragements in this blob;
-	// modified under fscabs.mu and in BlobWriter
-	// owner's thread; may be read by GC under
-	// fscabs.mu.
-	size      int64 // Total size of the blob.
-	finalized bool  // Whether the blob has been finalized.
+	name string // Name of the blob.
+
+	// The following fields are modified under fscabs.mu and in BlobWriter
+	// owner's thread; they may be read by GC (when obtained from
+	// fscabs.activeDesc) and the chunk writer under fscabs.mu.  In the
+	// BlobWriter owner's thread, reading does not require a lock, but
+	// writing does.  In other contexts (BlobReader, or a desc that has
+	// just been allocated by getBlob()), no locking is needed.
+
+	fragment  []blobFragment // All the fragments in this blob
+	size      int64          // Total size of the blob.
+	finalized bool           // Whether the blob has been finalized.
 	// A finalized blob has a valid hash field, and no new bytes may be added
 	// to it.  A well-formed hash has 16 bytes.
 	hash []byte
+
+	openWriter bool       // Whether this descriptor is being written by an open BlobWriter.
+	cv         *sync.Cond // signalled when a BlobWriter writes or closes.
 }
 
 // isBeingDeleted() returns whether fragment fragName is about to be deleted
@@ -456,7 +504,8 @@
 
 // getBlob() returns the in-memory blob descriptor for the named blob.
 func (fscabs *FsCaBlobStore) getBlob(ctx *context.T, blobName string) (desc *blobDesc, err error) {
-	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+	slashBlobName := filepath.ToSlash(blobName)
+	if !strings.HasPrefix(slashBlobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
 		err = verror.New(errInvalidBlobName, ctx, blobName)
 	} else {
 		absBlobName := filepath.Join(fscabs.rootName, blobName)
@@ -467,6 +516,7 @@
 			desc = new(blobDesc)
 			desc.activeDescIndex = -1
 			desc.name = blobName
+			desc.cv = sync.NewCond(&fscabs.mu)
 			scanner := bufio.NewScanner(fh)
 			for scanner.Scan() {
 				field := strings.Split(scanner.Text(), " ")
@@ -528,32 +578,49 @@
 	desc   *blobDesc // Description of the blob being written.
 	f      *file     // The file being written.
 	hasher hash.Hash // Running hash of blob.
+
+	// Fields to allow the ChunkMap to be written.
+	csBr  *BlobReader     // Reader over the blob that's currently being written.
+	cs    *chunker.Stream // Stream of chunks derived from csBr
+	csErr chan error      // writeChunkMap() sends its result here; Close/CloseWithoutFinalize receives it.
 }
 
-// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on a newly
-// created blob name, which can be found using the Name() method.  BlobWriters
-// should not be used concurrently by multiple threads.  The returned handle
-// should be closed with either the Close() or CloseWithoutFinalize() method to
-// avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+// a newly created blob.  If "name" is non-empty, it is used to name
+// the blob, and it must be in the format of a name returned by this
+// interface (probably by another instance on another device).
+// Otherwise, a new name is created, which can be found using
+// the Name() method.  It is an error to attempt to overwrite a blob
+// that already exists in this blob store.  BlobWriters should not be
+// used concurrently by multiple threads.  The returned handle should
+// be closed with either the Close() or CloseWithoutFinalize() method
+// to avoid leaking file handles.
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T, name string) (localblobstore.BlobWriter, error) {
 	var bw *BlobWriter
-	newName := newBlobName()
-	fileName := filepath.Join(fscabs.rootName, newName)
+	if name == "" {
+		name = newBlobName()
+	}
+	fileName := filepath.Join(fscabs.rootName, name)
 	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
-	f, err := newFile(os.Create(fileName))
+	f, err := newFile(os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, filePermissions))
 	if err == nil {
 		bw = new(BlobWriter)
 		bw.fscabs = fscabs
 		bw.ctx = ctx
 		bw.desc = new(blobDesc)
 		bw.desc.activeDescIndex = -1
-		bw.desc.name = newName
+		bw.desc.name = name
+		bw.desc.cv = sync.NewCond(&fscabs.mu)
+		bw.desc.openWriter = true
 		bw.f = f
 		bw.hasher = md5.New()
 		if !fscabs.descRef(bw.desc) {
 			// Can't happen; descriptor refers to no fragments.
 			panic(verror.New(errBlobDeleted, ctx, bw.desc.name))
 		}
+		// Write the chunks of this blob into the ChunkMap, as they are
+		// written by this writer.
+		bw.forkWriteChunkMap()
 	}
 	return bw, err
 }
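
A minimal usage sketch for the revised interface (error handling elided; only methods documented in this file are shown):

	bw, err := fscabs.NewBlobWriter(ctx, "") // "" => a fresh blob name is generated
	if err == nil {
		name := bw.Name() // the externally visible blob name
		// ... append data to the blob ...
		if err = bw.Close(); err == nil {
			fmt.Println("wrote blob", name) // finalized; chunks recorded in the ChunkMap
		}
	}
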
@@ -562,14 +629,17 @@
 // old, but unfinalized blob name.
 func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
 	var err error
-	bw := new(BlobWriter)
-	bw.fscabs = fscabs
-	bw.ctx = ctx
-	bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
-	if err == nil && bw.desc.finalized {
+	var bw *BlobWriter
+	var desc *blobDesc
+	desc, err = fscabs.getBlob(ctx, blobName)
+	if err == nil && desc.finalized {
 		err = verror.New(errBlobAlreadyFinalized, ctx, blobName)
-		bw = nil
-	} else {
+	} else if err == nil {
+		bw = new(BlobWriter)
+		bw.fscabs = fscabs
+		bw.ctx = ctx
+		bw.desc = desc
+		bw.desc.openWriter = true
 		fileName := filepath.Join(fscabs.rootName, bw.desc.name)
 		bw.f, err = newFile(os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND, 0666))
 		bw.hasher = md5.New()
@@ -581,7 +651,7 @@
 			// non-zero.
 			panic(verror.New(errBlobDeleted, ctx, fileName))
 		}
-		br := fscabs.blobReaderFromDesc(ctx, bw.desc)
+		br := fscabs.blobReaderFromDesc(ctx, bw.desc, dontWaitForWriter)
 		buf := make([]byte, 8192, 8192)
 		for err == nil {
 			var n int
@@ -592,10 +662,89 @@
 		if err == io.EOF { // EOF is expected.
 			err = nil
 		}
+		if err == nil {
+			// Write the chunks of this blob into the ChunkMap, as
+			// they are written by this writer.
+			bw.forkWriteChunkMap()
+		}
 	}
 	return bw, err
 }
 
+// forkWriteChunkMap() creates a new thread to run writeChunkMap().  It adds
+// the chunks written to *bw to the blob store's ChunkMap.  The caller is
+// expected to call joinWriteChunkMap() at some later point.
+func (bw *BlobWriter) forkWriteChunkMap() {
+	// The descRef's ref count is incremented here to compensate
+	// for the decrement it will receive in br.Close() in joinWriteChunkMap.
+	if !bw.fscabs.descRef(bw.desc) {
+		// Can't happen; descriptor's ref count was already non-zero.
+		panic(verror.New(errBlobDeleted, bw.ctx, bw.desc.name))
+	}
+	bw.csBr = bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, waitForWriter)
+	bw.cs = chunker.NewStream(bw.ctx, &chunker.DefaultParam, bw.csBr)
+	bw.csErr = make(chan error)
+	go bw.writeChunkMap()
+}
+
+// insertChunk() inserts a chunk into the blob store's ChunkMap, associating it
+// with the specified byte offset in the blob blobID being written by *bw.  The byte
+// offset of the next chunk is returned.
+func (bw *BlobWriter) insertChunk(blobID []byte, chunkHash []byte, offset int64, size int64) (int64, error) {
+	err := bw.fscabs.cm.AssociateChunkWithLocation(bw.ctx, chunkHash[:],
+		chunkmap.Location{BlobID: blobID, Offset: offset, Size: size})
+	if err != nil {
+		bw.cs.Cancel()
+	}
+	return offset + size, err
+}
+
+// writeChunkMap() iterates over the chunks in stream bw.cs, and associates each
+// one with the blob being written.
+func (bw *BlobWriter) writeChunkMap() {
+	var err error
+	var offset int64
+	blobID := fileNameToHash(blobDir, bw.desc.name)
+	// Associate each chunk only after the next chunk has been seen (or
+	// the blob finalized), to avoid recording an artificially short chunk
+	// at the end of a partial transfer.
+	var chunkHash [md5.Size]byte
+	var chunkLen int64
+	if bw.cs.Advance() {
+		chunk := bw.cs.Value()
+		// Record the hash and size, since chunk's underlying buffer
+		// may be reused by the next call to Advance().
+		chunkHash = md5.Sum(chunk)
+		chunkLen = int64(len(chunk))
+		for bw.cs.Advance() {
+			offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+			chunk = bw.cs.Value()
+			chunkHash = md5.Sum(chunk)
+			chunkLen = int64(len(chunk))
+		}
+	}
+	if err == nil {
+		err = bw.cs.Err()
+	}
+	bw.fscabs.mu.Lock()
+	if err == nil && chunkLen != 0 && bw.desc.finalized {
+		offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen)
+	}
+	bw.fscabs.mu.Unlock()
+	bw.csErr <- err // wake joinWriteChunkMap()
+}
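
Concretely: if the chunker yields chunks c0, c1, c2 and the writer is closed without finalizing, only c0 and c1 are associated; the trailing c2 is recorded by the final check above only once desc.finalized is set, since in a partial transfer the last chunk may be artificially short.
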
+
+// joinWriteChunkMap() waits for the completion of the thread forked by forkWriteChunkMap().
+// It returns when the chunks in the blob have been written to the blob store's ChunkMap.
+func (bw *BlobWriter) joinWriteChunkMap(err error) error {
+	err2 := <-bw.csErr // read error from end of writeChunkMap()
+	if err == nil {
+		err = err2
+	}
+	bw.csBr.Close()
+	return err
+}
+
 // Close() finalizes *bw, and indicates that the client will perform no further
 // append operations on *bw.  Any internal open file handles are closed.
 func (bw *BlobWriter) Close() (err error) {
@@ -608,7 +757,12 @@
 		_, err = fmt.Fprintf(bw.f.writer, "f %s\n", hashToString(h)) // finalize
 		_, err = bw.f.close(bw.ctx, err)
 		bw.f = nil
+		bw.fscabs.mu.Lock()
 		bw.desc.finalized = true
+		bw.desc.openWriter = false
+		bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+		bw.fscabs.mu.Unlock()
+		err = bw.joinWriteChunkMap(err)
 		bw.fscabs.descUnref(bw.desc)
 	}
 	return err
@@ -622,8 +776,13 @@
 	if bw.f == nil {
 		err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name)
 	} else {
+		bw.fscabs.mu.Lock()
+		bw.desc.openWriter = false
+		bw.desc.cv.Broadcast() // Tell chunkmap BlobReader that writing has ceased.
+		bw.fscabs.mu.Unlock()
 		_, err = bw.f.close(bw.ctx, err)
 		bw.f = nil
+		err = bw.joinWriteChunkMap(err)
 		bw.fscabs.descUnref(bw.desc)
 	}
 	return err
@@ -688,6 +847,7 @@
 						offset:   offset + desc.fragment[i].offset,
 						fileName: desc.fragment[i].fileName})
 					bw.desc.size += consume
+					bw.desc.cv.Broadcast() // Tell chunkmap BlobReader there's more to read.
 					bw.fscabs.mu.Unlock()
 				}
 				offset = 0
@@ -701,7 +861,7 @@
 			// non-zero.
 			panic(verror.New(errBlobDeleted, bw.ctx, blobName))
 		}
-		br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc)
+		br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, dontWaitForWriter)
 		if err == nil {
 			_, err = br.Seek(origSize, 0)
 		}
@@ -752,7 +912,8 @@
 	fscabs *FsCaBlobStore
 	ctx    *context.T
 
-	desc *blobDesc // A description of the blob being read.
+	desc          *blobDesc // A description of the blob being read.
+	waitForWriter bool      // whether this reader should wait for a concurrent BlobWriter
 
 	pos int64 // The next position we will read from (used by Read/Seek, not ReadAt).
 
@@ -761,14 +922,23 @@
 	fh            *os.File // non-nil iff fragmentIndex != -1.
 }
 
+// constants to make blobReaderFromDesc() invocations more readable
+const (
+	dontWaitForWriter = false
+	waitForWriter     = true
+)
+
 // blobReaderFromDesc() returns a pointer to a newly allocated BlobReader given
-// a pre-existing blobDesc.
-func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc) *BlobReader {
+// a pre-existing blobDesc.  If waitForWriter is true, the reader will wait for
+// any BlobWriter to finish writing the part of the blob the reader is trying
+// to read.
+func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc, waitForWriter bool) *BlobReader {
 	br := new(BlobReader)
 	br.fscabs = fscabs
 	br.ctx = ctx
 	br.fragmentIndex = -1
 	br.desc = desc
+	br.waitForWriter = waitForWriter
 	return br
 }
 
@@ -779,7 +949,7 @@
 	var desc *blobDesc
 	desc, err = fscabs.getBlob(ctx, blobName)
 	if err == nil {
-		br = fscabs.blobReaderFromDesc(ctx, desc)
+		br = fscabs.blobReaderFromDesc(ctx, desc, dontWaitForWriter)
 	}
 	return br, err
 }
@@ -821,24 +991,41 @@
 	return lo
 }
 
+// waitUntilAvailable() waits until position pos within *br is available for
+// reading, if this reader waits for writers.  The position is available when:
+//  - *br is on an already written blob, or
+//  - *br is on a blob being written whose writer has been closed, or whose
+//    writes have passed position pos.
+// The value pos==math.MaxInt64 can be used to mean "until the writer is closed".
+// Requires br.fscabs.mu held.
+func (br *BlobReader) waitUntilAvailable(pos int64) {
+	for br.waitForWriter && br.desc.openWriter && br.desc.size < pos {
+		br.desc.cv.Wait()
+	}
+}
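
This is the standard sync.Cond monitor pattern: every writer-side path that grows desc.size or clears desc.openWriter (addFragment(), the blob-append path, Close(), CloseWithoutFinalize()) calls desc.cv.Broadcast() while holding fscabs.mu, pairing with this wait loop roughly as follows (the waitForWriter guard omitted):

	// writer side, holding fscabs.mu:
	desc.size += n      // or: desc.openWriter = false
	desc.cv.Broadcast() // wake any waiting readers

	// reader side (this function), holding fscabs.mu:
	for desc.openWriter && desc.size < pos {
		desc.cv.Wait() // releases fscabs.mu while blocked; re-acquires on wakeup
	}
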
+
 // ReadAt() fills b[] with up to len(b) bytes of data starting at position "at"
 // within the blob that *br indicates, and returns the number of bytes read.
 func (br *BlobReader) ReadAt(b []byte, at int64) (n int, err error) {
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(at + int64(len(b)))
 	i := findFragment(br.desc.fragment, at)
 	if i < len(br.desc.fragment) && at <= br.desc.size {
+		fragmenti := br.desc.fragment[i] // copy fragment data to allow releasing lock
+		br.fscabs.mu.Unlock()
 		if i != br.fragmentIndex {
 			br.closeInternal()
 		}
 		if br.fragmentIndex == -1 {
-			br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, br.desc.fragment[i].fileName))
+			br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, fragmenti.fileName))
 			if err == nil {
 				br.fragmentIndex = i
 			} else {
 				br.closeInternal()
 			}
 		}
-		var offset int64 = at - br.desc.fragment[i].pos + br.desc.fragment[i].offset
-		consume := br.desc.fragment[i].size - (at - br.desc.fragment[i].pos)
+		var offset int64 = at - fragmenti.pos + fragmenti.offset
+		consume := fragmenti.size - (at - fragmenti.pos)
 		if int64(len(b)) < consume {
 			consume = int64(len(b))
 		}
@@ -847,10 +1034,11 @@
 		} else if err == nil {
 			panic("failed to open blob fragment")
 		}
+		br.fscabs.mu.Lock()
 		// Return io.EOF if the Read reached the end of the last
 		// fragment, but not if it's merely the end of some interior
-		// fragment.
-		if int64(n)+at >= br.desc.size {
+		// fragment or the blob is still being extended.
+		if int64(n)+at >= br.desc.size && !(br.waitForWriter && br.desc.openWriter) {
 			if err == nil {
 				err = io.EOF
 			}
@@ -860,6 +1048,7 @@
 	} else {
 		err = verror.New(errIllegalPositionForRead, br.ctx, br.pos, br.desc.size)
 	}
+	br.fscabs.mu.Unlock()
 	return n, err
 }
 
@@ -879,11 +1068,13 @@
 // offset+current_seek_position if whence==1, and offset+end_of_blob if
 // whence==2, and then returns the current seek position.
 func (br *BlobReader) Seek(offset int64, whence int) (result int64, err error) {
+	br.fscabs.mu.Lock()
 	if whence == 0 {
 		result = offset
 	} else if whence == 1 {
 		result = offset + br.pos
 	} else if whence == 2 {
+		br.waitUntilAvailable(math.MaxInt64)
 		result = offset + br.desc.size
 	} else {
 		err = verror.New(errBadSeekWhence, br.ctx, whence)
@@ -898,17 +1089,26 @@
 	} else if err == nil {
 		br.pos = result
 	}
+	br.fscabs.mu.Unlock()
 	return result, err
 }
 
 // IsFinalized() returns whether *br has been finalized.
 func (br *BlobReader) IsFinalized() bool {
-	return br.desc.finalized
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	finalized := br.desc.finalized
+	br.fscabs.mu.Unlock()
+	return finalized
 }
 
 // Size() returns *br's size.
 func (br *BlobReader) Size() int64 {
-	return br.desc.size
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	size := br.desc.size
+	br.fscabs.mu.Unlock()
+	return size
 }
 
 // Name() returns *br's name.
@@ -918,7 +1118,11 @@
 
 // Hash() returns *br's hash.  It may be nil if the blob is not finalized.
 func (br *BlobReader) Hash() []byte {
-	return br.desc.hash
+	br.fscabs.mu.Lock()
+	br.waitUntilAvailable(math.MaxInt64)
+	hash := br.desc.hash
+	br.fscabs.mu.Unlock()
+	return hash
 }
 
 // -----------------------------------------------------------
@@ -936,6 +1140,10 @@
 	fscabs *FsCaBlobStore // The parent FsCaBlobStore.
 	err    error          // If non-nil, the error that terminated iteration.
 	stack  []dirListing   // The stack of dirListings leading to the current entry.
+	ctx    *context.T     // context passed to ListBlobIds() or ListCAIds()
+
+	mu        sync.Mutex // Protects cancelled.
+	cancelled bool       // Whether Cancel() has been called.
 }
 
 // ListBlobIds() returns an iterator that can be used to enumerate the blobs in
@@ -947,10 +1155,10 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Stream {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
-	return &FsCasIter{fscabs: fscabs, stack: stack}
+	return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
 }
 
 // ListCAIds() returns an iterator that can be used to enumerate the
@@ -963,10 +1171,18 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Stream {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
-	return &FsCasIter{fscabs: fscabs, stack: stack}
+	return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx}
+}
+
+// isCancelled() returns whether Cancel() has been called.
+func (fscabsi *FsCasIter) isCancelled() bool {
+	fscabsi.mu.Lock()
+	cancelled := fscabsi.cancelled
+	fscabsi.mu.Unlock()
+	return cancelled
 }
 
 // Advance() stages an item so that it may be retrieved via Value.  Returns
@@ -976,7 +1192,7 @@
 	stack := fscabsi.stack
 	err := fscabsi.err
 
-	for err == nil && !advanced && len(stack) != 0 {
+	for err == nil && !advanced && len(stack) != 0 && !fscabsi.isCancelled() {
 		last := len(stack) - 1
 		stack[last].pos++
 		if stack[last].pos == len(stack[last].nameList) {
@@ -1005,6 +1221,13 @@
 		}
 	}
 
+	if fscabsi.isCancelled() {
+		if err == nil {
+			fscabsi.err = verror.New(errStreamCancelled, fscabsi.ctx)
+		}
+		advanced = false
+	}
+
 	fscabsi.err = err
 	return advanced
 }
@@ -1027,6 +1250,163 @@
 	return fscabsi.err
 }
 
+// Cancel() indicates that the iteration stream should terminate early.
+// Never blocks.  May be called concurrently with other methods on fscabsi.
+func (fscabsi *FsCasIter) Cancel() {
+	fscabsi.mu.Lock()
+	fscabsi.cancelled = true
+	fscabsi.mu.Unlock()
+}
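+
+// Expected use (a sketch; shouldStop() is a hypothetical caller-supplied
+// predicate):
+//    fscabsi := fscabs.ListBlobIds(ctx)
+//    for fscabsi.Advance() {
+//      if shouldStop() {
+//        fscabsi.Cancel() // The next Advance() will return false.
+//      }
+//      // Use fscabsi.Value() here.
+//    }
+//    if fscabsi.Err() != nil {
+//      // The loop terminated early due to an error or cancellation.
+//    }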
+
+// -----------------------------------------------------------
+
+// An errorChunkStream is a localblobstore.ChunkStream that yields an error.
+type errorChunkStream struct {
+	err error
+}
+
+func (*errorChunkStream) Advance() bool       { return false }
+func (*errorChunkStream) Value([]byte) []byte { return nil }
+func (ecs *errorChunkStream) Err() error      { return ecs.err }
+func (*errorChunkStream) Cancel()             {}
+
+// BlobChunkStream() returns a ChunkStream that can be used to read the ordered
+// list of content hashes of chunks in blob blobName.  It is expected that this
+// list will be presented to RecipeStreamFromChunkStream() on another device,
+// to create a recipe for transmitting the blob efficiently to that other
+// device.
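+// Expected use is (a sketch mirroring ListBlobIds() above):
+//    cs := fscabs.BlobChunkStream(ctx, blobName)
+//    for cs.Advance() {
+//      chunkHash := cs.Value(nil)
+//      // Transmit chunkHash to the receiving device here.
+//    }
+//    if cs.Err() != nil {
+//      // The stream terminated early due to an error.
+//    }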
+func (fscabs *FsCaBlobStore) BlobChunkStream(ctx *context.T, blobName string) (cs localblobstore.ChunkStream) {
+	blobID := fileNameToHash(blobDir, blobName)
+	if blobID == nil {
+		cs = &errorChunkStream{err: verror.New(errInvalidBlobName, ctx, blobName)}
+	} else {
+		cs = fscabs.cm.NewChunkStream(ctx, blobID)
+	}
+	return cs
+}
+
+// -----------------------------------------------------------
+
+// LookupChunk returns the location of a chunk with the specified chunk hash
+// within the store.
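+// Expected use on the sending device (a sketch; the transfer example in
+// localblobstore_transfer_test.go does this):
+//    loc, err := fscabs.LookupChunk(ctx, chunkHash)
+//    if err == nil {
+//      // Read loc.Size bytes at offset loc.Offset of blob loc.BlobName,
+//      // and send them to the receiver.
+//    }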
+func (fscabs *FsCaBlobStore) LookupChunk(ctx *context.T, chunkHash []byte) (loc localblobstore.Location, err error) {
+	var chunkMapLoc chunkmap.Location
+	chunkMapLoc, err = fscabs.cm.LookupChunk(ctx, chunkHash)
+	if err == nil {
+		loc.BlobName = hashToFileName(blobDir, chunkMapLoc.BlobID)
+		loc.Size = chunkMapLoc.Size
+		loc.Offset = chunkMapLoc.Offset
+	}
+	return loc, err
+}
+
+// -----------------------------------------------------------
+
+// A RecipeStream implements localblobstore.RecipeStream.  It allows the client
+// to iterate over the recipe steps to recreate a blob identified by a stream
+// of chunk hashes (from chunkStream), but using parts of blobs in the current
+// blob store where possible.
+type RecipeStream struct {
+	fscabs *FsCaBlobStore
+	ctx    *context.T
+
+	chunkStream     localblobstore.ChunkStream // the stream of chunks in the blob
+	pendingChunkBuf [16]byte                   // a buffer for pendingChunk
+	pendingChunk    []byte                     // the last unprocessed chunk hash read from chunkStream, or nil if none
+	step            localblobstore.RecipeStep  // the recipe step to be returned by Value()
+	mu              sync.Mutex                 // protects cancelled
+	cancelled       bool                       // whether Cancel() has been called
+}
+
+// RecipeStreamFromChunkStream() returns a pointer to a RecipeStream that allows
+// the client to iterate over each RecipeStep needed to create the blob formed
+// by the chunks in chunkStream.
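+// Expected use on the receiving device (a sketch; see WriteViaChunks() in
+// localblobstore_testlib for a full example):
+//    rs := fscabs.RecipeStreamFromChunkStream(ctx, chunkStream)
+//    for rs.Advance() {
+//      step := rs.Value()
+//      if step.Chunk != nil {
+//        // Ask the sender for the data of this chunk.
+//      } else {
+//        // Copy step.Size bytes at step.Offset from local blob step.Blob.
+//      }
+//    }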
+func (fscabs *FsCaBlobStore) RecipeStreamFromChunkStream(ctx *context.T, chunkStream localblobstore.ChunkStream) localblobstore.RecipeStream {
+	rs := new(RecipeStream)
+	rs.fscabs = fscabs
+	rs.ctx = ctx
+	rs.chunkStream = chunkStream
+	return rs
+}
+
+// isCancelled() returns whether rs.Cancel() has been called.
+func (rs *RecipeStream) isCancelled() bool {
+	rs.mu.Lock()
+	cancelled := rs.cancelled
+	rs.mu.Unlock()
+	return cancelled
+}
+
+// Advance() stages an item so that it may be retrieved via Value().
+// Returns true iff there is an item to retrieve.  Advance() must be
+// called before Value() is called.  The caller is expected to read
+// until Advance() returns false, or to call Cancel().
+func (rs *RecipeStream) Advance() (ok bool) {
+	if rs.pendingChunk == nil && rs.chunkStream.Advance() {
+		rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+	}
+	for !ok && rs.pendingChunk != nil && !rs.isCancelled() {
+		var err error
+		var loc0 chunkmap.Location
+		loc0, err = rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
+		if err == nil {
+			blobName := hashToFileName(blobDir, loc0.BlobID)
+			var blobDesc *blobDesc
+			if blobDesc, err = rs.fscabs.getBlob(rs.ctx, blobName); err != nil {
+				// The ChunkMap contained a reference to a
+				// deleted blob.  Delete the reference in the
+				// ChunkMap; the next loop iteration will
+				// consider the chunk again.
+				rs.fscabs.cm.DeleteBlob(rs.ctx, loc0.BlobID)
+			} else {
+				rs.fscabs.descUnref(blobDesc)
+				// The chunk is in a known blob.  Combine
+				// contiguous chunks into a single recipe
+				// entry.
+				rs.pendingChunk = nil // consumed
+				for rs.pendingChunk == nil && rs.chunkStream.Advance() {
+					rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:])
+					var loc chunkmap.Location
+					loc, err = rs.fscabs.cm.LookupChunk(rs.ctx, rs.pendingChunk)
+					if err == nil && bytes.Compare(loc0.BlobID, loc.BlobID) == 0 && loc.Offset == loc0.Offset+loc0.Size {
+						loc0.Size += loc.Size
+						rs.pendingChunk = nil // consumed
+					}
+				}
+				rs.step = localblobstore.RecipeStep{Blob: blobName, Offset: loc0.Offset, Size: loc0.Size}
+				ok = true
+			}
+		} else { // The chunk is not in the ChunkMap; yield a single chunk hash.
+			rs.step = localblobstore.RecipeStep{Chunk: rs.pendingChunk}
+			rs.pendingChunk = nil // consumed
+			ok = true
+		}
+	}
+	return ok && !rs.isCancelled()
+}
+
+// Value() returns the item that was staged by Advance().  May panic if
+// Advance() returned false or was not called.  Never blocks.
+func (rs *RecipeStream) Value() localblobstore.RecipeStep {
+	return rs.step
+}
+
+// Err() returns any error encountered by Advance.  Never blocks.
+func (rs *RecipeStream) Err() error {
+	// There are no errors to return here.  The errors encountered in
+	// Advance() are expected and recoverable.
+	return nil
+}
+
+// Cancel() indicates that the client wishes to cease reading from the stream.
+// It causes the next call to Advance() to return false.  Never blocks.
+// It may be called concurrently with other calls on the stream.
+func (rs *RecipeStream) Cancel() {
+	rs.mu.Lock()
+	rs.cancelled = true
+	rs.mu.Unlock()
+	rs.chunkStream.Cancel()
+}
+
 // -----------------------------------------------------------
 
 // gcTemp() attempts to delete files in dirName older than threshold.
@@ -1057,16 +1437,46 @@
 	for caIter.Advance() {
 		caSet[caIter.Value()] = true
 	}
+	err = caIter.Err()
 
-	// Remove from caSet all fragments referenced by extant blobs.
-	blobIter := fscabs.ListBlobIds(ctx)
-	for blobIter.Advance() {
-		var blobDesc *blobDesc
-		if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
-			for i := range blobDesc.fragment {
-				delete(caSet, blobDesc.fragment[i].fileName)
+	// cmBlobs maps the names of blobs found in the ChunkMap to their IDs.
+	// (The IDs can be derived from the names; the map is really being used
+	// to record which blobs exist, and the value merely avoids repeated
+	// conversions.)
+	cmBlobs := make(map[string][]byte)
+	if err == nil {
+		// Record all the blobs known to the ChunkMap.
+		bs := fscabs.cm.NewBlobStream(ctx)
+		for bs.Advance() {
+			blobID := bs.Value(nil)
+			cmBlobs[hashToFileName(blobDir, blobID)] = blobID
+		}
+	}
+
+	if err == nil {
+		// Remove from cmBlobs all extant blobs, and remove from
+		// caSet all their fragments.
+		blobIter := fscabs.ListBlobIds(ctx)
+		for blobIter.Advance() {
+			var blobDesc *blobDesc
+			if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
+				delete(cmBlobs, blobDesc.name)
+				for i := range blobDesc.fragment {
+					delete(caSet, blobDesc.fragment[i].fileName)
+				}
+				fscabs.descUnref(blobDesc)
 			}
-			fscabs.descUnref(blobDesc)
+		}
+	}
+
+	if err == nil {
+		// Remove all blobs still mentioned in cmBlobs from the ChunkMap;
+		// these are the ones that no longer exist in the blobs directory.
+		for _, blobID := range cmBlobs {
+			err = fscabs.cm.DeleteBlob(ctx, blobID)
+			if err != nil {
+				break
+			}
 		}
 	}
 
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index 886f5c4..75bc54e 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -41,3 +41,31 @@
 	// Test it.
 	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
+
+// This test case tests the incremental transfer of blobs via chunks.
+func TestWritingViaChunks(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	var err error
+
+	// Make a pair of blobstores, each in its own temporary directory.
+	const nBlobStores = 2
+	var testDirName [nBlobStores]string
+	var bs [nBlobStores]localblobstore.BlobStore
+	for i := 0; i != nBlobStores; i++ {
+		testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+		if err != nil {
+			t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+		}
+		defer os.RemoveAll(testDirName[i])
+
+		bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+		if err != nil {
+			t.Fatalf("fs_cablobstore.Create failed: %v", err)
+		}
+	}
+
+	// Test it.
+	localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
index 973a9c8..49beec8 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
@@ -41,3 +41,31 @@
 	// Test it.
 	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
+
+// This test case tests the incremental transfer of blobs via chunks.
+func TestWritingViaChunks(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	var err error
+
+	// Make a pair of blobstores, each in its own temporary directory.
+	const nBlobStores = 2
+	var testDirName [nBlobStores]string
+	var bs [nBlobStores]localblobstore.BlobStore
+	for i := 0; i != nBlobStores; i++ {
+		testDirName[i], err = ioutil.TempDir("", "localblobstore_test")
+		if err != nil {
+			t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+		}
+		defer os.RemoveAll(testDirName[i])
+
+		bs[i], err = fs_cablobstore.Create(ctx, testDirName[i])
+		if err != nil {
+			t.Fatalf("fs_cablobstore.Create failed: %v", err)
+		}
+	}
+
+	// Test it.
+	localblobstore_testlib.WriteViaChunks(t, ctx, bs)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
index 662e964..481bea1 100644
--- a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -14,6 +14,7 @@
 import "testing"
 
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -59,7 +60,7 @@
 	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
 	var bw localblobstore.BlobWriter
 	var err error
-	bw, err = bs.NewBlobWriter(ctx)
+	bw, err = bs.NewBlobWriter(ctx, "")
 	if err != nil {
 		t.Errorf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
 	}
@@ -546,3 +547,276 @@
 	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
 	checkFragments(t, ctx, bs, fragmentMap, testDirName)
 }
+
+// writeBlobFromReader() writes the contents of rd to blobstore bs, as blob
+// "name", or picks a new name if "name" is empty.  It returns the name of the
+// blob.  Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func writeBlobFromReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, name string, rd io.Reader, callSite int) string {
+	var err error
+	var bw localblobstore.BlobWriter
+	if bw, err = bs.NewBlobWriter(ctx, name); err != nil {
+		t.Fatalf("callSite %d: NewBlobWriter failed: %v", callSite, err)
+	}
+	blobName := bw.Name()
+	buf := make([]byte, 8192) // buffer for data read from rd.
+	for i := 0; err == nil; i++ {
+		var n int
+		if n, err = rd.Read(buf); err != nil && err != io.EOF {
+			t.Fatalf("callSite %d: unexpected error from reader: %v", callSite, err)
+		}
+		if n > 0 {
+			if err = bw.AppendFragment(localblobstore.BlockOrFile{Block: buf[:n]}); err != nil {
+				t.Fatalf("callSite %d: BlobWriter.AppendFragment failed: %v", callSite, err)
+			}
+			// Every so often, close without finalizing, and reopen.
+			if (i % 7) == 0 {
+				if err = bw.CloseWithoutFinalize(); err != nil {
+					t.Fatalf("callSite %d: BlobWriter.CloseWithoutFinalize failed: %v", callSite, err)
+				}
+				if bw, err = bs.ResumeBlobWriter(ctx, blobName); err != nil {
+					t.Fatalf("callSite %d: ResumeBlobWriter %q failed: %v", callSite, blobName, err)
+				}
+			}
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("callSite %d: BlobWriter.Close failed: %v", callSite, err)
+	}
+	return blobName
+}
+
+// checkBlobAgainstReader() verifies that the blob blobName has the same bytes as the reader rd.
+// Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	// Open a reader on the blob.
+	var blob_rd io.Reader
+	var blob_err error
+	if blob_rd, blob_err = bs.NewBlobReader(ctx, blobName); blob_err != nil {
+		t.Fatalf("callSite %d: NewBlobReader on %q failed: %v", callSite, blobName, blob_err)
+	}
+
+	// Variables for reading the two streams, indexed by "reader" and "blob".
+	type stream struct {
+		name string
+		rd   io.Reader // Reader for this stream
+		buf  []byte    // buffer for data
+		i    int       // bytes processed within current buffer
+		n    int       // valid bytes in current buffer
+		err  error     // error, or nil
+	}
+
+	s := [2]stream{
+		{name: "reader", rd: rd, buf: make([]byte, 8192)},
+		{name: blobName, rd: blob_rd, buf: make([]byte, 8192)},
+	}
+
+	// Descriptive names for the two elements of s, when we aren't treating them the same.
+	reader := &s[0]
+	blob := &s[1]
+
+	var pos int // position within file, for error reporting.
+
+	for x := 0; x != 2; x++ {
+		s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+		s[x].i = 0
+	}
+	for blob.n != 0 && reader.n != 0 {
+		for reader.i != reader.n && blob.i != blob.n && reader.buf[reader.i] == blob.buf[blob.i] {
+			pos++
+			blob.i++
+			reader.i++
+		}
+		if reader.i != reader.n && blob.i != blob.n {
+			t.Fatalf("callSite %d: BlobStore %q: BlobReader on blob %q and rd reader generated different bytes at position %d: 0x%x vs 0x%x",
+				callSite, bs.Root(), blobName, pos, blob.buf[blob.i], reader.buf[reader.i])
+		}
+		for x := 0; x != 2; x++ { // read more data from each reader, if needed
+			if s[x].i == s[x].n {
+				s[x].i = 0
+				s[x].n = 0
+				if s[x].err == nil {
+					s[x].n, s[x].err = s[x].rd.Read(s[x].buf)
+				}
+			}
+		}
+	}
+	for x := 0; x != 2; x++ {
+		if s[x].err != io.EOF {
+			t.Fatalf("callSite %d: %s got error %v", callSite, s[x].name, s[x].err)
+		}
+		if s[x].n != 0 {
+			t.Fatalf("callSite %d: %s is longer than %s", callSite, s[x].name, s[1-x].name)
+		}
+	}
+}
+
+// checkBlobChunksAgainstReader() verifies that the blob blobName has the same chunks
+// (according to BlobChunkStream) as a chunker applied to the reader rd.
+// Errors cause the test to terminate.  Error messages contain the
+// "callSite" value to allow the test to tell which call site is which.
+func checkBlobChunksAgainstReader(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobName string, rd io.Reader, callSite int) {
+	buf := make([]byte, 8192) // buffer to receive chunk hashes from the blob's chunk stream.
+	rawChunks := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
+	cs := bs.BlobChunkStream(ctx, blobName)
+	pos := 0 // byte position within the blob, to be reported in error messages
+	i := 0   // chunk index, to be reported in error messages
+	rawMore, more := rawChunks.Advance(), cs.Advance()
+	for rawMore && more {
+		c := rawChunks.Value()
+		rawChunk := md5.Sum(rawChunks.Value())
+		chunk := cs.Value(buf)
+		if bytes.Compare(rawChunk[:], chunk) != 0 {
+			t.Errorf("raw random stream and chunk record for blob %q have different chunk %d:\n\t%v\nvs\n\t%v\n\tpos %d\n\tlen %d\n\tc %v",
+				blobName, i, rawChunk, chunk, pos, len(c), c)
+		}
+		pos += len(c)
+		i++
+		rawMore, more = rawChunks.Advance(), cs.Advance()
+	}
+	if rawMore {
+		t.Fatalf("callSite %d: blob %q has fewer chunks than raw stream", callSite, blobName)
+	}
+	if more {
+		t.Fatalf("callSite %d: blob %q has more chunks than raw stream", callSite, blobName)
+	}
+	if rawChunks.Err() != nil {
+		t.Fatalf("callSite %d: error reading raw chunk stream: %v", callSite, rawChunks.Err())
+	}
+	if cs.Err() != nil {
+		t.Fatalf("callSite %d: error reading chunk stream for blob %q; %v", callSite, blobName, cs.Err())
+	}
+}
+
+// WriteViaChunks() tests that a large blob in one blob store can be transmitted
+// to another incrementally, without transferring chunks already in the other blob store.
+func WriteViaChunks(t *testing.T, ctx *context.T, bs [2]localblobstore.BlobStore) {
+	// The original blob will be a megabyte.
+	totalLength := 1024 * 1024
+
+	// Write a random blob to bs[0], using seed 1, then check that the
+	// bytes and chunks we get from the blob just written are the same as
+	// those obtained from an identical byte stream.
+	blob0 := writeBlobFromReader(t, ctx, bs[0], "", NewRandReader(1, totalLength, 0, io.EOF), 0)
+	checkBlobAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 1)
+	checkBlobChunksAgainstReader(t, ctx, bs[0], blob0, NewRandReader(1, totalLength, 0, io.EOF), 2)
+
+	// ---------------------------------------------------------------------
+	// Write into bs[1] a blob that is similar to blob0, but not identical, and check it as above.
+	insertionInterval := 20 * 1024
+	blob1 := writeBlobFromReader(t, ctx, bs[1], "", NewRandReader(1, totalLength, insertionInterval, io.EOF), 3)
+	checkBlobAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 4)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob1, NewRandReader(1, totalLength, insertionInterval, io.EOF), 5)
+
+	// ---------------------------------------------------------------------
+	// Count the number of chunks, and the number of steps in the recipe
+	// for copying blob0 from bs[0] to bs[1].  We expect the former to be
+	// significantly bigger than the latter, because the
+	// insertionInterval is significantly larger than the expected chunk
+	// size.
+	cs := bs[0].BlobChunkStream(ctx, blob0)          // Stream of chunks in blob0
+	rs := bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+
+	recipeLen := 0
+	chunkCount := 0
+	for rs.Advance() {
+		step := rs.Value()
+		if step.Chunk != nil {
+			chunkCount++
+		}
+		recipeLen++
+	}
+	if rs.Err() != nil {
+		t.Fatalf("RecipeStream got error: %v", rs.Err())
+	}
+
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Get the original chunk count.
+	origChunkCount := 0
+	for cs.Advance() {
+		origChunkCount++
+	}
+	if cs.Err() != nil {
+		t.Fatalf("ChunkStream got error: %v", cs.Err())
+	}
+	if origChunkCount < chunkCount*5 {
+		t.Errorf("expected fewer chunks in repipe: recipeLen %d  chunkCount %d  origChunkCount %d\n",
+			recipeLen, chunkCount, origChunkCount)
+	}
+
+	// Copy blob0 from bs[0] to bs[1], using chunks from blob1 (already in bs[1]) where possible.
+	cs = bs[0].BlobChunkStream(ctx, blob0) // Stream of chunks in blob0
+	// In a real application, at this point the stream cs would be sent to the device with bs[1].
+	rs = bs[1].RecipeStreamFromChunkStream(ctx, cs) // Recipe from bs[1]
+	// Write blob with known blob name.
+	var bw localblobstore.BlobWriter
+	var err error
+	if bw, err = bs[1].NewBlobWriter(ctx, blob0); err != nil {
+		t.Fatalf("bs[1].NewBlobWriter yields error: %v", err)
+	}
+	var br localblobstore.BlobReader
+	const maxFragment = 1024 * 1024
+	blocks := make([]localblobstore.BlockOrFile, maxFragment/chunker.DefaultParam.MinChunk)
+	for gotStep := rs.Advance(); gotStep; {
+		step := rs.Value()
+		if step.Chunk == nil {
+			// This part of the blob can be read from an existing blob locally (at bs[1]).
+			if err = bw.AppendBlob(step.Blob, step.Size, step.Offset); err != nil {
+				t.Fatalf("AppendBlob(%v) yields error: %v", step, err)
+			}
+			gotStep = rs.Advance()
+		} else {
+			var fragmentSize int64
+			// In a real application, the sequence of chunk hashes
+			// in recipe steps would be communicated back to bs[0],
+			// which then finds the associated chunks.
+			var b int
+			for b = 0; gotStep && step.Chunk != nil && fragmentSize+chunker.DefaultParam.MaxChunk < maxFragment; b++ {
+				var loc localblobstore.Location
+				if loc, err = bs[0].LookupChunk(ctx, step.Chunk); err != nil {
+					t.Fatalf("bs[0] unexpectedly does not have chunk %v", step.Chunk)
+				}
+				if br != nil && br.Name() != loc.BlobName { // Close blob if we need a different one.
+					if err = br.Close(); err != nil {
+						t.Fatalf("unexpected error in BlobReader.Close(): %v", err)
+					}
+					br = nil
+				}
+				if br == nil { // Open blob if needed.
+					if br, err = bs[0].NewBlobReader(ctx, loc.BlobName); err != nil {
+						t.Fatalf("unexpected failure to create BlobReader on %q: %v", loc.BlobName, err)
+					}
+				}
+				if loc.Size > chunker.DefaultParam.MaxChunk {
+					t.Fatalf("chunk exceeds max chunk size: %d vs %d", loc.Size, chunker.DefaultParam.MaxChunk)
+				}
+				fragmentSize += loc.Size
+				if blocks[b].Block == nil {
+					blocks[b].Block = make([]byte, chunker.DefaultParam.MaxChunk)
+				}
+				blocks[b].Block = blocks[b].Block[:loc.Size]
+				var i int
+				var n int64
+				for n = int64(0); n != loc.Size; n += int64(i) {
+					if i, err = br.ReadAt(blocks[b].Block[n:loc.Size], n+loc.Offset); err != nil && err != io.EOF {
+						t.Fatalf("ReadAt on %q failed: %v", br.Name(), err)
+					}
+				}
+				if gotStep = rs.Advance(); gotStep {
+					step = rs.Value()
+				}
+			}
+			if err = bw.AppendFragment(blocks[:b]...); err != nil {
+				t.Fatalf("AppendFragment on %q failed: %v", bw.Name(), err)
+			}
+		}
+	}
+	if err = bw.Close(); err != nil {
+		t.Fatalf("BlobWriter.Close on %q failed: %v", bw.Name(), err)
+	}
+
+	// Check that the transferred blob in bs[1] is the same as the original
+	// stream used to make the blob in bs[0].
+	checkBlobAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 6)
+	checkBlobChunksAgainstReader(t, ctx, bs[1], blob0, NewRandReader(1, totalLength, 0, io.EOF), 7)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
new file mode 100644
index 0000000..85e32d3
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/randreader.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package localblobstore_testlib
+
+import "math/rand"
+
+// A RandReader contains a pointer to a rand.Rand, and a size limit.  Its
+// pointer type implements the Read() method from io.Reader, which yields
+// bytes obtained from the random number generator.
+type RandReader struct {
+	rand           *rand.Rand // Source of random bytes.
+	pos            int        // Number of bytes read.
+	limit          int        // Max number of bytes that may be read.
+	insertInterval int        // If non-zero, number of bytes between insertions of zero bytes.
+	eofErr         error      // error to be returned at the end of the stream
+}
+
+// NewRandReader() returns a new RandReader with the specified seed and size limit.
+// It yields eofErr when the end of the stream is reached.
+// If insertInterval is non-zero, a zero byte is inserted into the stream every
+// insertInterval bytes, before resuming getting bytes from the random number
+// generator.
+func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
+	r := new(RandReader)
+	r.rand = rand.New(rand.NewSource(seed))
+	r.limit = limit
+	r.insertInterval = insertInterval
+	r.eofErr = eofErr
+	return r
+}
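+
+// For example (a sketch, matching the use in WriteViaChunks()):
+// NewRandReader(1, 1024*1024, 0, io.EOF) yields one megabyte of
+// deterministic pseudo-random bytes followed by io.EOF, while a non-zero
+// insertInterval perturbs the stream so that two readers with the same seed
+// produce similar but not identical data.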
+
+// Read() implements the io.Reader Read() method for *RandReader.
+func (r *RandReader) Read(buf []byte) (n int, err error) {
+	// Generate bytes up to the end of the stream, or the end of the buffer.
+	max := r.limit - r.pos
+	if len(buf) < max {
+		max = len(buf)
+	}
+	for ; n != max; n++ {
+		if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
+			buf[n] = byte(r.rand.Int31n(256))
+		} else {
+			buf[n] = 0
+		}
+		r.pos++
+	}
+	if r.pos == r.limit {
+		err = r.eofErr
+	}
+	return n, err
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
new file mode 100644
index 0000000..c3f11d4
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_transfer_test.go
@@ -0,0 +1,368 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Example code for transferring a blob from one device to another.
+// See the simulateResumption constant to choose whether to simulate a full
+// transfer or a resumed one.
+package localblobstore_test
+
+import "bytes"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "math/rand"
+import "os"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/v23/context"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// simulateResumption tells the receiver whether to simulate having a partial
+// blob before blob transfer.
+const simulateResumption = true
+
+// createBlobStore() returns a new BlobStore, and the name of the directory
+// used to implement it.
+func createBlobStore(ctx *context.T) (bs localblobstore.BlobStore, dirName string) {
+	var err error
+	if dirName, err = ioutil.TempDir("", "localblobstore_transfer_test"); err != nil {
+		panic(err)
+	}
+	if bs, err = fs_cablobstore.Create(ctx, dirName); err != nil {
+		panic(err)
+	}
+	return bs, dirName
+}
+
+// createBlob writes a blob of count 32kByte blocks to bs, drawn from a
+// deterministic but arbitrary random stream, starting at block offset within
+// that stream.  It returns the blob's name, which is "blob" if that is
+// non-empty, and chosen arbitrarily otherwise.
+// The blob is finalized iff "complete" is true.
+func createBlob(ctx *context.T, bs localblobstore.BlobStore, blob string, complete bool, offset int, count int) string {
+	var bw localblobstore.BlobWriter
+	var err error
+	if bw, err = bs.NewBlobWriter(ctx, blob); err != nil {
+		panic(err)
+	}
+	blob = bw.Name()
+	var buffer [32 * 1024]byte
+	block := localblobstore.BlockOrFile{Block: buffer[:]}
+	r := rand.New(rand.NewSource(1)) // Always seed with 1 for repeatability.
+	for i := 0; i != offset+count; i++ {
+		for b := 0; b != len(buffer); b++ {
+			buffer[b] = byte(r.Int31n(256))
+		}
+		if i >= offset {
+			if err = bw.AppendFragment(block); err != nil {
+				panic(err)
+			}
+		}
+	}
+	if complete {
+		err = bw.Close()
+	} else {
+		err = bw.CloseWithoutFinalize()
+	}
+	if err != nil {
+		panic(err)
+	}
+	return blob
+}
+
+// A channelChunkStream turns a channel of chunk hashes into a ChunkStream.
+type channelChunkStream struct {
+	channel <-chan []byte
+	ok      bool
+	value   []byte
+}
+
+// newChannelChunkStream returns a ChunkStream, given a channel containing the
+// relevant chunk hashes.
+func newChannelChunkStream(ch <-chan []byte) localblobstore.ChunkStream {
+	return &channelChunkStream{channel: ch, ok: true}
+}
+
+// The following are the standard ChunkStream methods.
+func (cs *channelChunkStream) Advance() bool {
+	if cs.ok {
+		cs.value, cs.ok = <-cs.channel
+	}
+	return cs.ok
+}
+func (cs *channelChunkStream) Value(buf []byte) []byte { return cs.value }
+func (cs *channelChunkStream) Err() error              { return nil }
+func (cs *channelChunkStream) Cancel()                 {}
+
+// Example_blobTransfer() demonstrates how to transfer a blob incrementally
+// from one device's blob store to another.  In this code, the communication
+// between sender and receiver is modelled with Go channels.
+func Example_blobTransfer() {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// ----------------------------------------------
+	// The names of channels used to communicate between sender and
+	// receiver end in ToSender or ToReceiver, indicating the direction
+	// of data flow.
+	type blobData struct {
+		name     string
+		size     int64
+		checksum []byte
+	}
+	blobDataToReceiver := make(chan blobData)  // indicate basic data for blob
+	needChunksToSender := make(chan bool)      // indicate receiver does not have entire blob
+	chunkHashesToReceiver := make(chan []byte) // for initial transfer of chunk hashes
+	chunkHashesToSender := make(chan []byte)   // to report which chunks receiver needs
+	chunksToReceiver := make(chan []byte)      // to send the needed chunk data to receiver
+
+	sDone := make(chan bool) // closed when sender done
+	rDone := make(chan bool) // closed when receiver done
+
+	// ----------------------------------------------
+	// The sender.
+	go func(ctx *context.T,
+		blobDataToReceiver chan<- blobData,
+		needChunksToSender <-chan bool,
+		chunkHashesToReceiver chan<- []byte,
+		chunkHashesToSender <-chan []byte,
+		chunksToReceiver chan<- []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsS, bsSDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsSDir)
+
+		blob := createBlob(ctx, bsS, "", true, 0, 32) // Create a 1M blob at the sender.
+
+		// 1. Send basic blob data to receiver.
+		var br localblobstore.BlobReader
+		if br, err = bsS.NewBlobReader(ctx, blob); err != nil {
+			panic(err)
+		}
+		blobDataToReceiver <- blobData{name: blob, size: br.Size(), checksum: br.Hash()}
+		br.Close()
+		close(blobDataToReceiver)
+
+		// 3. Get indication from receiver of whether it needs blob.
+		needChunks := <-needChunksToSender
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 4. Send the chunk hashes to the receiver.  This proceeds concurrently
+		//    with the step below.
+		go func(ctx *context.T, blob string, chunkHashesToReceiver chan<- []byte) {
+			cs := bsS.BlobChunkStream(ctx, blob)
+			for cs.Advance() {
+				chunkHashesToReceiver <- cs.Value(nil)
+			}
+			if cs.Err() != nil {
+				panic(cs.Err())
+			}
+			close(chunkHashesToReceiver)
+		}(ctx, blob, chunkHashesToReceiver)
+
+		// 7. Get needed chunk hashes from receiver, find the relevant
+		//    data, and send it back to the receiver.
+		var cbr localblobstore.BlobReader // Cached read handle on most-recent-read blob, or nil
+		// Given chunk hash h from chunkHashesToSender, send chunk to chunksToReceiver.
+		for h := range chunkHashesToSender {
+			loc, err := bsS.LookupChunk(ctx, h)
+			for err == nil && (cbr == nil || cbr.Name() != loc.BlobName) {
+				if cbr != nil && cbr.Name() != loc.BlobName {
+					cbr.Close()
+					cbr = nil
+				}
+				if cbr == nil {
+					if cbr, err = bsS.NewBlobReader(ctx, loc.BlobName); err != nil {
+						bsS.GC(ctx) // A partially-deleted blob may be confusing things.
+						loc, err = bsS.LookupChunk(ctx, h)
+					}
+				}
+			}
+			var i int = 1
+			var n int64
+			buffer := make([]byte, loc.Size) // buffer for current chunk
+			for n = int64(0); n != loc.Size && i != 0 && err == nil; n += int64(i) {
+				if i, err = cbr.ReadAt(buffer[n:loc.Size], n+loc.Offset); err == io.EOF {
+					err = nil // EOF is expected
+				}
+			}
+			if n == loc.Size { // Got chunk.
+				chunksToReceiver <- buffer[:loc.Size]
+			}
+			if err != nil {
+				break
+			}
+		}
+		close(chunksToReceiver)
+		if cbr != nil {
+			cbr.Close()
+		}
+
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, sDone)
+
+	// ----------------------------------------------
+	// The receiver.
+	go func(ctx *context.T,
+		blobDataToReceiver <-chan blobData,
+		needChunksToSender chan<- bool,
+		chunkHashesToReceiver <-chan []byte,
+		chunkHashesToSender chan<- []byte,
+		chunksToReceiver <-chan []byte,
+		done chan<- bool) {
+
+		defer close(done)
+		var err error
+
+		bsR, bsRDir := createBlobStore(ctx)
+		defer os.RemoveAll(bsRDir)
+
+		// 2. Receive basic blob data from sender.
+		blobInfo := <-blobDataToReceiver
+
+		if simulateResumption {
+			// Write a fraction of the (unfinalized) blob on the receiving side
+			// to check that the transfer process can resume a partial blob.
+			createBlob(ctx, bsR, blobInfo.name, false, 0, 10)
+		}
+
+		// 3. Tell sender whether the receiver already has the complete
+		//    blob.
+		needChunks := true
+		var br localblobstore.BlobReader
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err == nil {
+			if br.IsFinalized() {
+				if len(br.Hash()) == len(blobInfo.checksum) && bytes.Compare(br.Hash(), blobInfo.checksum) != 0 {
+					panic("receiver has a finalized blob with same name but different hash")
+				}
+				needChunks = false // The receiver already has the blob.
+			}
+			br.Close()
+		}
+		needChunksToSender <- needChunks
+		close(needChunksToSender)
+
+		if !needChunks { // Receiver has blob; done.
+			return
+		}
+
+		// 5. Receive the chunk hashes from the sender, and turn them
+		//    into a recipe.
+		cs := newChannelChunkStream(chunkHashesToReceiver)
+		rs := bsR.RecipeStreamFromChunkStream(ctx, cs)
+
+		// 6. The following thread sends the chunk hashes that the
+		//    receiver does not have to the sender.  It also makes
+		//    a duplicate of the stream on the channel rsCopy.  The
+		//    buffering in rsCopy allows the receiver to put several
+		//    chunks into a fragment.
+		rsCopy := make(chan localblobstore.RecipeStep, 100) // A buffered copy of the rs stream.
+		go func(ctx *context.T, rs localblobstore.RecipeStream, rsCopy chan<- localblobstore.RecipeStep, chunkHashesToSender chan<- []byte) {
+			for rs.Advance() {
+
+				step := rs.Value()
+				if step.Chunk != nil { // Data must be fetched from sender.
+					chunkHashesToSender <- step.Chunk
+				}
+				rsCopy <- step
+			}
+			close(chunkHashesToSender)
+			close(rsCopy)
+		}(ctx, rs, rsCopy, chunkHashesToSender)
+
+		// 8. The following thread splices the chunks from the sender
+		//    (on chunksToReceiver) into the recipe stream copy
+		//    (rsCopy) to generate a full recipe stream (rsFull) in
+		//    which chunks are actual data, rather than just hashes.
+		rsFull := make(chan localblobstore.RecipeStep) // A recipe stream containing chunk data, not just hashes.
+		go func(ctx *context.T, rsCopy <-chan localblobstore.RecipeStep, chunksToReceiver <-chan []byte, rsFull chan<- localblobstore.RecipeStep) {
+			var ok bool
+			for step := range rsCopy {
+				if step.Chunk != nil { // Data must be fetched from sender.
+					if step.Chunk, ok = <-chunksToReceiver; !ok {
+						break
+					}
+				}
+				rsFull <- step
+			}
+			close(rsFull)
+		}(ctx, rsCopy, chunksToReceiver, rsFull)
+
+		// 9. Write the blob using the recipe.
+		var chunksTransferred int
+		const fragmentThreshold = 1024 * 1024 // Try to write on-disc fragments at least this big.
+		var ignoreBytes int64
+		var bw localblobstore.BlobWriter
+		if bw, err = bsR.ResumeBlobWriter(ctx, blobInfo.name); err != nil {
+			bw, err = bsR.NewBlobWriter(ctx, blobInfo.name)
+		} else {
+			ignoreBytes = bw.Size()
+		}
+		if err == nil {
+			var fragment []localblobstore.BlockOrFile
+			var fragmentSize int64
+			for step := range rsFull {
+				if step.Chunk == nil { // Data can be obtained from local blob.
+				if ignoreBytes >= step.Size { // Ignore data we already have.
+						ignoreBytes -= step.Size
+					} else {
+						err = bw.AppendBlob(step.Blob, step.Size-ignoreBytes, step.Offset+ignoreBytes)
+						ignoreBytes = 0
+					}
+				} else if ignoreBytes >= int64(len(step.Chunk)) { // Ignore chunks we already have.
+					ignoreBytes -= int64(len(step.Chunk))
+				} else { // Data is from a chunk sent by the sender.
+					chunksTransferred++
+					fragment = append(fragment, localblobstore.BlockOrFile{Block: step.Chunk[ignoreBytes:]})
+					fragmentSize += int64(len(step.Chunk)) - ignoreBytes
+					ignoreBytes = 0
+					if fragmentSize > fragmentThreshold {
+						err = bw.AppendFragment(fragment...)
+						fragment = fragment[:0]
+						fragmentSize = 0
+					}
+				}
+				if err != nil {
+					break
+				}
+			}
+			if err == nil && len(fragment) != 0 {
+				err = bw.AppendFragment(fragment...)
+			}
+			if err2 := bw.Close(); err == nil {
+				err = err2
+			}
+			if err != nil {
+				panic(err)
+			}
+		}
+
+		// 10. Verify that the blob was written correctly.
+		if br, err = bsR.NewBlobReader(ctx, blobInfo.name); err != nil {
+			panic(err)
+		}
+		if br.Size() != blobInfo.size {
+			panic("transferred blob has wrong size")
+		}
+		if len(br.Hash()) != len(blobInfo.checksum) || bytes.Compare(br.Hash(), blobInfo.checksum) != 0 {
+			panic("transferred blob has wrong checksum")
+		}
+		if err = br.Close(); err != nil {
+			panic(err)
+		}
+		fmt.Printf("%d chunks transferred\n", chunksTransferred)
+	}(ctx, blobDataToReceiver, needChunksToSender, chunkHashesToReceiver, chunkHashesToSender, chunksToReceiver, rDone)
+
+	// ----------------------------------------------
+	// Wait for sender and receiver to finish.
+	_ = <-sDone
+	_ = <-rDone
+
+	// Output: 635 chunks transferred
+}
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
index b1a4565..16af4cf 100644
--- a/x/ref/services/syncbase/localblobstore/model.go
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -4,6 +4,50 @@
 
 // Package localblobstore is the interface to a local blob store.
 // Implementations include fs_cablobstore.
+//
+// Expected use
+// ============
+// These examples assume that bs, bsS (sender) and bsR (receiver) are blobstores.
+//
+// Writing blobs
+//	bw, err := bs.NewBlobWriter(ctx, "")  // For a new blob, implementation picks blob name.
+//	if err == nil {
+//		blobName := bw.Name()  // Get name the implementation picked.
+//		... use bw.AppendFragment() to append data to the blob...
+//		... and/or bw.AppendBlob() to append data that's in another existing blob...
+//		err = bw.Close()
+//	}
+//
+// Resume writing a blob that was partially written due to a crash (not yet finalized).
+//	bw, err := bs.ResumeBlobWriter(ctx, name)
+//	if err == nil {
+//		size := bw.Size() // The store has this many bytes from the blob.
+//		... write the remaining data using bw.AppendFragment() and/or bw.AppendBlob()...
+//		err = bw.Close()
+//	}
+//
+// Reading blobs
+//	br, err := bs.NewBlobReader(ctx, name)
+//	if err == nil {
+//		... read bytes with br.ReadAt() or br.Read(), perhaps with br.Seek()...
+//		err = br.Close()
+//	}
+//
+// Transferring blobs from one store to another:
+// See example in localblobstore_transfer_test.go
+// Summary:
+// - The sender sends the checksum of the blob from BlobReader's Hash().
+// - The receiver checks whether it already has the blob, with the same
+//   checksum.
+// - If the receiver does not have the blob, the sender sends the list of chunk
+//   hashes in the blob using BlobChunkStream().
+// - The receiver uses RecipeStreamFromChunkStream() with the chunk hash stream
+//   from the sender, and tells the sender the chunk hashes of the chunks it
+//   needs.
+// - The sender uses LookupChunk() to find the data for each chunk the receiver
+//   needs, and sends it to the receiver.
+// - The receiver applies the recipe steps, with the actual chunk data from
+//   the sender and its own local data.
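+//
+// A minimal message flow, as modelled with channels in the example
+// (a sketch; the steps match localblobstore_transfer_test.go):
+//	sender   -> receiver: blob name, size, checksum
+//	receiver -> sender:   whether it needs the blob's chunks
+//	sender   -> receiver: stream of chunk hashes (BlobChunkStream)
+//	receiver -> sender:   hashes of the chunks it lacks
+//	sender   -> receiver: data for those chunks (found via LookupChunk)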
 package localblobstore
 
 import "v.io/v23/context"
@@ -17,12 +61,16 @@
 	NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
 
 	// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
-	// a newly created blob name, which can be found using the Name()
-	// method.  BlobWriters should not be used concurrently by multiple
-	// threads.  The returned handle should be closed with either the
-	// Close() or CloseWithoutFinalize() method to avoid leaking file
-	// handles.
-	NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+	// a newly created blob.  If "name" is non-empty, it is used to name
+	// the blob, and it must be in the format of a name returned by this
+	// interface (probably by another instance on another device).
+	// Otherwise a new name is created, which can be found using
+	// the Name() method.  It is an error to attempt to overwrite a blob
+	// that already exists in this blob store.  BlobWriters should not be
+	// used concurrently by multiple threads.  The returned handle should
+	// be closed with either the Close() or CloseWithoutFinalize() method
+	// to avoid leaking file handles.
+	NewBlobWriter(ctx *context.T, name string) (bw BlobWriter, err error)
 
 	// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
 	// an old, but unfinalized blob name.
@@ -36,6 +84,31 @@
 	// other calls to GC(), and with uses of BlobReaders and BlobWriters.
 	GC(ctx *context.T) error
 
+	// BlobChunkStream() returns a ChunkStream that can be used to read the
+	// ordered list of content hashes of chunks in blob blobName.  It is
+	// expected that this list will be presented to
+	// RecipeStreamFromChunkStream() on another device, to create a recipe
+	// for transmitting the blob efficiently to that other device.
+	BlobChunkStream(ctx *context.T, blobName string) ChunkStream
+
+	// RecipeStreamFromChunkStream() returns a pointer to a RecipeStream
+	// that allows the client to iterate over each RecipeStep needed to
+	// create the blob formed by the chunks in chunkStream.  It is expected
+	// that this will be called on a receiving device, and be given a
+	// ChunkStream from a sending device, to yield a recipe for efficient
+	// chunk transfer.  RecipeStep values with non-nil Chunk fields need
+	// the chunk from the sender; once the data is returned, it can be
+	// written with BlobWriter.AppendFragment().  Those with blob
+	// references can be written locally with BlobWriter.AppendBlob().
+	RecipeStreamFromChunkStream(ctx *context.T, chunkStream ChunkStream) RecipeStream
+
+	// LookupChunk() returns the location of a chunk with the specified chunk
+	// hash within the store.  It is expected that chunk hashes from
+	// RecipeStep entries from RecipeStreamFromChunkStream() will be mapped
+	// to blob Location values on the sender for transmission to the
+	// receiver.
+	LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error)
+
 	// ListBlobIds() returns an iterator that can be used to enumerate the
 	// blobs in a BlobStore.  Expected use is:
 	//
@@ -46,7 +119,7 @@
 	//	if iter.Err() != nil {
 	//	  // The loop terminated early due to an error.
 	//	}
-	ListBlobIds(ctx *context.T) (iter Iter)
+	ListBlobIds(ctx *context.T) (iter Stream)
 
 	// ListCAIds() returns an iterator that can be used to enumerate the
 	// content-addressable fragments in a BlobStore.  Expected use is:
@@ -58,12 +131,20 @@
 	//	if iter.Err() != nil {
 	//	  // The loop terminated early due to an error.
 	//	}
-	ListCAIds(ctx *context.T) (iter Iter)
+	ListCAIds(ctx *context.T) (iter Stream)
 
 	// Root() returns the name of the root directory where the BlobStore is stored.
 	Root() string
 }
 
+// A Location describes a chunk's location within a blob.  It is returned by
+// BlobStore.LookupChunk().
+type Location struct {
+	BlobName string // name of blob
+	Offset   int64  // byte offset of chunk within blob
+	Size     int64  // size of chunk
+}
+
 // A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
 // and Seek() calls.  A BlobReader can be created with NewBlobReader(), and
 // should be closed with the Close() method to avoid leaking file handles.
@@ -153,18 +234,67 @@
 	Hash() []byte
 }
 
-// A Iter represents an iterator that allows the client to enumerate
-// all the blobs of fragments in a BlobStore.
-type Iter interface {
-	// Advance() stages an item so that it may be retrieved via Value.
-	// Returns true iff there is an item to retrieve.  Advance must be
-	// called before Value is called.
-	Advance() (advanced bool)
+// A Stream represents an iterator that allows the client to enumerate
+// all the blobs or fragments in a BlobStore.
+//
+// The interfaces Stream, ChunkStream, RecipeStream all have four calls,
+// and differ only in the Value() call.
+type Stream interface {
+	// Advance() stages an item so that it may be retrieved via Value().
+	// Returns true iff there is an item to retrieve.  Advance() must be
+	// called before Value() is called.  The caller is expected to read
+	// until Advance() returns false, or to call Cancel().
+	Advance() bool
 
-	// Value() returns the item that was staged by Advance.  May panic if
-	// Advance returned false or was not called.  Never blocks.
+	// Value() returns the item that was staged by Advance().  May panic if
+	// Advance() returned false or was not called.  Never blocks.
 	Value() (name string)
 
 	// Err() returns any error encountered by Advance.  Never blocks.
 	Err() error
+
+	// Cancel() indicates that the client wishes to cease reading from the stream.
+	// It causes the next call to Advance() to return false.  Never blocks.
+	// It may be called concurrently with other calls on the stream.
+	Cancel()
+}
+
+// A ChunkStream represents an iterator that allows the client to enumerate
+// the chunks in a blob.   See the comments for Stream for usage.
+type ChunkStream interface {
+	Advance() bool
+
+	// Value() returns the chunkHash that was staged by Advance().  May
+	// panic if Advance() returned false or was not called.  Never blocks.
+	// The result may share storage with buf[] if it is large enough;
+	// otherwise, a new buffer is allocated.  It is legal to call with
+	// buf==nil.
+	Value(buf []byte) (chunkHash []byte)
+
+	Err() error
+	Cancel()
+}
+
+// A RecipeStep describes one piece of a recipe for making a blob.
+// The step consists either of appending the chunk with content hash Chunk and size Size,
+// or (if Chunk==nil) the Size bytes from Blob, starting at Offset.
+type RecipeStep struct {
+	Chunk  []byte
+	Blob   string
+	Size   int64
+	Offset int64
+}
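+
+// For example (a sketch; "blobA" is a hypothetical blob name): a step with
+// Chunk == nil, Blob == "blobA", Offset == 10, Size == 100 means "append
+// bytes [10, 110) of blobA", while a step with a non-nil Chunk means "append
+// the chunk whose content hash is Chunk".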
+
+// A RecipeStream represents an iterator that allows the client to obtain the
+// steps needed to construct a blob with a given ChunkStream, attempting to
+// reuse data in existing blobs.  See the comments for Stream for usage.
+type RecipeStream interface {
+	Advance() bool
+
+	// Value() returns the RecipeStep that was staged by Advance().  May panic if
+	// Advance() returned false or was not called.  Never blocks.
+	Value() RecipeStep
+
+	Err() error
+	Cancel()
 }