v.io/syncbase/services/syncbase/localblobstore/fs_cablobstore: A blob store implementation.

Introduce a local blob store implementation.
This implementation, fs_cablobstore, uses a local file system to store
content-addressed fragments, and blobs are composed of sequences of such
fragments..

Change-Id: I275e8172c4890f5d9a940464c10be10d2e6af500
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
new file mode 100644
index 0000000..8e173d1
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -0,0 +1,1111 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package fs_cablobstore implements a content addressable blob store
+// on top of a file system.  It assumes that either os.Link() or
+// os.Rename() is available.
+package fs_cablobstore
+
+// Internals:
+//   The blobstore consists of a directory with "blob", "cas", and "tmp"
+//   subdirectories.
+//   - "tmp" is used for temporary files that are moved into place via
+//     link()/unlink() or rename(), depending on what's available.
+//   - "cas" contains files whose names are content hashes of the files being
+//     named.  A few slashes are thrown into the name near the front so that no
+//     single directory gets too large.
+//   - "blob" contains files whose names are random numbers.  These names are
+//     visible externally as "blob names".  Again, a few slashes are thrown
+//     into the name near the front so that no single directory gets too large.
+//     Each of these files contains a series of lines of the form:
+//        d <size> <offset> <cas-fragment>
+//     followed optionally by a line of the form:
+//        f <md5-hash>
+//     Each "d" line indicates that the next <size> bytes of the blob appear at
+//     <offset> bytes into <cas-fragment>, which is in the "cas" subtree.  The
+//     "f" line indicates that the blob is "finalized" and gives its complete
+//     md5 hash.  No fragments may be appended to a finalized blob.
+
+import "bufio"
+import "crypto/md5"
+import "fmt"
+import "hash"
+import "io"
+import "io/ioutil"
+import "math/rand"
+import "os"
+import "path/filepath"
+import "strconv"
+import "strings"
+import "sync"
+import "time"
+
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+
+var (
+	errNotADir                = verror.Register(pkgPath+".errNotADir", verror.NoRetry, "{1:}{2:} Not a directory{:_}")
+	errAppendFailed           = verror.Register(pkgPath+".errAppendFailed", verror.NoRetry, "{1:}{2:} fs_cablobstore.Append failed{:_}")
+	errMalformedField         = verror.Register(pkgPath+".errMalformedField", verror.NoRetry, "{1:}{2:} Malformed field in blob specification{:_}")
+	errAlreadyClosed          = verror.Register(pkgPath+".errAlreadyClosed", verror.NoRetry, "{1:}{2:} BlobWriter is already closed{:_}")
+	errBlobAlreadyFinalized   = verror.Register(pkgPath+".errBlobAlreadyFinalized", verror.NoRetry, "{1:}{2:} Blob is already finalized{:_}")
+	errIllegalPositionForRead = verror.Register(pkgPath+".errIllegalPositionForRead", verror.NoRetry, "{1:}{2:} BlobReader: illegal position {3} on Blob of size {4}{:_}")
+	errBadSeekWhence          = verror.Register(pkgPath+".errBadSeekWhence", verror.NoRetry, "{1:}{2:} BlobReader: Bad value for 'whence' in Seek{:_}")
+	errNegativeSeekPosition   = verror.Register(pkgPath+".errNegativeSeekPosition", verror.NoRetry, "{1:}{2:} BlobReader: negative position for Seek: offset {3}, whence {4}{:_}")
+	errBadSizeOrOffset        = verror.Register(pkgPath+".errBadSizeOrOffset", verror.NoRetry, "{1:}{2:} Bad size ({3}) or offset ({4}) in blob {5} (size {6}){:_}")
+	errMalformedBlobHash      = verror.Register(pkgPath+".errMalformedBlobHash", verror.NoRetry, "{1:}{2:} Blob {3} hash malformed hash{:_}")
+	errInvalidBlobName        = verror.Register(pkgPath+".errInvalidBlobName", verror.NoRetry, "{1:}{2:} Invalid blob name {3}{:_}")
+	errCantDeleteBlob         = verror.Register(pkgPath+".errCantDeleteBlob", verror.NoRetry, "{1:}{2:} Can't delete blob {3}{:_}")
+	errBlobDeleted            = verror.Register(pkgPath+".errBlobDeleted", verror.NoRetry, "{1:}{2:} Blob is deleted{:_}")
+	errSizeTooBigForFragment  = verror.Register(pkgPath+".errSizeTooBigForFragment", verror.NoRetry, "{1:}{2:} writing blob {1}, size too big for fragment{:1}")
+)
+
+// For the moment, we disallow others from accessing the tree where blobs are
+// stored.  We could in the future relax this to 0711 or 0755.
+const dirPermissions = 0700
+
+// Subdirectories of the blobstore's tree
+const (
+	blobDir = "blob" // Subdirectory where blobs are indexed by blob id.
+	casDir  = "cas"  // Subdirectory where blobs are indexed by content hash.
+	tmpDir  = "tmp"  // Subdirectory where temporary files are created.
+)
+
+// An FsCaBlobStore represents a simple, content-addressable store.
+type FsCaBlobStore struct {
+	rootName string // The name of the root of the store.
+
+	mu sync.Mutex // Protects fields below, plus
+	// blobDesc.fragment, blobDesc.activeDescIndex,
+	// and blobDesc.refCount.
+	activeDesc []*blobDesc        // The blob descriptors in use by active BlobReaders and BlobWriters.
+	toDelete   []*map[string]bool // Sets of items that active GC threads are about to delete. (Pointers to maps, to allow pointer comparison.)
+}
+
+// hashToFileName() returns the content-addressed name of the data with the
+// specified hash, given its content hash.  Requires len(hash)==16.  An md5
+// hash is suitable.
+func hashToFileName(hash []byte) string {
+	return filepath.Join(casDir,
+		fmt.Sprintf("%02x", hash[0]),
+		fmt.Sprintf("%02x", hash[1]),
+		fmt.Sprintf("%02x", hash[2]),
+		fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+			hash[3],
+			hash[4], hash[5], hash[6], hash[7],
+			hash[8], hash[9], hash[10], hash[11],
+			hash[12], hash[13], hash[14], hash[15]))
+}
+
+// newBlobName() returns a new random name for a blob.
+func newBlobName() string {
+	return filepath.Join(blobDir,
+		fmt.Sprintf("%02x", rand.Int31n(256)),
+		fmt.Sprintf("%02x", rand.Int31n(256)),
+		fmt.Sprintf("%02x", rand.Int31n(256)),
+		fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+			rand.Int31n(256),
+			rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256),
+			rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256),
+			rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256)))
+}
+
+// hashToString() returns a string representation of the hash.
+// Requires len(hash)==16.  An md5 hash is suitable.
+func hashToString(hash []byte) string {
+	return fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+		hash[0], hash[1], hash[2], hash[3],
+		hash[4], hash[5], hash[6], hash[7],
+		hash[8], hash[9], hash[10], hash[11],
+		hash[12], hash[13], hash[14], hash[15])
+}
+
+// stringToHash() converts a string in the format generated by hashToString()
+// to a vector of 16 bytes.  If the string is malformed, the nil slice is
+// returned.
+func stringToHash(s string) []byte {
+	hash := make([]byte, 16, 16)
+	n, err := fmt.Sscanf(s, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+		&hash[0], &hash[1], &hash[2], &hash[3],
+		&hash[4], &hash[5], &hash[6], &hash[7],
+		&hash[8], &hash[9], &hash[10], &hash[11],
+		&hash[12], &hash[13], &hash[14], &hash[15])
+	if n != 16 || err != nil {
+		hash = nil
+	}
+	return hash
+}
+
+// Create() returns a pointer to an FsCaBlobStore stored in the file system at
+// "rootName".  If the directory rootName does not exist, it is created.
+func Create(ctx *context.T, rootName string) (fscabs *FsCaBlobStore, err error) {
+	dir := []string{tmpDir, casDir, blobDir}
+	for i := 0; i != len(dir) && err == nil; i++ {
+		fullName := filepath.Join(rootName, dir[i])
+		os.MkdirAll(fullName, dirPermissions)
+		var fi os.FileInfo
+		fi, err = os.Stat(fullName)
+		if err == nil && !fi.IsDir() {
+			err = verror.New(errNotADir, ctx, fullName)
+		}
+	}
+	if err == nil {
+		fscabs = new(FsCaBlobStore)
+		fscabs.rootName = rootName
+	}
+	return fscabs, err
+}
+
+// Root() returns the name of the root directory where *fscabs is stored.
+func (fscabs *FsCaBlobStore) Root() string {
+	return fscabs.rootName
+}
+
+// DeleteBlob() deletes the named blob from *fscabs.
+func (fscabs *FsCaBlobStore) DeleteBlob(ctx *context.T, blobName string) (err error) {
+	// Disallow deletions of things outside the blob tree, or that may contain "..".
+	// For simplicity, the code currently disallows '.'.
+	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+		err = verror.New(errInvalidBlobName, ctx, blobName)
+	} else {
+		err = os.Remove(filepath.Join(fscabs.rootName, blobName))
+		if err != nil {
+			err = verror.New(errCantDeleteBlob, ctx, blobName, err)
+		}
+	}
+	return err
+}
+
+// -----------------------------------------------------------
+
+// A file encapsulates both an os.File and a bufio.Writer on that file.
+type file struct {
+	fh     *os.File
+	writer *bufio.Writer
+}
+
+// newFile() returns a *file containing fh and a bufio.Writer on that file, if
+// err is nil.
+func newFile(fh *os.File, err error) (*file, error) {
+	var f *file
+	if err == nil {
+		f = new(file)
+		f.fh = fh
+		f.writer = bufio.NewWriter(f.fh)
+	}
+	return f, err
+}
+
+// newTempFile() returns a *file on a new temporary file created in the
+// directory dir.
+func newTempFile(ctx *context.T, dir string) (*file, error) {
+	return newFile(ioutil.TempFile(dir, "newfile"))
+}
+
+// close() flushes buffers (if err==nil initially) and closes the file,
+// returning its name.
+func (f *file) close(ctx *context.T, err error) (string, error) {
+	name := f.fh.Name()
+	// Flush the data out to disc and close the file.
+	if err == nil {
+		err = f.writer.Flush()
+	}
+	if err == nil {
+		err = f.fh.Sync()
+	}
+	err2 := f.fh.Close()
+	if err == nil {
+		err = err2
+	}
+	return name, err
+}
+
+// closeAndRename() calls f.close(), and if err==nil initially and no new
+// errors are seen, renames the file to newName.
+func (f *file) closeAndRename(ctx *context.T, newName string, err error) error {
+	var oldName string
+	oldName, err = f.close(ctx, err)
+	if err == nil { // if temp file written successfully...
+		// Link or rename the file into place, hoping at least one is
+		// supported on this file system.
+		os.MkdirAll(filepath.Dir(newName), dirPermissions)
+		err = os.Link(oldName, newName)
+		if err == nil {
+			os.Remove(oldName)
+		} else {
+			err = os.Rename(oldName, newName)
+		}
+	}
+	if err != nil {
+		os.Remove(oldName)
+	}
+	return err
+}
+
+// -----------------------------------------------------------
+
+// A BlockOrFile represents a vector of bytes, and contains either a data block
+// (as a []byte), or a (file name, size, offset) triple.
+type BlockOrFile struct {
+	Block    []byte // If FileName is empty, the bytes represented.
+	FileName string // If non-empty, the name of the file containing the bytes.
+	Size     int64  // If FileName is non-empty, the number of bytes (or -1 for "all")
+	Offset   int64  // If FileName is non-empty, the offset of the relevant bytes within the file.
+}
+
+// addFragment() ensures that the store *fscabs contains a fragment comprising
+// the catenation of the byte vectors named by item[..].block and the contents
+// of the files named by item[..].filename.  The block field is ignored if
+// fileName!="".  The fragment is not physically added if already present.
+// The fragment is added to the fragment list of the descriptor *desc.
+func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
+	desc *blobDesc, item ...BlockOrFile) (fileName string, size int64, err error) {
+
+	hasher := md5.New()
+	var buf []byte
+	var fileHandleList []*os.File
+
+	// Hash the inputs.
+	for i := 0; i != len(item) && err == nil; i++ {
+		if len(item[i].FileName) != 0 {
+			if buf == nil {
+				buf = make([]byte, 8192, 8192)
+				fileHandleList = make([]*os.File, 0, len(item))
+			}
+			var fileHandle *os.File
+			fileHandle, err = os.Open(filepath.Join(fscabs.rootName, item[i].FileName))
+			if err == nil {
+				fileHandleList = append(fileHandleList, fileHandle)
+				at := item[i].Offset
+				toRead := item[i].Size
+				var haveRead int64
+				for err == nil && (toRead == -1 || haveRead < toRead) {
+					var n int
+					n, err = fileHandle.ReadAt(buf, at)
+					if err == nil {
+						if toRead != -1 && int64(n)+haveRead > toRead {
+							n = int(toRead - haveRead)
+						}
+						haveRead += int64(n)
+						at += int64(n)
+						size += int64(n)
+						hasher.Write(buf[0:n]) // Cannot fail; see Hash interface.
+						extHasher.Write(buf[0:n])
+					}
+				}
+				if err == io.EOF {
+					if toRead == -1 || haveRead == toRead {
+						err = nil // The loop read all that was asked; EOF is a possible outcome.
+					} else { // The loop read less than was asked; request must have been too big.
+						err = verror.New(errSizeTooBigForFragment, ctx, desc.name, item[i].FileName)
+					}
+				}
+			}
+		} else {
+			hasher.Write(item[i].Block) // Cannot fail; see Hash interface.
+			extHasher.Write(item[i].Block)
+			size += int64(len(item[i].Block))
+		}
+	}
+
+	// Compute the hash, and form the file name in the respository.
+	hash := hasher.Sum(nil)
+	relFileName := hashToFileName(hash)
+	absFileName := filepath.Join(fscabs.rootName, relFileName)
+
+	// Add the fragment's name to *desc's fragments so the garbage
+	// collector will not delete it.
+	fscabs.mu.Lock()
+	desc.fragment = append(desc.fragment, blobFragment{
+		pos:      desc.size,
+		size:     size,
+		offset:   0,
+		fileName: relFileName})
+	desc.size += size
+	fscabs.mu.Unlock()
+
+	// If the file does not already exist, ...
+	if _, statErr := os.Stat(absFileName); err == nil && os.IsNotExist(statErr) {
+		// ... try to create it by writing to a temp file and renaming.
+		var t *file
+		t, err = newTempFile(ctx, filepath.Join(fscabs.rootName, tmpDir))
+		if err == nil {
+			// Copy the byte-sequences and input files to the temp file.
+			j := 0
+			for i := 0; i != len(item) && err == nil; i++ {
+				if len(item[i].FileName) != 0 {
+					at := item[i].Offset
+					toRead := item[i].Size
+					var haveRead int64
+					for err == nil && (toRead == -1 || haveRead < toRead) {
+						var n int
+						n, err = fileHandleList[j].ReadAt(buf, at)
+						if err == nil {
+							if toRead != -1 && int64(n)+haveRead > toRead {
+								n = int(toRead - haveRead)
+							}
+							haveRead += int64(n)
+							at += int64(n)
+							_, err = t.writer.Write(buf[0:n])
+						}
+					}
+					if err == io.EOF { // EOF is the expected outcome.
+						err = nil
+					}
+					j++
+				} else {
+					_, err = t.writer.Write(item[i].Block)
+				}
+			}
+			err = t.closeAndRename(ctx, absFileName, err)
+		}
+	} // else file already exists, nothing more to do.
+
+	for i := 0; i != len(fileHandleList); i++ {
+		fileHandleList[i].Close()
+	}
+
+	if err != nil {
+		err = verror.New(errAppendFailed, ctx, fscabs.rootName, err)
+		// Remove the entry added to fragment list above.
+		fscabs.mu.Lock()
+		desc.fragment = desc.fragment[0 : len(desc.fragment)-1]
+		desc.size -= size
+		fscabs.mu.Unlock()
+	}
+
+	return relFileName, size, err
+}
+
+// A blobFragment represents a vector of bytes and its position within a blob.
+type blobFragment struct {
+	pos      int64  // position of this fragment within its containing blob.
+	size     int64  // size of this fragment.
+	offset   int64  // offset within fileName.
+	fileName string // name of file describing this fragment.
+}
+
+// A blobDesc is the in-memory representation of a blob.
+type blobDesc struct {
+	activeDescIndex int // Index into fscabs.activeDesc if refCount>0; under fscabs.mu.
+	refCount        int // Reference count; under fscabs.mu.
+
+	name     string         // Name of the blob.
+	fragment []blobFragment // All the fragements in this blob;
+	// modified under fscabs.mu and in BlobWriter
+	// owner's thread; may be read by GC under
+	// fscabs.mu.
+	size      int64 // Total size of the blob.
+	finalized bool  // Whether the blob has been finalized.
+	// A finalized blob has a valid hash field, and no new bytes may be added
+	// to it.  A well-formed hash has 16 bytes.
+	hash []byte
+}
+
+// isBeingDeleted() returns whether fragment fragName is about to be deleted
+// by the garbage collector.   Requires fscabs.mu held.
+func (fscabs *FsCaBlobStore) isBeingDeleted(fragName string) (beingDeleted bool) {
+	for i := 0; i != len(fscabs.toDelete) && !beingDeleted; i++ {
+		_, beingDeleted = (*(fscabs.toDelete[i]))[fragName]
+	}
+	return beingDeleted
+}
+
+// descRef() increments the reference count of *desc and returns whether
+// successful.  It may fail if the fragments referenced by the descriptor are
+// being deleted by the garbage collector.
+func (fscabs *FsCaBlobStore) descRef(desc *blobDesc) bool {
+	beingDeleted := false
+	fscabs.mu.Lock()
+	if desc.refCount == 0 {
+		// On the first reference, check whether the fragments are
+		// being deleted, and if not, add *desc to the
+		// fscabs.activeDesc vector.
+		for i := 0; i != len(desc.fragment) && !beingDeleted; i++ {
+			beingDeleted = fscabs.isBeingDeleted(desc.fragment[i].fileName)
+		}
+		if !beingDeleted {
+			desc.activeDescIndex = len(fscabs.activeDesc)
+			fscabs.activeDesc = append(fscabs.activeDesc, desc)
+		}
+	}
+	if !beingDeleted {
+		desc.refCount++
+	}
+	fscabs.mu.Unlock()
+	return !beingDeleted
+}
+
+// descUnref() decrements the reference count of *desc if desc!=nil; if that
+// removes the last reference, *desc is removed from the fscabs.activeDesc
+// vector.
+func (fscabs *FsCaBlobStore) descUnref(desc *blobDesc) {
+	if desc != nil {
+		fscabs.mu.Lock()
+		desc.refCount--
+		if desc.refCount < 0 {
+			panic("negative reference count")
+		} else if desc.refCount == 0 {
+			// Remove desc from fscabs.activeDesc by moving the
+			// last entry in fscabs.activeDesc to desc's slot.
+			n := len(fscabs.activeDesc)
+			lastDesc := fscabs.activeDesc[n-1]
+			lastDesc.activeDescIndex = desc.activeDescIndex
+			fscabs.activeDesc[desc.activeDescIndex] = lastDesc
+			fscabs.activeDesc = fscabs.activeDesc[0 : n-1]
+			desc.activeDescIndex = -1
+		}
+		fscabs.mu.Unlock()
+	}
+}
+
+// getBlob() returns the in-memory blob descriptor for the named blob.
+func (fscabs *FsCaBlobStore) getBlob(ctx *context.T, blobName string) (desc *blobDesc, err error) {
+	if !strings.HasPrefix(blobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 {
+		err = verror.New(errInvalidBlobName, ctx, blobName)
+	} else {
+		absBlobName := filepath.Join(fscabs.rootName, blobName)
+		var fh *os.File
+		fh, err = os.Open(absBlobName)
+		if err == nil {
+			var line string
+			desc = new(blobDesc)
+			desc.activeDescIndex = -1
+			desc.name = blobName
+			scanner := bufio.NewScanner(fh)
+			for scanner.Scan() {
+				field := strings.Split(scanner.Text(), " ")
+				if len(field) == 4 && field[0] == "d" {
+					var fragSize int64
+					var fragOffset int64
+					fragSize, err = strconv.ParseInt(field[1], 0, 64)
+					if err == nil {
+						fragOffset, err = strconv.ParseInt(field[2], 0, 64)
+					}
+					if err == nil {
+						// No locking needed here because desc
+						// is newly allocated and not yet passed to descRef().
+						desc.fragment = append(desc.fragment,
+							blobFragment{
+								fileName: field[3],
+								pos:      desc.size,
+								size:     fragSize,
+								offset:   fragOffset})
+					}
+					desc.size += fragSize
+				} else if len(field) == 2 && field[0] == "f" {
+					desc.hash = stringToHash(field[1])
+					desc.finalized = true
+					if desc.hash == nil {
+						err = verror.New(errMalformedBlobHash, ctx, blobName, field[1])
+					}
+				} else if len(field) > 0 && len(field[0]) == 1 && "a" <= field[0] && field[0] <= "z" {
+					// unrecognized line, reserved for extensions: ignore.
+				} else {
+					err = verror.New(errMalformedField, ctx, line)
+				}
+			}
+			err = scanner.Err()
+			fh.Close()
+		}
+	}
+	// Ensure that we return either a properly referenced desc, or nil.
+	if err != nil {
+		desc = nil
+	} else if !fscabs.descRef(desc) {
+		err = verror.New(errBlobDeleted, ctx, blobName)
+		desc = nil
+	}
+	return desc, err
+}
+
+// -----------------------------------------------------------
+
+// A BlobWriter allows a blob to be written.  If a blob has not yet been
+// finalized, it also allows that blob to be extended.  A BlobWriter may be
+// created with NewBlobWriter(), and should be closed with Close() or
+// CloseWithoutFinalize().
+type BlobWriter struct {
+	// The BlobWriter exists within a particular FsCaBlobStore and context.T
+	fscabs *FsCaBlobStore
+	ctx    *context.T
+
+	desc   *blobDesc // Description of the blob being written.
+	f      *file     // The file being written.
+	hasher hash.Hash // Running hash of blob.
+}
+
+// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on a newly
+// created blob name, which can be found using the Name() method.  BlobWriters
+// should not be used concurrently by multiple threads.  The returned handle
+// should be closed with either the Close() or CloseWithoutFinalize() method to
+// avoid leaking file handles.
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (bw *BlobWriter, err error) {
+	newName := newBlobName()
+	var f *file
+	fileName := filepath.Join(fscabs.rootName, newName)
+	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
+	f, err = newFile(os.Create(fileName))
+	if err == nil {
+		bw = new(BlobWriter)
+		bw.fscabs = fscabs
+		bw.ctx = ctx
+		bw.desc = new(blobDesc)
+		bw.desc.activeDescIndex = -1
+		bw.desc.name = newName
+		bw.f = f
+		bw.hasher = md5.New()
+		if !fscabs.descRef(bw.desc) {
+			// Can't happen; descriptor refers to no fragments.
+			panic(verror.New(errBlobDeleted, ctx, bw.desc.name))
+		}
+	}
+	return bw, err
+}
+
+// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
+// old, but unfinalized blob name.
+func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (bw *BlobWriter, err error) {
+	bw = new(BlobWriter)
+	bw.fscabs = fscabs
+	bw.ctx = ctx
+	bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
+	if err == nil && bw.desc.finalized {
+		err = verror.New(errBlobAlreadyFinalized, ctx, blobName)
+		bw = nil
+	} else {
+		fileName := filepath.Join(fscabs.rootName, bw.desc.name)
+		bw.f, err = newFile(os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND, 0666))
+		bw.hasher = md5.New()
+		// Add the existing fragments to the running hash.
+		// The descRef's ref count is incremented here to compensate
+		// for the decrement it will receive in br.Close(), below.
+		if !fscabs.descRef(bw.desc) {
+			// Can't happen; descriptor's ref count was already
+			// non-zero.
+			panic(verror.New(errBlobDeleted, ctx, fileName))
+		}
+		br := fscabs.blobReaderFromDesc(ctx, bw.desc)
+		buf := make([]byte, 8192, 8192)
+		for err == nil {
+			var n int
+			n, err = br.Read(buf)
+			bw.hasher.Write(buf[0:n])
+		}
+		br.Close()
+		if err == io.EOF { // EOF is expected.
+			err = nil
+		}
+	}
+	return bw, err
+}
+
+// Close() finalizes *bw, and indicates that the client will perform no further
+// append operations on *bw.  Any internal open file handles are closed.
+func (bw *BlobWriter) Close() (err error) {
+	if bw.f == nil {
+		err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name)
+	} else if bw.desc.finalized {
+		err = verror.New(errBlobAlreadyFinalized, bw.ctx, bw.desc.name)
+	} else {
+		h := bw.hasher.Sum(nil)
+		_, err = fmt.Fprintf(bw.f.writer, "f %s\n", hashToString(h)) // finalize
+		_, err = bw.f.close(bw.ctx, err)
+		bw.f = nil
+		bw.desc.finalized = true
+		bw.fscabs.descUnref(bw.desc)
+	}
+	return err
+}
+
+// CloseWithoutFinalize() indicates that the client will perform no further
+// append operations on *bw, but does not finalize the blob.  Any internal open
+// file handles are closed.  Clients are expected to need this operation
+// infrequently.
+func (bw *BlobWriter) CloseWithoutFinalize() (err error) {
+	if bw.f == nil {
+		err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name)
+	} else {
+		_, err = bw.f.close(bw.ctx, err)
+		bw.f = nil
+		bw.fscabs.descUnref(bw.desc)
+	}
+	return err
+}
+
+// AppendFragment() appends a fragment to the blob being written by *bw, where
+// the fragment is composed of the byte vectors described by the elements of
+// item[].  The fragment is copied into the blob store.
+func (bw *BlobWriter) AppendFragment(item ...BlockOrFile) (err error) {
+	if bw.f == nil {
+		panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()")
+	}
+	var fragmentName string
+	var size int64
+	fragmentName, size, err = bw.fscabs.addFragment(bw.ctx, bw.hasher, bw.desc, item...)
+	if err == nil {
+		_, err = fmt.Fprintf(bw.f.writer, "d %d %d %s\n", size, 0 /*offset*/, fragmentName)
+	}
+	if err == nil {
+		err = bw.f.writer.Flush()
+	}
+	return err
+}
+
+// AppendBlob() adds a (substring of a) pre-existing blob to the blob being
+// written by *bw.  The fragments of the pre-existing blob are not physically
+// copied; they are referenced by both blobs.
+func (bw *BlobWriter) AppendBlob(blobName string, size int64, offset int64) (err error) {
+	if bw.f == nil {
+		panic("fs_cablobstore.BlobWriter programming error: AppendBlob() after Close()")
+	}
+	var desc *blobDesc
+	desc, err = bw.fscabs.getBlob(bw.ctx, blobName)
+	origSize := bw.desc.size
+	if err == nil {
+		if size == -1 {
+			size = desc.size - offset
+		}
+		if offset < 0 || desc.size < offset+size {
+			err = verror.New(errBadSizeOrOffset, bw.ctx, size, offset, blobName, desc.size)
+		}
+		for i := 0; i != len(desc.fragment) && err == nil && size > 0; i++ {
+			if desc.fragment[i].size <= offset {
+				offset -= desc.fragment[i].size
+			} else {
+				consume := desc.fragment[i].size - offset
+				if size < consume {
+					consume = size
+				}
+				_, err = fmt.Fprintf(bw.f.writer, "d %d %d %s\n",
+					consume, offset+desc.fragment[i].offset, desc.fragment[i].fileName)
+				if err == nil {
+					// Add fragment so garbage collector can see it.
+					// The garbage collector cannot be
+					// about to delete the fragment, because
+					// getBlob() already checked for that
+					// above, and kept a reference.
+					bw.fscabs.mu.Lock()
+					bw.desc.fragment = append(bw.desc.fragment, blobFragment{
+						pos:      bw.desc.size,
+						size:     consume,
+						offset:   offset + desc.fragment[i].offset,
+						fileName: desc.fragment[i].fileName})
+					bw.desc.size += consume
+					bw.fscabs.mu.Unlock()
+				}
+				offset = 0
+				size -= consume
+			}
+		}
+		bw.fscabs.descUnref(desc)
+		// Add the new fragments to the running hash.
+		if !bw.fscabs.descRef(bw.desc) {
+			// Can't happen; descriptor's ref count was already
+			// non-zero.
+			panic(verror.New(errBlobDeleted, bw.ctx, blobName))
+		}
+		br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc)
+		if err == nil {
+			_, err = br.Seek(origSize, 0)
+		}
+		buf := make([]byte, 8192, 8192)
+		for err == nil {
+			var n int
+			n, err = br.Read(buf)
+			bw.hasher.Write(buf[0:n]) // Cannot fail; see Hash interface.
+		}
+		br.Close()
+		if err == io.EOF { // EOF is expected.
+			err = nil
+		}
+		if err == nil {
+			err = bw.f.writer.Flush()
+		}
+	}
+	return err
+}
+
+// IsFinalized() returns whether *bw has been finalized.
+func (bw *BlobWriter) IsFinalized() bool {
+	return bw.desc.finalized
+}
+
+// Size() returns *bw's size.
+func (bw *BlobWriter) Size() int64 {
+	return bw.desc.size
+}
+
+// Name() returns *bw's name.
+func (bw *BlobWriter) Name() string {
+	return bw.desc.name
+}
+
+// Hash() returns *bw's hash, reflecting the bytes written so far.
+func (bw *BlobWriter) Hash() []byte {
+	return bw.hasher.Sum(nil)
+}
+
+// -----------------------------------------------------------
+
+// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
+// and Seek() calls.  A BlobReader can be created with NewBlobReader(), and
+// should be closed with the Close() method to avoid leaking file handles.
+type BlobReader struct {
+	// The BlobReader exists within a particular FsCaBlobStore and context.T.
+	fscabs *FsCaBlobStore
+	ctx    *context.T
+
+	desc *blobDesc // A description of the blob being read.
+
+	pos int64 // The next position we will read from (used by Read/Seek, not ReadAt).
+
+	// The fields below represent a cached open fragment desc.fragment[fragmentIndex].
+	fragmentIndex int      // -1 or  0 <= fragmentIndex < len(desc.fragment).
+	fh            *os.File // non-nil iff fragmentIndex != -1.
+}
+
+// blobReaderFromDesc() returns a pointer to a newly allocated BlobReader given
+// a pre-existing blobDesc.
+func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc) *BlobReader {
+	br := new(BlobReader)
+	br.fscabs = fscabs
+	br.ctx = ctx
+	br.fragmentIndex = -1
+	br.desc = desc
+	return br
+}
+
+// NewBlobReader() returns a pointer to a newly allocated BlobReader on the
+// specified blobName.  BlobReaders should not be used concurrently by multiple
+// threads.  Returned handles should be closed with Close().
+func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br *BlobReader, err error) {
+	var desc *blobDesc
+	desc, err = fscabs.getBlob(ctx, blobName)
+	if err == nil {
+		br = fscabs.blobReaderFromDesc(ctx, desc)
+	}
+	return br, err
+}
+
+// closeInternal() closes any open file handles within *br.
+func (br *BlobReader) closeInternal() {
+	if br.fh != nil {
+		br.fh.Close()
+		br.fh = nil
+	}
+	br.fragmentIndex = -1
+}
+
+// Close() indicates that the client will perform no further operations on *br.
+// It closes any open file handles within a BlobReader.
+func (br *BlobReader) Close() error {
+	br.closeInternal()
+	br.fscabs.descUnref(br.desc)
+	return nil
+}
+
+// findFragment() returns the index of the first element of fragment[] that may
+// contain "offset", based on the "pos" fields of each element.
+// Requires that fragment[] be sorted on the "pos" fields of the elements.
+func findFragment(fragment []blobFragment, offset int64) int {
+	lo := 0
+	hi := len(fragment)
+	for lo < hi {
+		mid := (lo + hi) >> 1
+		if offset < fragment[mid].pos {
+			hi = mid
+		} else {
+			lo = mid + 1
+		}
+	}
+	if lo > 0 {
+		lo--
+	}
+	return lo
+}
+
+// ReadAt() fills b[] with up to len(b) bytes of data starting at position "at"
+// within the blob that *br indicates, and returns the number of bytes read.
+func (br *BlobReader) ReadAt(b []byte, at int64) (n int, err error) {
+	i := findFragment(br.desc.fragment, at)
+	if i < len(br.desc.fragment) && at <= br.desc.size {
+		if i != br.fragmentIndex {
+			br.closeInternal()
+		}
+		if br.fragmentIndex == -1 {
+			br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, br.desc.fragment[i].fileName))
+			if err == nil {
+				br.fragmentIndex = i
+			} else {
+				br.closeInternal()
+			}
+		}
+		var offset int64 = at - br.desc.fragment[i].pos + br.desc.fragment[i].offset
+		consume := br.desc.fragment[i].size - (at - br.desc.fragment[i].pos)
+		if int64(len(b)) < consume {
+			consume = int64(len(b))
+		}
+		if br.fh != nil {
+			n, err = br.fh.ReadAt(b[0:consume], offset)
+		} else if err == nil {
+			panic("failed to open blob fragment")
+		}
+		// Return io.EOF if the Read reached the end of the last
+		// fragment, but not if it's merely the end of some interior
+		// fragment.
+		if int64(n)+at >= br.desc.size {
+			if err == nil {
+				err = io.EOF
+			}
+		} else if err == io.EOF {
+			err = nil
+		}
+	} else {
+		err = verror.New(errIllegalPositionForRead, br.ctx, br.pos, br.desc.size)
+	}
+	return n, err
+}
+
+// Read() fills b[] with up to len(b) bytes of data starting at the current
+// seek position of *br within the blob that *br indicates, and then both
+// returns the number of bytes read and advances *br's seek position by that
+// amount.
+func (br *BlobReader) Read(b []byte) (n int, err error) {
+	n, err = br.ReadAt(b, br.pos)
+	if err == nil {
+		br.pos += int64(n)
+	}
+	return n, err
+}
+
+// Seek() sets the seek position of *br to offset if whence==0,
+// offset+current_seek_position if whence==1, and offset+end_of_blob if
+// whence==2, and then returns the current seek position.
+func (br *BlobReader) Seek(offset int64, whence int) (result int64, err error) {
+	if whence == 0 {
+		result = offset
+	} else if whence == 1 {
+		result = offset + br.pos
+	} else if whence == 2 {
+		result = offset + br.desc.size
+	} else {
+		err = verror.New(errBadSeekWhence, br.ctx, whence)
+		result = br.pos
+	}
+	if result < 0 {
+		err = verror.New(errNegativeSeekPosition, br.ctx, offset, whence)
+		result = br.pos
+	} else if result > br.desc.size {
+		err = verror.New(errIllegalPositionForRead, br.ctx, result, br.desc.size)
+		result = br.pos
+	} else if err == nil {
+		br.pos = result
+	}
+	return result, err
+}
+
+// IsFinalized() returns whether *br has been finalized.
+func (br *BlobReader) IsFinalized() bool {
+	return br.desc.finalized
+}
+
+// Size() returns *br's size.
+func (br *BlobReader) Size() int64 {
+	return br.desc.size
+}
+
+// Name() returns *br's name.
+func (br *BlobReader) Name() string {
+	return br.desc.name
+}
+
+// Hash() returns *br's hash.  It may be nil if the blob is not finalized.
+func (br *BlobReader) Hash() []byte {
+	return br.desc.hash
+}
+
+// -----------------------------------------------------------
+
+// A dirListing is a list of names in a directory, plus a position, which
+// indexes the last item in nameList that has been processed.
+type dirListing struct {
+	pos      int      // Current position in nameList; may be -1 at the start of iteration.
+	nameList []string // List of directory entries.
+}
+
+// An FsCasIter represents an iterator that allows the client to enumerate all
+// the blobs or fragments in a FsCaBlobStore.
+type FsCasIter struct {
+	fscabs *FsCaBlobStore // The parent FsCaBlobStore.
+	err    error          // If non-nil, the error that terminated iteration.
+	stack  []dirListing   // The stack of dirListings leading to the current entry.
+}
+
+// ListBlobIds() returns an iterator that can be used to enumerate the blobs in
+// an FsCaBlobStore.  Expected use is:
+//    fscabsi := fscabs.ListBlobIds(ctx)
+//    for fscabsi.Advance() {
+//      // Process fscabsi.Value() here.
+//    }
+//    if fscabsi.Err() != nil {
+//      // The loop terminated early due to an error.
+//    }
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) (fscabsi *FsCasIter) {
+	stack := make([]dirListing, 1)
+	stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
+	return &FsCasIter{fscabs: fscabs, stack: stack}
+}
+
+// ListCAIds() returns an iterator that can be used to enumerate the
+// content-addressable fragments in an FsCaBlobStore.
+// Expected use is:
+//    fscabsi := fscabs.ListCAIds(ctx)
+//    for fscabsi.Advance() {
+//      // Process fscabsi.Value() here.
+//    }
+//    if fscabsi.Err() != nil {
+//      // The loop terminated early due to an error.
+//    }
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) (fscabsi *FsCasIter) {
+	stack := make([]dirListing, 1)
+	stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
+	return &FsCasIter{fscabs: fscabs, stack: stack}
+}
+
+// Advance() stages an item so that it may be retrieved via Value.  Returns
+// true iff there is an item to retrieve.  Advance must be called before Value
+// is called.
+func (fscabsi *FsCasIter) Advance() (advanced bool) {
+	stack := fscabsi.stack
+	err := fscabsi.err
+
+	for err == nil && !advanced && len(stack) != 0 {
+		last := len(stack) - 1
+		stack[last].pos++
+		if stack[last].pos == len(stack[last].nameList) {
+			stack = stack[0:last]
+			fscabsi.stack = stack
+		} else {
+			fullName := filepath.Join(fscabsi.fscabs.rootName, fscabsi.Value())
+			var fi os.FileInfo
+			fi, err = os.Lstat(fullName)
+			if err != nil {
+				// error: nothing to do
+			} else if fi.IsDir() {
+				var dirHandle *os.File
+				dirHandle, err = os.Open(fullName)
+				if err == nil {
+					var nameList []string
+					nameList, err = dirHandle.Readdirnames(0)
+					dirHandle.Close()
+					stack = append(stack, dirListing{pos: -1, nameList: nameList})
+					fscabsi.stack = stack
+					last = len(stack) - 1
+				}
+			} else {
+				advanced = true
+			}
+		}
+	}
+
+	fscabsi.err = err
+	return advanced
+}
+
+// Value() returns the item that was staged by Advance.  May panic if Advance
+// returned false or was not called.  Never blocks.
+func (fscabsi *FsCasIter) Value() (name string) {
+	stack := fscabsi.stack
+	if fscabsi.err == nil && len(stack) != 0 && stack[0].pos >= 0 {
+		name = stack[0].nameList[stack[0].pos]
+		for i := 1; i != len(stack); i++ {
+			name = filepath.Join(name, stack[i].nameList[stack[i].pos])
+		}
+	}
+	return name
+}
+
+// Err() returns any error encountered by Advance.  Never blocks.
+func (fscabsi *FsCasIter) Err() error {
+	return fscabsi.err
+}
+
+// -----------------------------------------------------------
+
+// gcTemp() attempts to delete files in dirName older than threshold.
+// Errors are ignored.
+func gcTemp(dirName string, threshold time.Time) {
+	fh, err := os.Open(dirName)
+	if err == nil {
+		fi, _ := fh.Readdir(0)
+		fh.Close()
+		for i := 0; i < len(fi); i++ {
+			if fi[i].ModTime().Before(threshold) {
+				os.Remove(filepath.Join(dirName, fi[i].Name()))
+			}
+		}
+	}
+}
+
+// GC() removes old temp files and content-addressed blocks that are no longer
+// referenced by any blob.  It may be called concurrently with other calls to
+// GC(), and with uses of BlobReaders and BlobWriters.
+func (fscabs *FsCaBlobStore) GC(ctx *context.T) (err error) {
+	// Remove old temporary files.
+	gcTemp(filepath.Join(fscabs.rootName, tmpDir), time.Now().Add(-10*time.Hour))
+
+	// Add a key to caSet for each content-addressed fragment in *fscabs,
+	caSet := make(map[string]bool)
+	caIter := fscabs.ListCAIds(ctx)
+	for caIter.Advance() {
+		caSet[caIter.Value()] = true
+	}
+
+	// Remove from caSet all fragments referenced by extant blobs.
+	blobIter := fscabs.ListBlobIds(ctx)
+	for blobIter.Advance() {
+		var blobDesc *blobDesc
+		if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil {
+			for i := range blobDesc.fragment {
+				delete(caSet, blobDesc.fragment[i].fileName)
+			}
+			fscabs.descUnref(blobDesc)
+		}
+	}
+
+	if err == nil {
+		// Remove from caSet all fragments referenced by open BlobReaders and
+		// BlobWriters.  Advertise to new readers and writers which blobs are
+		// about to be deleted.
+		fscabs.mu.Lock()
+		for _, desc := range fscabs.activeDesc {
+			for i := range desc.fragment {
+				delete(caSet, desc.fragment[i].fileName)
+			}
+		}
+		fscabs.toDelete = append(fscabs.toDelete, &caSet)
+		fscabs.mu.Unlock()
+
+		// Delete the things that still remain in caSet; they are no longer
+		// referenced.
+		for caName := range caSet {
+			os.Remove(filepath.Join(fscabs.rootName, caName))
+		}
+
+		// Stop advertising what's been deleted.
+		fscabs.mu.Lock()
+		n := len(fscabs.toDelete)
+		var i int
+		// We require that &caSet still be in the list.
+		for i = 0; fscabs.toDelete[i] != &caSet; i++ {
+		}
+		fscabs.toDelete[i] = fscabs.toDelete[n-1]
+		fscabs.toDelete = fscabs.toDelete[0 : n-1]
+		fscabs.mu.Unlock()
+	}
+	return err
+}
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
new file mode 100644
index 0000000..ded5e3c
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -0,0 +1,584 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for fs_cablobstore
+package fs_cablobstore_test
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "os"
+import "path/filepath"
+import "testing"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+func TestCreate(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// Make a temporary directory.
+	var err error
+	var testDirName string
+	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+	if err != nil {
+		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+	}
+	defer os.RemoveAll(testDirName)
+
+	// Check that we can create an fs_cablobstore.
+	var fscabs *fs_cablobstore.FsCaBlobStore
+	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+	if err != nil {
+		t.Errorf("fs_cablobstore.Create failed: %v", err)
+	}
+
+	// Check that there are no files in the newly-created tree.
+	iterator := fscabs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		t.Errorf("unexpected file %q\n", fileName)
+	}
+	if iterator.Err() != nil {
+		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+}
+
+// A blobOrBlockOrFile represents some bytes that may be contained in a named
+// blob, a named file, or in an explicit slice of bytes.
+type blobOrBlockOrFile struct {
+	blob   string // If non-emtpy, the name of the blob containing the bytes.
+	file   string // If non-empty and blob is empty, the name of the file containing the bytes.
+	size   int64  // Size of part of file or blob, or -1 for "everything until EOF".
+	offset int64  // Offset within file or blob.
+	block  []byte // If both blob and file are empty, a slice containing the bytes.
+}
+
+// A testBlob records that some specified content has been stored with a given
+// blob name in the blob store.
+type testBlob struct {
+	content  []byte // content that has been stored.
+	blobName string // the name of the blob.
+}
+
+// removeBlobFromBlobVector() removes the entry named blobName from
+// blobVector[], returning the new vector.
+func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
+	n := len(blobVector)
+	i := 0
+	for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
+	}
+	if i != n {
+		blobVector[i] = blobVector[n-1]
+		blobVector = blobVector[0 : n-1]
+	}
+	return blobVector
+}
+
+// writeBlob() writes a new blob to *fscabs, and returns its name.  The new
+// blob's content is described by the elements of data[].  Any error messages
+// generated include the index of the blob in blobVector and its content; the
+// latter is assumed to be printable.  The expected content of the the blob is
+// "content", so that this routine can check it.  If useResume is true, and data[]
+// has length more than 1, the function artificially uses ResumeBlobWriter(),
+// to test it.
+func writeBlob(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob,
+	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
+	var bw *fs_cablobstore.BlobWriter
+	var err error
+	bw, err = fscabs.NewBlobWriter(ctx)
+	if err != nil {
+		t.Fatalf("fs_cablobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
+	}
+	blobName := bw.Name()
+
+	// Construct the blob from the pieces.
+	// There is a loop within the loop to exercise the possibility of
+	// passing multiple fragments to AppendFragment().
+	for i := 0; i != len(data) && err == nil; {
+		if len(data[i].blob) != 0 {
+			err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
+			if err != nil {
+				t.Errorf("fs_cablobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
+			}
+			i++
+		} else {
+			var pieces []fs_cablobstore.BlockOrFile
+			for ; i != len(data) && len(data[i].blob) == 0; i++ {
+				if len(data[i].file) != 0 {
+					pieces = append(pieces, fs_cablobstore.BlockOrFile{
+						FileName: data[i].file,
+						Size:     data[i].size,
+						Offset:   data[i].offset})
+				} else {
+					pieces = append(pieces, fs_cablobstore.BlockOrFile{Block: data[i].block})
+				}
+			}
+			err = bw.AppendFragment(pieces...)
+			if err != nil {
+				t.Errorf("fs_cablobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
+			}
+		}
+		if useResume && i < len(data)-1 && err == nil {
+			err = bw.CloseWithoutFinalize()
+			if err == nil {
+				bw, err = fscabs.ResumeBlobWriter(ctx, blobName)
+			}
+		}
+	}
+
+	if bw != nil {
+		if bw.Size() != int64(len(content)) {
+			t.Errorf("fs_cablobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+		}
+		if bw.IsFinalized() {
+			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+		}
+		err = bw.Close()
+		if err != nil {
+			t.Errorf("fs_cablobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
+		}
+		if !bw.IsFinalized() {
+			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+		}
+		if bw.Size() != int64(len(content)) {
+			t.Errorf("fs_cablobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+		}
+		if bw.Name() != blobName {
+			t.Errorf("fs_cablobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
+		}
+		hasher := md5.New()
+		hasher.Write(content)
+		if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
+			t.Errorf("fs_cablobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
+		}
+	}
+
+	return append(blobVector,
+		testBlob{
+			content:  content,
+			blobName: blobName,
+		})
+}
+
+// readBlob() returns a substring of the content of the blob named blobName in *fscabs.
+// The return values are:
+// - the "size" bytes from the content, starting at the given "offset",
+//   measured from "whence" (as defined by io.Seeker.Seek).
+// - the position to which BlobBeader seeks to,
+// - the md5 hash of the bytes read, and
+// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
+// - and error.
+func readBlob(ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobName string,
+	size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
+
+	var br *fs_cablobstore.BlobReader
+	hasher := md5.New()
+	br, err = fscabs.NewBlobReader(ctx, blobName)
+	if err == nil {
+		buf := make([]byte, 8192, 8192)
+		fullHash = br.Hash()
+		pos, err = br.Seek(offset, whence)
+		if err == nil {
+			var n int
+			first := true // Read at least once, to test reading zero bytes.
+			for err == nil && (size == -1 || int64(len(content)) < size || first) {
+				// Read just what was asked for.
+				var toRead []byte = buf
+				if size >= 0 && int(size)-len(content) < len(buf) {
+					toRead = buf[0 : int(size)-len(content)]
+				}
+				n, err = br.Read(toRead)
+				hasher.Write(toRead[0:n])
+				if size >= 0 && int64(len(content)+n) > size {
+					n = int(size) - len(content)
+				}
+				content = append(content, toRead[0:n]...)
+				first = false
+			}
+		}
+		br.Close()
+	}
+	return content, pos, hasher.Sum(nil), fullHash, err
+}
+
+// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
+// read, and that they contain the appropriate data.
+func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob) {
+	for i := range blobVector {
+		var size int64
+		data := blobVector[i].content
+		dataLen := int64(len(data))
+		blobName := blobVector[i].blobName
+		for size = -1; size != dataLen+1; size++ {
+			var offset int64
+			for offset = -dataLen - 1; offset != dataLen+1; offset++ {
+				for whence := -1; whence != 4; whence++ {
+					content, pos, hash, fullHash, err := readBlob(ctx, fscabs, blobName, size, offset, whence)
+
+					// Compute expected seek position.
+					expectedPos := offset
+					if whence == 2 {
+						expectedPos += dataLen
+					}
+
+					// Computed expected size.
+					expectedSize := size
+					if expectedSize == -1 || expectedPos+expectedSize > dataLen {
+						expectedSize = dataLen - expectedPos
+					}
+
+					// Check that reads behave as expected.
+					if (whence == -1 || whence == 3) &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
+						// Expected error from bad "whence" value.
+					} else if expectedPos < 0 &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
+						// Expected error from negative Seek position.
+					} else if expectedPos > dataLen &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
+						// Expected error from too high a Seek position.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
+						pos == expectedPos && expectedPos+expectedSize == dataLen {
+						// Expected success with EOF.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
+						pos == expectedPos && expectedPos+expectedSize != dataLen {
+						if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
+							t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v  (blob is %q)",
+								string(data), size, offset, whence,
+								hash, fullHash, blobName)
+						} // Else expected success without EOF.
+					} else {
+						t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d yields %q pos %d %v   (blob is %q)",
+							string(data), size, offset, whence,
+							content, pos, err, blobName)
+					}
+				}
+			}
+		}
+	}
+}
+
+// checkAllBlobs() checks all the blobs in *fscabs to ensure they correspond to
+// those in blobVector[].
+func checkAllBlobs(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob, testDirName string) {
+	blobCount := 0
+	iterator := fscabs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		i := 0
+		for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
+		}
+		if i == len(blobVector) {
+			t.Errorf("fs_cablobstore.ListBlobIds found unexpected file %s", fileName)
+		} else {
+			content, pos, hash, fullHash, err := readBlob(ctx, fscabs, fileName, -1, 0, 0)
+			if err != nil && err != io.EOF {
+				t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+			} else if bytes.Compare(blobVector[i].content, content) != 0 {
+				t.Errorf("fs_cablobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
+					filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
+			} else if pos != 0 {
+				t.Errorf("fs_cablobstore.ListCAIds Seek on %q returned %d instead of 0",
+					filepath.Join(testDirName, fileName), pos)
+			}
+			if bytes.Compare(hash, fullHash) != 0 {
+				t.Errorf("fs_cablobstore.ListCAIds read on %q; got hash %v, expected %v",
+					fileName, hash, fullHash)
+			}
+		}
+		blobCount++
+	}
+	if iterator.Err() != nil {
+		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+	if blobCount != len(blobVector) {
+		t.Errorf("fs_cablobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
+	}
+}
+
+// checkFragments() checks all the fragments in *fscabs to ensure they
+// correspond to those fragmentMap[].
+func checkFragments(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, fragmentMap map[string]bool, testDirName string) {
+	caCount := 0
+	iterator := fscabs.ListCAIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
+		if err != nil && err != io.EOF {
+			t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+		} else if !fragmentMap[string(content)] {
+			t.Errorf("fs_cablobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
+		} else {
+			hasher := md5.New()
+			hasher.Write(content)
+			hash := hasher.Sum(nil)
+			nameFromContent := filepath.Join("cas",
+				fmt.Sprintf("%02x", hash[0]),
+				fmt.Sprintf("%02x", hash[1]),
+				fmt.Sprintf("%02x", hash[2]),
+				fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+					hash[3],
+					hash[4], hash[5], hash[6], hash[7],
+					hash[8], hash[9], hash[10], hash[11],
+					hash[12], hash[13], hash[14], hash[15]))
+			if nameFromContent != fileName {
+				t.Errorf("fs_cablobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
+			}
+		}
+		caCount++
+	}
+	if iterator.Err() != nil {
+		t.Errorf("fs_cablobstore.ListCAIds iteration failed: %v", iterator.Err())
+	}
+	if caCount != len(fragmentMap) {
+		t.Errorf("fs_cablobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
+	}
+}
+
+// This test case tests adding files, retrieving them and deleting them.  One
+// can't retrieve or delete something that hasn't been created, so it's all one
+// test case.
+func TestAddRetrieveAndDelete(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// Make a temporary directory.
+	var err error
+	var testDirName string
+	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+	if err != nil {
+		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+	}
+	defer os.RemoveAll(testDirName)
+
+	// Create an fs_cablobstore.
+	var fscabs *fs_cablobstore.FsCaBlobStore
+	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+	if err != nil {
+		t.Fatalf("fs_cablobstore.Create failed: %v", err)
+	}
+
+	// Create the strings:  "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
+	womData := []byte("wom")
+	batData := []byte("bat")
+	wombatData := []byte("wombat")
+	batwomData := []byte("batwom")
+	atwoData := []byte("atwo")
+	atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
+
+	// fragmentMap will have an entry per content-addressed fragment.
+	fragmentMap := make(map[string]bool)
+
+	// Create the blobs, by various means.
+
+	var blobVector []testBlob // Accumulate the blobs we create here.
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		womData, false,
+		blobOrBlockOrFile{block: womData})
+	womName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(womData)] = true
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		batData, false,
+		blobOrBlockOrFile{block: batData})
+	batName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(batData)] = true
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{block: wombatData})
+	firstWombatName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(wombatData)] = true
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		wombatData, true,
+		blobOrBlockOrFile{block: womData},
+		blobOrBlockOrFile{block: batData})
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   6,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		batwomData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 3},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 0})
+	batwomName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		atwoData, false,
+		blobOrBlockOrFile{
+			blob:   batwomName,
+			size:   4,
+			offset: 1})
+	atwoName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, fscabs, blobVector,
+		atwoatwoombatatwoData, true,
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   -1,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   4,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   batName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   womName,
+			size:   2,
+			offset: 0})
+	atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	fscabs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Ensure that deleting non-existent blobs fails.
+	err = fscabs.DeleteBlob(ctx, "../../../../etc/passwd")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+	err = fscabs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.
+	err = fscabs.DeleteBlob(ctx, batName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, batName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	fscabs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Open a BlobReader on a blob we're about to delete,
+	// so its fragments won't be garbage collected.
+
+	var br *fs_cablobstore.BlobReader
+	br, err = fscabs.NewBlobReader(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.  This should be the last on-disc reference to the
+	// content-addressed fragment "bat", but the fragment won't be deleted
+	// until close the reader and garbage collect.
+	err = fscabs.DeleteBlob(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Garbage collection should change nothing; the fragment involved
+	// is still referenced from the open reader *br.
+	fscabs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+
+	// Close the open BlobReader and garbage collect.
+	err = br.Close()
+	if err != nil {
+		t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	delete(fragmentMap, string(batData))
+
+	fscabs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Delete all blobs.
+	for len(blobVector) != 0 {
+		err = fscabs.DeleteBlob(ctx, blobVector[0].blobName)
+		if err != nil {
+			t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
+		}
+		blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
+	}
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// The remaining fragments should be removed when we garbage collect.
+	for frag := range fragmentMap {
+		delete(fragmentMap, frag)
+	}
+	fscabs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
+	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
+	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+}