Merge "syncbase/vsync: Integrate log and gen vector functionality (Part 2/2)."
diff --git a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 8e173d1..43cc542 100644
--- a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -41,6 +41,7 @@
 import "sync"
 import "time"
 
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -247,22 +248,13 @@
 
 // -----------------------------------------------------------
 
-// A BlockOrFile represents a vector of bytes, and contains either a data block
-// (as a []byte), or a (file name, size, offset) triple.
-type BlockOrFile struct {
-	Block    []byte // If FileName is empty, the bytes represented.
-	FileName string // If non-empty, the name of the file containing the bytes.
-	Size     int64  // If FileName is non-empty, the number of bytes (or -1 for "all")
-	Offset   int64  // If FileName is non-empty, the offset of the relevant bytes within the file.
-}
-
 // addFragment() ensures that the store *fscabs contains a fragment comprising
 // the catenation of the byte vectors named by item[..].block and the contents
 // of the files named by item[..].filename.  The block field is ignored if
 // fileName!="".  The fragment is not physically added if already present.
 // The fragment is added to the fragment list of the descriptor *desc.
 func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
-	desc *blobDesc, item ...BlockOrFile) (fileName string, size int64, err error) {
+	desc *blobDesc, item ...localblobstore.BlockOrFile) (fileName string, size int64, err error) {
 
 	hasher := md5.New()
 	var buf []byte
@@ -543,12 +535,12 @@
 // should not be used concurrently by multiple threads.  The returned handle
 // should be closed with either the Close() or CloseWithoutFinalize() method to
 // avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (bw *BlobWriter, err error) {
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+	var bw *BlobWriter
 	newName := newBlobName()
-	var f *file
 	fileName := filepath.Join(fscabs.rootName, newName)
 	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
-	f, err = newFile(os.Create(fileName))
+	f, err := newFile(os.Create(fileName))
 	if err == nil {
 		bw = new(BlobWriter)
 		bw.fscabs = fscabs
@@ -568,8 +560,9 @@
 
 // ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
 // old, but unfinalized blob name.
-func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (bw *BlobWriter, err error) {
-	bw = new(BlobWriter)
+func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
+	var err error
+	bw := new(BlobWriter)
 	bw.fscabs = fscabs
 	bw.ctx = ctx
 	bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
@@ -639,7 +632,7 @@
 // AppendFragment() appends a fragment to the blob being written by *bw, where
 // the fragment is composed of the byte vectors described by the elements of
 // item[].  The fragment is copied into the blob store.
-func (bw *BlobWriter) AppendFragment(item ...BlockOrFile) (err error) {
+func (bw *BlobWriter) AppendFragment(item ...localblobstore.BlockOrFile) (err error) {
 	if bw.f == nil {
 		panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()")
 	}
@@ -782,7 +775,7 @@
 // NewBlobReader() returns a pointer to a newly allocated BlobReader on the
 // specified blobName.  BlobReaders should not be used concurrently by multiple
 // threads.  Returned handles should be closed with Close().
-func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br *BlobReader, err error) {
+func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br localblobstore.BlobReader, err error) {
 	var desc *blobDesc
 	desc, err = fscabs.getBlob(ctx, blobName)
 	if err == nil {
@@ -954,7 +947,7 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
 	return &FsCasIter{fscabs: fscabs, stack: stack}
@@ -970,7 +963,7 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
 	return &FsCasIter{fscabs: fscabs, stack: stack}
diff --git a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index ded5e3c..886f5c4 100644
--- a/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -5,345 +5,16 @@
 // A test for fs_cablobstore
 package fs_cablobstore_test
 
-import "bytes"
-import "crypto/md5"
-import "fmt"
-import "io"
 import "io/ioutil"
 import "os"
-import "path/filepath"
 import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
-import "v.io/v23/context"
-import "v.io/v23/verror"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
 import "v.io/x/ref/test"
 import _ "v.io/x/ref/runtime/factories/generic"
 
-func TestCreate(t *testing.T) {
-	ctx, shutdown := test.V23Init()
-	defer shutdown()
-
-	// Make a temporary directory.
-	var err error
-	var testDirName string
-	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
-	if err != nil {
-		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
-	}
-	defer os.RemoveAll(testDirName)
-
-	// Check that we can create an fs_cablobstore.
-	var fscabs *fs_cablobstore.FsCaBlobStore
-	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
-	if err != nil {
-		t.Errorf("fs_cablobstore.Create failed: %v", err)
-	}
-
-	// Check that there are no files in the newly-created tree.
-	iterator := fscabs.ListBlobIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		t.Errorf("unexpected file %q\n", fileName)
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
-	}
-}
-
-// A blobOrBlockOrFile represents some bytes that may be contained in a named
-// blob, a named file, or in an explicit slice of bytes.
-type blobOrBlockOrFile struct {
-	blob   string // If non-emtpy, the name of the blob containing the bytes.
-	file   string // If non-empty and blob is empty, the name of the file containing the bytes.
-	size   int64  // Size of part of file or blob, or -1 for "everything until EOF".
-	offset int64  // Offset within file or blob.
-	block  []byte // If both blob and file are empty, a slice containing the bytes.
-}
-
-// A testBlob records that some specified content has been stored with a given
-// blob name in the blob store.
-type testBlob struct {
-	content  []byte // content that has been stored.
-	blobName string // the name of the blob.
-}
-
-// removeBlobFromBlobVector() removes the entry named blobName from
-// blobVector[], returning the new vector.
-func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
-	n := len(blobVector)
-	i := 0
-	for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
-	}
-	if i != n {
-		blobVector[i] = blobVector[n-1]
-		blobVector = blobVector[0 : n-1]
-	}
-	return blobVector
-}
-
-// writeBlob() writes a new blob to *fscabs, and returns its name.  The new
-// blob's content is described by the elements of data[].  Any error messages
-// generated include the index of the blob in blobVector and its content; the
-// latter is assumed to be printable.  The expected content of the the blob is
-// "content", so that this routine can check it.  If useResume is true, and data[]
-// has length more than 1, the function artificially uses ResumeBlobWriter(),
-// to test it.
-func writeBlob(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob,
-	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
-	var bw *fs_cablobstore.BlobWriter
-	var err error
-	bw, err = fscabs.NewBlobWriter(ctx)
-	if err != nil {
-		t.Fatalf("fs_cablobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
-	}
-	blobName := bw.Name()
-
-	// Construct the blob from the pieces.
-	// There is a loop within the loop to exercise the possibility of
-	// passing multiple fragments to AppendFragment().
-	for i := 0; i != len(data) && err == nil; {
-		if len(data[i].blob) != 0 {
-			err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
-			if err != nil {
-				t.Errorf("fs_cablobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
-			}
-			i++
-		} else {
-			var pieces []fs_cablobstore.BlockOrFile
-			for ; i != len(data) && len(data[i].blob) == 0; i++ {
-				if len(data[i].file) != 0 {
-					pieces = append(pieces, fs_cablobstore.BlockOrFile{
-						FileName: data[i].file,
-						Size:     data[i].size,
-						Offset:   data[i].offset})
-				} else {
-					pieces = append(pieces, fs_cablobstore.BlockOrFile{Block: data[i].block})
-				}
-			}
-			err = bw.AppendFragment(pieces...)
-			if err != nil {
-				t.Errorf("fs_cablobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
-			}
-		}
-		if useResume && i < len(data)-1 && err == nil {
-			err = bw.CloseWithoutFinalize()
-			if err == nil {
-				bw, err = fscabs.ResumeBlobWriter(ctx, blobName)
-			}
-		}
-	}
-
-	if bw != nil {
-		if bw.Size() != int64(len(content)) {
-			t.Errorf("fs_cablobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
-		}
-		if bw.IsFinalized() {
-			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
-		}
-		err = bw.Close()
-		if err != nil {
-			t.Errorf("fs_cablobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
-		}
-		if !bw.IsFinalized() {
-			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
-		}
-		if bw.Size() != int64(len(content)) {
-			t.Errorf("fs_cablobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
-		}
-		if bw.Name() != blobName {
-			t.Errorf("fs_cablobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
-		}
-		hasher := md5.New()
-		hasher.Write(content)
-		if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
-			t.Errorf("fs_cablobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
-		}
-	}
-
-	return append(blobVector,
-		testBlob{
-			content:  content,
-			blobName: blobName,
-		})
-}
-
-// readBlob() returns a substring of the content of the blob named blobName in *fscabs.
-// The return values are:
-// - the "size" bytes from the content, starting at the given "offset",
-//   measured from "whence" (as defined by io.Seeker.Seek).
-// - the position to which BlobBeader seeks to,
-// - the md5 hash of the bytes read, and
-// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
-// - and error.
-func readBlob(ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobName string,
-	size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
-
-	var br *fs_cablobstore.BlobReader
-	hasher := md5.New()
-	br, err = fscabs.NewBlobReader(ctx, blobName)
-	if err == nil {
-		buf := make([]byte, 8192, 8192)
-		fullHash = br.Hash()
-		pos, err = br.Seek(offset, whence)
-		if err == nil {
-			var n int
-			first := true // Read at least once, to test reading zero bytes.
-			for err == nil && (size == -1 || int64(len(content)) < size || first) {
-				// Read just what was asked for.
-				var toRead []byte = buf
-				if size >= 0 && int(size)-len(content) < len(buf) {
-					toRead = buf[0 : int(size)-len(content)]
-				}
-				n, err = br.Read(toRead)
-				hasher.Write(toRead[0:n])
-				if size >= 0 && int64(len(content)+n) > size {
-					n = int(size) - len(content)
-				}
-				content = append(content, toRead[0:n]...)
-				first = false
-			}
-		}
-		br.Close()
-	}
-	return content, pos, hasher.Sum(nil), fullHash, err
-}
-
-// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
-// read, and that they contain the appropriate data.
-func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob) {
-	for i := range blobVector {
-		var size int64
-		data := blobVector[i].content
-		dataLen := int64(len(data))
-		blobName := blobVector[i].blobName
-		for size = -1; size != dataLen+1; size++ {
-			var offset int64
-			for offset = -dataLen - 1; offset != dataLen+1; offset++ {
-				for whence := -1; whence != 4; whence++ {
-					content, pos, hash, fullHash, err := readBlob(ctx, fscabs, blobName, size, offset, whence)
-
-					// Compute expected seek position.
-					expectedPos := offset
-					if whence == 2 {
-						expectedPos += dataLen
-					}
-
-					// Computed expected size.
-					expectedSize := size
-					if expectedSize == -1 || expectedPos+expectedSize > dataLen {
-						expectedSize = dataLen - expectedPos
-					}
-
-					// Check that reads behave as expected.
-					if (whence == -1 || whence == 3) &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
-						// Expected error from bad "whence" value.
-					} else if expectedPos < 0 &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
-						// Expected error from negative Seek position.
-					} else if expectedPos > dataLen &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
-						// Expected error from too high a Seek position.
-					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
-						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
-						pos == expectedPos && expectedPos+expectedSize == dataLen {
-						// Expected success with EOF.
-					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
-						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
-						pos == expectedPos && expectedPos+expectedSize != dataLen {
-						if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
-							t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v  (blob is %q)",
-								string(data), size, offset, whence,
-								hash, fullHash, blobName)
-						} // Else expected success without EOF.
-					} else {
-						t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d yields %q pos %d %v   (blob is %q)",
-							string(data), size, offset, whence,
-							content, pos, err, blobName)
-					}
-				}
-			}
-		}
-	}
-}
-
-// checkAllBlobs() checks all the blobs in *fscabs to ensure they correspond to
-// those in blobVector[].
-func checkAllBlobs(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob, testDirName string) {
-	blobCount := 0
-	iterator := fscabs.ListBlobIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		i := 0
-		for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
-		}
-		if i == len(blobVector) {
-			t.Errorf("fs_cablobstore.ListBlobIds found unexpected file %s", fileName)
-		} else {
-			content, pos, hash, fullHash, err := readBlob(ctx, fscabs, fileName, -1, 0, 0)
-			if err != nil && err != io.EOF {
-				t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
-			} else if bytes.Compare(blobVector[i].content, content) != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
-					filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
-			} else if pos != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds Seek on %q returned %d instead of 0",
-					filepath.Join(testDirName, fileName), pos)
-			}
-			if bytes.Compare(hash, fullHash) != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds read on %q; got hash %v, expected %v",
-					fileName, hash, fullHash)
-			}
-		}
-		blobCount++
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
-	}
-	if blobCount != len(blobVector) {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
-	}
-}
-
-// checkFragments() checks all the fragments in *fscabs to ensure they
-// correspond to those fragmentMap[].
-func checkFragments(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, fragmentMap map[string]bool, testDirName string) {
-	caCount := 0
-	iterator := fscabs.ListCAIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
-		if err != nil && err != io.EOF {
-			t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
-		} else if !fragmentMap[string(content)] {
-			t.Errorf("fs_cablobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
-		} else {
-			hasher := md5.New()
-			hasher.Write(content)
-			hash := hasher.Sum(nil)
-			nameFromContent := filepath.Join("cas",
-				fmt.Sprintf("%02x", hash[0]),
-				fmt.Sprintf("%02x", hash[1]),
-				fmt.Sprintf("%02x", hash[2]),
-				fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
-					hash[3],
-					hash[4], hash[5], hash[6], hash[7],
-					hash[8], hash[9], hash[10], hash[11],
-					hash[12], hash[13], hash[14], hash[15]))
-			if nameFromContent != fileName {
-				t.Errorf("fs_cablobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
-			}
-		}
-		caCount++
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListCAIds iteration failed: %v", iterator.Err())
-	}
-	if caCount != len(fragmentMap) {
-		t.Errorf("fs_cablobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
-	}
-}
-
 // This test case tests adding files, retrieving them and deleting them.  One
 // can't retrieve or delete something that hasn't been created, so it's all one
 // test case.
@@ -354,231 +25,19 @@
 	// Make a temporary directory.
 	var err error
 	var testDirName string
-	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+	testDirName, err = ioutil.TempDir("", "localblobstore_test")
 	if err != nil {
-		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+		t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
 	}
 	defer os.RemoveAll(testDirName)
 
 	// Create an fs_cablobstore.
-	var fscabs *fs_cablobstore.FsCaBlobStore
-	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+	var bs localblobstore.BlobStore
+	bs, err = fs_cablobstore.Create(ctx, testDirName)
 	if err != nil {
 		t.Fatalf("fs_cablobstore.Create failed: %v", err)
 	}
 
-	// Create the strings:  "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
-	womData := []byte("wom")
-	batData := []byte("bat")
-	wombatData := []byte("wombat")
-	batwomData := []byte("batwom")
-	atwoData := []byte("atwo")
-	atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
-
-	// fragmentMap will have an entry per content-addressed fragment.
-	fragmentMap := make(map[string]bool)
-
-	// Create the blobs, by various means.
-
-	var blobVector []testBlob // Accumulate the blobs we create here.
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		womData, false,
-		blobOrBlockOrFile{block: womData})
-	womName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(womData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		batData, false,
-		blobOrBlockOrFile{block: batData})
-	batName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(batData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{block: wombatData})
-	firstWombatName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(wombatData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, true,
-		blobOrBlockOrFile{block: womData},
-		blobOrBlockOrFile{block: batData})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   -1,
-			offset: 0})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   6,
-			offset: 0})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		batwomData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   3,
-			offset: 3},
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   3,
-			offset: 0})
-	batwomName := blobVector[len(blobVector)-1].blobName
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		atwoData, false,
-		blobOrBlockOrFile{
-			blob:   batwomName,
-			size:   4,
-			offset: 1})
-	atwoName := blobVector[len(blobVector)-1].blobName
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		atwoatwoombatatwoData, true,
-		blobOrBlockOrFile{
-			blob:   atwoName,
-			size:   -1,
-			offset: 0},
-		blobOrBlockOrFile{
-			blob:   atwoName,
-			size:   4,
-			offset: 0},
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   -1,
-			offset: 1},
-		blobOrBlockOrFile{
-			blob:   batName,
-			size:   -1,
-			offset: 1},
-		blobOrBlockOrFile{
-			blob:   womName,
-			size:   2,
-			offset: 0})
-	atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Nothing should change if we garbage collect.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Ensure that deleting non-existent blobs fails.
-	err = fscabs.DeleteBlob(ctx, "../../../../etc/passwd")
-	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
-		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
-	}
-	err = fscabs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
-	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
-		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
-	}
-
-	// -------------------------------------------------
-	// Delete a blob.
-	err = fscabs.DeleteBlob(ctx, batName)
-	if err != nil {
-		t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
-	}
-	blobVector = removeBlobFromBlobVector(blobVector, batName)
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Nothing should change if we garbage collect.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Open a BlobReader on a blob we're about to delete,
-	// so its fragments won't be garbage collected.
-
-	var br *fs_cablobstore.BlobReader
-	br, err = fscabs.NewBlobReader(ctx, atwoatwoombatatwoName)
-	if err != nil {
-		t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
-	}
-
-	// -------------------------------------------------
-	// Delete a blob.  This should be the last on-disc reference to the
-	// content-addressed fragment "bat", but the fragment won't be deleted
-	// until close the reader and garbage collect.
-	err = fscabs.DeleteBlob(ctx, atwoatwoombatatwoName)
-	if err != nil {
-		t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
-	}
-	blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Garbage collection should change nothing; the fragment involved
-	// is still referenced from the open reader *br.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-
-	// Close the open BlobReader and garbage collect.
-	err = br.Close()
-	if err != nil {
-		t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
-	}
-	delete(fragmentMap, string(batData))
-
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Delete all blobs.
-	for len(blobVector) != 0 {
-		err = fscabs.DeleteBlob(ctx, blobVector[0].blobName)
-		if err != nil {
-			t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
-		}
-		blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
-	}
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// The remaining fragments should be removed when we garbage collect.
-	for frag := range fragmentMap {
-		delete(fragmentMap, frag)
-	}
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+	// Test it.
+	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
diff --git a/services/syncbase/localblobstore/localblobstore_test.go b/services/syncbase/localblobstore/localblobstore_test.go
new file mode 100644
index 0000000..973a9c8
--- /dev/null
+++ b/services/syncbase/localblobstore/localblobstore_test.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for localblobstore
+package localblobstore_test
+
+import "io/ioutil"
+import "os"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// This test case tests adding files, retrieving them and deleting them.  One
+// can't retrieve or delete something that hasn't been created, so it's all one
+// test case.
+func TestAddRetrieveAndDelete(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// Make a temporary directory.
+	var err error
+	var testDirName string
+	testDirName, err = ioutil.TempDir("", "localblobstore_test")
+	if err != nil {
+		t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+	}
+	defer os.RemoveAll(testDirName)
+
+	// Create an fs_cablobstore.
+	var bs localblobstore.BlobStore
+	bs, err = fs_cablobstore.Create(ctx, testDirName)
+	if err != nil {
+		t.Fatalf("fs_cablobstore.Create failed: %v", err)
+	}
+
+	// Test it.
+	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
+}
diff --git a/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
new file mode 100644
index 0000000..662e964
--- /dev/null
+++ b/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -0,0 +1,548 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test library for localblobstores.
+package localblobstore_testlib
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "path/filepath"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+// A blobOrBlockOrFile represents some bytes that are contained in a named
+// blob, in a named file, or in an explicit slice of bytes (in that precedence).
+type blobOrBlockOrFile struct {
+	blob   string // If non-empty, the name of the blob containing the bytes.
+	file   string // If non-empty and blob is empty, the name of the file containing the bytes.
+	size   int64  // Number of bytes to take from the file or blob, or -1 for "everything until EOF".
+	offset int64  // Offset of the first byte within the file or blob.
+	block  []byte // If both blob and file are empty, a slice containing the bytes.
+}
+
+// A testBlob records the content that the test has stored under a given
+// blob name in the blob store, so that it can be checked later.
+type testBlob struct {
+	content  []byte // the bytes that were written to the blob.
+	blobName string // the name of the blob in the blob store.
+}
+
+// removeBlobFromBlobVector() deletes the first entry whose name is blobName
+// from blobVector[] by overwriting it with the last entry and shortening the
+// vector by one; element order is not preserved.  The vector is returned
+// unchanged if no entry matches.
+func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
+	last := len(blobVector) - 1
+	for pos := range blobVector {
+		if blobVector[pos].blobName == blobName {
+			blobVector[pos] = blobVector[last]
+			return blobVector[:last]
+		}
+	}
+	return blobVector
+}
+
+// writeBlob() writes a new blob to bs, and returns blobVector[] extended with
+// an entry recording the new blob's name and content.  The new blob's content
+// is described by the elements of data[].  Any error messages generated
+// include the index of the blob in blobVector and its content; the latter is
+// assumed to be printable.  The expected content of the blob is "content", so
+// that this routine can check it.  If useResume is true, the function may
+// artificially use ResumeBlobWriter() between appends, to test it.
+func writeBlob(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob,
+	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
+	bw, err := bs.NewBlobWriter(ctx)
+	if err != nil {
+		// Without a writer there is no blob to build or check, so give up.
+		t.Fatalf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
+	}
+	blobName := bw.Name()
+
+	// Construct the blob from the pieces.
+	// There is a loop within the loop to exercise the possibility of
+	// passing multiple fragments to AppendFragment().
+	for i := 0; i != len(data) && err == nil; {
+		if len(data[i].blob) != 0 {
+			err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
+			if err != nil {
+				t.Errorf("localblobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
+			}
+			i++
+		} else {
+			var pieces []localblobstore.BlockOrFile
+			for ; i != len(data) && len(data[i].blob) == 0; i++ {
+				if len(data[i].file) != 0 {
+					pieces = append(pieces, localblobstore.BlockOrFile{
+						FileName: data[i].file,
+						Size:     data[i].size,
+						Offset:   data[i].offset})
+				} else {
+					pieces = append(pieces, localblobstore.BlockOrFile{Block: data[i].block})
+				}
+			}
+			err = bw.AppendFragment(pieces...)
+			if err != nil {
+				t.Errorf("localblobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
+			}
+		}
+		if useResume && i < len(data)-1 && err == nil {
+			// Close and reopen the unfinalized blob.  A failure here
+			// leaves no usable writer for the checks below, so it is fatal.
+			if err = bw.CloseWithoutFinalize(); err != nil {
+				t.Fatalf("localblobstore.CloseWithoutFinalize %d:%s failed: %v", len(blobVector), string(content), err)
+			}
+			if bw, err = bs.ResumeBlobWriter(ctx, blobName); err != nil {
+				t.Fatalf("localblobstore.ResumeBlobWriter %d:%s failed: %v", len(blobVector), string(content), err)
+			}
+		}
+	}
+
+	// Check the writer's state before finalization.
+	if bw.Size() != int64(len(content)) {
+		t.Errorf("localblobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+	}
+	if bw.IsFinalized() {
+		t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+	}
+	// Close() finalizes the blob; check the writer's state again.
+	err = bw.Close()
+	if err != nil {
+		t.Errorf("localblobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
+	}
+	if !bw.IsFinalized() {
+		t.Errorf("localblobstore.IsFinalized %d:%s got false, expected true", len(blobVector), string(content))
+	}
+	if bw.Size() != int64(len(content)) {
+		t.Errorf("localblobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+	}
+	if bw.Name() != blobName {
+		t.Errorf("localblobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
+	}
+	// The hash of a finalized blob should be the MD5 hash of its content.
+	hasher := md5.New()
+	hasher.Write(content)
+	if !bytes.Equal(bw.Hash(), hasher.Sum(nil)) {
+		t.Errorf("localblobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
+	}
+
+	return append(blobVector, testBlob{content: content, blobName: blobName})
+}
+
+// readBlob() reads part of the content of the blob named blobName in bs.
+// It seeks to "offset" measured from "whence" (as defined by io.Seeker.Seek)
+// and then reads "size" bytes, or until an error such as io.EOF if size is -1.
+// The return values are:
+// - the bytes read,
+// - the position that BlobReader.Seek() returned,
+// - the md5 hash of the bytes read,
+// - the md5 hash of the whole blob, as returned by BlobReader.Hash(), and
+// - any error encountered; io.EOF is returned like any other error.
+func readBlob(ctx *context.T, bs localblobstore.BlobStore, blobName string,
+	size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
+
+	hasher := md5.New()
+	br, err := bs.NewBlobReader(ctx, blobName)
+	if err != nil {
+		return content, pos, hasher.Sum(nil), fullHash, err
+	}
+	defer br.Close()
+	fullHash = br.Hash()
+	if pos, err = br.Seek(offset, whence); err != nil {
+		return content, pos, hasher.Sum(nil), fullHash, err
+	}
+	buf := make([]byte, 8192, 8192)
+	// The loop body always runs at least once, to test reading zero bytes.
+	for first := true; err == nil && (first || size == -1 || int64(len(content)) < size); first = false {
+		// Ask for no more than the caller's remaining quota.
+		chunk := buf
+		if remaining := int(size) - len(content); size >= 0 && remaining < len(buf) {
+			chunk = buf[:remaining]
+		}
+		var got int
+		got, err = br.Read(chunk)
+		hasher.Write(chunk[:got])
+		if size >= 0 && int64(len(content)+got) > size {
+			got = int(size) - len(content)
+		}
+		content = append(content, chunk[:got]...)
+	}
+	return content, pos, hasher.Sum(nil), fullHash, err
+}
+
+// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
+// read, and that they contain the appropriate data.
+func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob) {
+	for i := range blobVector {
+		data := blobVector[i].content
+		dataLen := int64(len(data))
+		blobName := blobVector[i].blobName
+		for size := int64(-1); size != dataLen+1; size++ {
+			for offset := -dataLen - 1; offset != dataLen+1; offset++ {
+				for whence := -1; whence != 4; whence++ {
+					content, pos, hash, fullHash, err := readBlob(ctx, bs, blobName, size, offset, whence)
+
+					// Compute expected seek position.
+					expectedPos := offset
+					if whence == 2 {
+						expectedPos += dataLen
+					}
+
+					// Compute expected size.
+					expectedSize := size
+					if expectedSize == -1 || expectedPos+expectedSize > dataLen {
+						expectedSize = dataLen - expectedPos
+					}
+
+					// Check that reads behave as expected.
+					if (whence == -1 || whence == 3) &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
+						// Expected error from bad "whence" value.
+					} else if expectedPos < 0 &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
+						// Expected error from negative Seek position.
+					} else if expectedPos > dataLen &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
+						// Expected error from too high a Seek position.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Equal(data[expectedPos:expectedPos+expectedSize], content) && err == io.EOF &&
+						pos == expectedPos && expectedPos+expectedSize == dataLen {
+						// A size of -1 always reads to the end of the blob, so a
+						// read of the whole blob can be hash-checked only here.
+						if pos == 0 && size == -1 && !bytes.Equal(hash, fullHash) {
+							t.Errorf("localblobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v  (blob is %q)",
+								string(data), size, offset, whence,
+								hash, fullHash, blobName)
+						} // Else expected success with EOF.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Equal(data[expectedPos:expectedPos+expectedSize], content) && err == nil &&
+						pos == expectedPos && expectedPos+expectedSize != dataLen {
+						// Expected success without EOF.
+					} else {
+						t.Errorf("localblobstore read test on %q size %d offset %d whence %d yields %q pos %d %v   (blob is %q)",
+							string(data), size, offset, whence,
+							content, pos, err, blobName)
+					}
+				}
+			}
+		}
+	}
+}
+
+// checkAllBlobs() checks that the blobs listed by bs.ListBlobIds() are exactly
+// those in blobVector[], and that each has the recorded content and hash.
+func checkAllBlobs(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob, testDirName string) {
+	blobCount := 0
+	iterator := bs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		i := 0
+		for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
+		}
+		if i == len(blobVector) {
+			t.Errorf("localblobstore.ListBlobIds found unexpected file %s", fileName)
+		} else {
+			content, pos, hash, fullHash, err := readBlob(ctx, bs, fileName, -1, 0, 0)
+			if err != nil && err != io.EOF {
+				t.Errorf("localblobstore.ListBlobIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+			} else if !bytes.Equal(blobVector[i].content, content) {
+				t.Errorf("localblobstore.ListBlobIds found unexpected blob content: %q, contains %q, expected %q",
+					filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
+			} else if pos != 0 {
+				t.Errorf("localblobstore.ListBlobIds Seek on %q returned %d instead of 0",
+					filepath.Join(testDirName, fileName), pos)
+			}
+			if !bytes.Equal(hash, fullHash) {
+				t.Errorf("localblobstore.ListBlobIds read on %q; got hash %v, expected %v",
+					fileName, hash, fullHash)
+			}
+		}
+		blobCount++
+	}
+	if iterator.Err() != nil {
+		t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+	if blobCount != len(blobVector) {
+		t.Errorf("localblobstore.ListBlobIds iteration expected %d files, got %d", len(blobVector), blobCount)
+	}
+}
+
+// checkFragments() checks that the fragments listed by bs.ListCAIds() match
+// the contents in fragmentMap[]; it does nothing if testDirName is empty.
+func checkFragments(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, fragmentMap map[string]bool, testDirName string) {
+	if testDirName == "" {
+		return
+	}
+	caCount := 0
+	iterator := bs.ListCAIds(ctx)
+	for ; iterator.Advance(); caCount++ {
+		fileName := iterator.Value()
+		fullName := filepath.Join(testDirName, fileName)
+		content, err := ioutil.ReadFile(fullName)
+		switch {
+		case err != nil && err != io.EOF:
+			t.Errorf("localblobstore.ListCAIds can't read %q: %v", fullName, err)
+		case !fragmentMap[string(content)]:
+			t.Errorf("localblobstore.ListCAIds found unexpected fragment entry: %q, contains %q", fullName, content)
+		default:
+			hash := md5.Sum(content)
+			nameFromContent := filepath.Join("cas",
+				fmt.Sprintf("%02x", hash[0]),
+				fmt.Sprintf("%02x", hash[1]),
+				fmt.Sprintf("%02x", hash[2]),
+				fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+					hash[3],
+					hash[4], hash[5], hash[6], hash[7],
+					hash[8], hash[9], hash[10], hash[11],
+					hash[12], hash[13], hash[14], hash[15]))
+			if nameFromContent != fileName {
+				t.Errorf("localblobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
+			}
+		}
+	}
+	if iterator.Err() != nil {
+		t.Errorf("localblobstore.ListCAIds iteration failed: %v", iterator.Err())
+	}
+	if caCount != len(fragmentMap) {
+		t.Errorf("localblobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
+	}
+}
+
+// AddRetrieveAndDelete() tests adding, retrieving, and deleting blobs from a
+// blobstore bs.  One can't retrieve or delete something that hasn't been
+// created, so it's all done in one routine.  If testDirName is non-empty,
+// the blobstore is assumed to be accessible in the file system, and its
+// files are checked.
+func AddRetrieveAndDelete(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, testDirName string) {
+	gc := func() { // Garbage collects bs, reporting any error from GC().
+		if err := bs.GC(ctx); err != nil {
+			t.Errorf("localblobstore.GC failed: %v", err)
+		}
+	}
+
+	// Check that there are no files in the blobstore we were given.
+	iterator := bs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		t.Errorf("unexpected file %q\n", fileName)
+	}
+	if iterator.Err() != nil {
+		t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+
+	// Create the strings:  "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
+	womData := []byte("wom")
+	batData := []byte("bat")
+	wombatData := []byte("wombat")
+	batwomData := []byte("batwom")
+	atwoData := []byte("atwo")
+	atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
+
+	// fragmentMap will have an entry per content-addressed fragment.
+	fragmentMap := make(map[string]bool)
+
+	// Create the blobs, by various means.
+	var blobVector []testBlob // Accumulate the blobs we create here.
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		womData, false,
+		blobOrBlockOrFile{block: womData})
+	womName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(womData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		batData, false,
+		blobOrBlockOrFile{block: batData})
+	batName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(batData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{block: wombatData})
+	firstWombatName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(wombatData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, true,
+		blobOrBlockOrFile{block: womData},
+		blobOrBlockOrFile{block: batData})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   6,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		batwomData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 3},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 0})
+	batwomName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		atwoData, false,
+		blobOrBlockOrFile{
+			blob:   batwomName,
+			size:   4,
+			offset: 1})
+	atwoName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		atwoatwoombatatwoData, true,
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   -1,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   4,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   batName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   womName,
+			size:   2,
+			offset: 0})
+	atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	gc()
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Ensure that deleting non-existent blobs fails.
+	err := bs.DeleteBlob(ctx, "../../../../etc/passwd")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+	err = bs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.
+	err = bs.DeleteBlob(ctx, batName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, batName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	gc()
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Open a BlobReader on a blob we're about to delete,
+	// so its fragments won't be garbage collected.
+	br, err := bs.NewBlobReader(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Fatalf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.  This should be the last on-disc reference to the
+	// content-addressed fragment "bat", but the fragment won't be deleted
+	// until we close the reader and garbage collect.
+	err = bs.DeleteBlob(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Garbage collection should change nothing; the fragment involved
+	// is still referenced from the open reader *br.
+	gc()
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Close the open BlobReader and garbage collect.
+	err = br.Close()
+	if err != nil {
+		t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	delete(fragmentMap, string(batData))
+
+	gc()
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Delete all blobs.
+	for len(blobVector) != 0 {
+		err = bs.DeleteBlob(ctx, blobVector[0].blobName)
+		if err != nil {
+			t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
+		}
+		blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
+	}
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// The remaining fragments should be removed when we garbage collect.
+	for frag := range fragmentMap {
+		delete(fragmentMap, frag)
+	}
+	gc()
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+}
diff --git a/services/syncbase/localblobstore/model.go b/services/syncbase/localblobstore/model.go
new file mode 100644
index 0000000..b1a4565
--- /dev/null
+++ b/services/syncbase/localblobstore/model.go
@@ -0,0 +1,170 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package localblobstore is the interface to a local blob store.
+// Implementations include fs_cablobstore.
+package localblobstore
+
+import "v.io/v23/context"
+
+// A BlobStore represents a simple, content-addressable store.
+type BlobStore interface {
+	// NewBlobReader() returns a pointer to a newly allocated BlobReader on
+	// the specified blobName.  BlobReaders should not be used concurrently
+	// by multiple threads.  Returned handles should be closed with
+	// Close().
+	NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
+
+	// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+	// a newly created blob name, which can be found using the Name()
+	// method.  BlobWriters should not be used concurrently by multiple
+	// threads.  The returned handle should be closed with either the
+	// Close() or CloseWithoutFinalize() method to avoid leaking file
+	// handles.
+	NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+
+	// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
+	// the existing, but unfinalized, blob named blobName.
+	ResumeBlobWriter(ctx *context.T, blobName string) (bw BlobWriter, err error)
+
+	// DeleteBlob() deletes the blob named blobName from the BlobStore.
+	DeleteBlob(ctx *context.T, blobName string) (err error)
+
+	// GC() removes old temp files and content-addressed blocks that are no
+	// longer referenced by any blob.  It may be called concurrently with
+	// other calls to GC(), and with uses of BlobReaders and BlobWriters.
+	GC(ctx *context.T) error
+
+	// ListBlobIds() returns an iterator that can be used to enumerate the
+	// blobs in a BlobStore.  Expected use is:
+	//
+	//	iter := bs.ListBlobIds(ctx)
+	//	for iter.Advance() {
+	//	  // Process iter.Value() here.
+	//	}
+	//	if iter.Err() != nil {
+	//	  // The loop terminated early due to an error.
+	//	}
+	ListBlobIds(ctx *context.T) (iter Iter)
+
+	// ListCAIds() returns an iterator that can be used to enumerate the
+	// content-addressable fragments in a BlobStore.  Expected use is:
+	//
+	//	iter := bs.ListCAIds(ctx)
+	//	for iter.Advance() {
+	//	  // Process iter.Value() here.
+	//	}
+	//	if iter.Err() != nil {
+	//	  // The loop terminated early due to an error.
+	//	}
+	ListCAIds(ctx *context.T) (iter Iter)
+
+	// Root() returns the name of the root directory where the BlobStore is stored.
+	Root() string
+}
+
+// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
+// and Seek() calls.  A BlobReader can be created with NewBlobReader(), and
+// should be closed with the Close() method to avoid leaking file handles.
+type BlobReader interface {
+	// ReadAt() fills b[] with up to len(b) bytes of data starting at
+	// position "at" within the blob that the BlobReader indicates, and
+	// returns the number of bytes read.
+	ReadAt(b []byte, at int64) (n int, err error)
+
+	// Read() fills b[] with up to len(b) bytes of data starting at the
+	// current seek position of the BlobReader within the blob that the
+	// BlobReader indicates, and then both returns the number of bytes read
+	// and advances the BlobReader's seek position by that amount.
+	Read(b []byte) (n int, err error)
+
+	// Seek() sets the seek position of the BlobReader to offset if
+	// whence==0, offset+current_seek_position if whence==1, and
+	// offset+end_of_blob if whence==2, and then returns the new seek
+	// position.
+	Seek(offset int64, whence int) (result int64, err error)
+
+	// Close() indicates that the client will perform no further operations
+	// on the BlobReader.  It releases any resources held by the
+	// BlobReader.
+	Close() error
+
+	// Name() returns the BlobReader's name.
+	Name() string
+
+	// Size() returns the BlobReader's size.
+	Size() int64
+
+	// IsFinalized() returns whether the BlobReader has been finalized.
+	IsFinalized() bool
+
+	// Hash() returns the BlobReader's hash.  It may be nil if the blob is
+	// not finalized.
+	Hash() []byte
+}
+
+// A BlockOrFile represents a vector of bytes, and contains either a data
+// block (as a []byte), or a (file name, size, offset) triple.
+type BlockOrFile struct {
+	Block    []byte // If FileName is empty, the bytes represented.
+	FileName string // If non-empty, the name of the file containing the bytes.
+	Size     int64  // If FileName is non-empty, the number of bytes (or -1 for "all").
+	Offset   int64  // If FileName is non-empty, the offset of the relevant bytes within the file.
+}
+
+// A BlobWriter allows a blob to be written.  If a blob has not yet been
+// finalized, it also allows that blob to be extended.  A BlobWriter may be
+// created with NewBlobWriter() or ResumeBlobWriter(), and should be closed
+// with Close() or CloseWithoutFinalize().
+type BlobWriter interface {
+	// AppendBlob() adds a (substring of a) pre-existing blob to the blob
+	// being written by the BlobWriter.  The fragments of the pre-existing
+	// blob are not physically copied; they are referenced by both blobs.
+	AppendBlob(blobName string, size int64, offset int64) (err error)
+
+	// AppendFragment() appends a fragment to the blob being written by the
+	// BlobWriter, where the fragment is composed of the byte vectors
+	// described by the elements of item[].  The fragment is copied into
+	// the blob store.
+	AppendFragment(item ...BlockOrFile) (err error)
+
+	// Close() finalizes the BlobWriter, and indicates that the client will
+	// perform no further append operations on the BlobWriter.  Any
+	// internal open file handles are closed.
+	Close() (err error)
+
+	// CloseWithoutFinalize() indicates that the client will perform no
+	// further append operations on the BlobWriter, but does not finalize
+	// the blob.  Any internal open file handles are closed.  Clients are
+	// expected to need this operation infrequently.
+	CloseWithoutFinalize() (err error)
+
+	// Name() returns the BlobWriter's name.
+	Name() string
+
+	// Size() returns the BlobWriter's size.
+	Size() int64
+
+	// IsFinalized() returns whether the BlobWriter has been finalized.
+	IsFinalized() bool
+
+	// Hash() returns the BlobWriter's hash, reflecting the bytes written so far.
+	Hash() []byte
+}
+
+// An Iter represents an iterator that allows the client to enumerate
+// all the blobs or fragments in a BlobStore.
+type Iter interface {
+	// Advance() stages an item so that it may be retrieved via Value.
+	// Returns true iff there is an item to retrieve.  Advance must be
+	// called before Value is called.
+	Advance() (advanced bool)
+
+	// Value() returns the item that was staged by Advance.  May panic if
+	// Advance returned false or was not called.  Never blocks.
+	Value() (name string)
+
+	// Err() returns any error encountered by Advance.  Never blocks.
+	Err() error
+}
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
 			return err
 		}
 		// Check for "database already exists".
-		if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -172,7 +172,7 @@
 		info := &dbInfo{
 			Name: dbName,
 		}
-		return a.putDbInfo(ctx, call, st, dbName, info)
+		return a.putDbInfo(ctx, st, dbName, info)
 	}); err != nil {
 		return err
 	}
@@ -192,7 +192,7 @@
 
 	// 3. Flip dbInfo.Initialized to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Initialized = true
 			return nil
 		})
@@ -234,7 +234,7 @@
 
 	// 2. Flip dbInfo.Deleted to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Deleted = true
 			return nil
 		})
@@ -251,7 +251,7 @@
 	}
 
 	// 4. Delete dbInfo record.
-	if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+	if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
 		return err
 	}
 
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
-	"v.io/v23/rpc"
 )
 
 type dbInfoLayer struct {
@@ -49,9 +48,9 @@
 
 // getDbInfo reads data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
 	info := &dbInfo{}
-	if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+	if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
 		return nil, err
 	}
 	return info, nil
@@ -59,27 +58,27 @@
 
 // putDbInfo writes data to the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
-	return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+	return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
 }
 
 // delDbInfo deletes data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
-	return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+	return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
 }
 
 // updateDbInfo performs a read-modify-write.
 // fn should "modify" v, and should return a VDL-compatible error.
 // Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
 	_ = st.(store.Transaction) // panics on failure, as desired
-	info, err := a.getDbInfo(ctx, call, st, dbName)
+	info, err := a.getDbInfo(ctx, st, dbName)
 	if err != nil {
 		return err
 	}
 	if err := fn(info); err != nil {
 		return err
 	}
-	return a.putDbInfo(ctx, call, st, dbName, info)
+	return a.putDbInfo(ctx, st, dbName, info)
 }
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 0971923..d18ab04 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -15,7 +15,6 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/nosql/query_db"
 	"v.io/syncbase/v23/syncbase/nosql/query_exec"
-	prefixutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -103,7 +102,7 @@
 		Name:  d.name,
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, d.st, d, data); err != nil {
+	if err := util.Put(ctx, d.st, d, data); err != nil {
 		return nil, err
 	}
 	return d, nil
@@ -404,20 +403,20 @@
 	req *tableReq
 }
 
-func (t *tableDb) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	return &kvs{
-		t:        t,
-		prefixes: prefixes,
-		curr:     -1,
-		validRow: false,
-		it:       nil,
-		err:      nil,
+		t:         t,
+		keyRanges: keyRanges,
+		curr:      -1,
+		validRow:  false,
+		it:        nil,
+		err:       nil,
 	}, nil
 }
 
 type kvs struct {
 	t         *tableDb
-	prefixes  []string
+	keyRanges query_db.KeyRanges
 	curr      int // current index into prefixes, -1 at start
 	validRow  bool
 	currKey   string
@@ -433,16 +432,22 @@
 	if s.curr == -1 {
 		s.curr++
 	}
-	for s.curr < len(s.prefixes) {
+	for s.curr < len(s.keyRanges) {
 		if s.it == nil {
-			start := prefixutil.PrefixRangeStart(s.prefixes[s.curr])
-			limit := prefixutil.PrefixRangeLimit(s.prefixes[s.curr])
-			s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), string(start), string(limit)))
+			start := s.keyRanges[s.curr].Start
+			limit := s.keyRanges[s.curr].Limit
+			// A Start of "\x00" with a Limit of "\xff" means examine all rows.
+			if start == string([]byte{0}) && limit == string([]byte{255}) {
+				start = ""
+				limit = ""
+			}
+			s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), start, limit))
 		}
 		if s.it.Advance() {
 			// key
 			keyBytes := s.it.Key(nil)
 			parts := util.SplitKeyParts(string(keyBytes))
+			// TODO(rogulenko): Check access for the key.
 			s.currKey = parts[len(parts)-1]
 			// value
 			valueBytes := s.it.Value(nil)
@@ -489,8 +494,8 @@
 		s.it.Cancel()
 		s.it = nil
 	}
-	// set curr to end of prefixes so Advance will return false
-	s.curr = len(s.prefixes)
+	// set curr to end of keyRanges so Advance will return false
+	s.curr = len(s.keyRanges)
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
 }
 
 // checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
 // Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
 func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return util.Get(ctx, call, st, r.t, &tableData{})
+	return r.t.checkAccess(ctx, call, st, r.key)
 }
 
 // get reads data from the storage engine.
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
 package nosql
 
 import (
+	"strings"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 )
 
 // tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
 			return err
 		}
 		// Check for "table already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -54,7 +57,7 @@
 			Name:  t.name,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, t, data)
+		return util.Put(ctx, st, t, data)
 	})
 }
 
@@ -71,20 +74,30 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all rows in this table.
-		return util.Delete(ctx, call, st, t)
+		return util.Delete(ctx, st, t)
 	})
 }
 
 func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReadWriter) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		key := []byte{}
 		for it.Advance() {
 			key = it.Key(key)
+			// Check perms.
+			parts := util.SplitKeyParts(string(key))
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				// TODO(rogulenko): Revisit this behavior. Probably we should
+				// delete all rows that we have access to.
+				it.Cancel()
+				return err
+			}
+			// Delete the key-value pair.
 			if err := st.Delete(key); err != nil {
 				return verror.New(verror.ErrInternal, ctx, err)
 			}
@@ -107,8 +120,8 @@
 
 func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReader) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
 		key, value := []byte{}, []byte{}
 		for it.Advance() {
 			key, value = it.Key(key), it.Value(value)
+			// Check perms.
 			parts := util.SplitKeyParts(string(key))
-			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				it.Cancel()
+				return err
+			}
+			sender.Send(wire.KeyValue{Key: externalKey, Value: value})
 		}
 		if err := it.Err(); err != nil {
 			return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
 	return impl(st)
 }
 
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
-	if prefix != "" {
-		return verror.NewErrNotImplemented(ctx)
-	}
-	impl := func(st store.StoreReadWriter) error {
-		data := &tableData{}
-		return util.Update(ctx, call, st, t, data, func() error {
-			data.Perms = perms
-			return nil
-		})
-	}
-	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
-			return err
-		} else {
-			return impl(st)
-		}
-	} else {
-		return store.RunInTransaction(t.d.st, impl)
-	}
-}
-
 func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
-	if key != "" {
-		return nil, verror.NewErrNotImplemented(ctx)
-	}
 	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
-		data := &tableData{}
-		if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+		// Check permissions only at table level.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return nil, err
 		}
-		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+		// Get the most specific permissions object.
+		prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+		if err != nil {
+			return nil, err
+		}
+		result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+		// Collect all parent permissions objects all the way up to the table level.
+		for prefix != "" {
+			prefix = prefixPerms.Parent
+			if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+				return nil, err
+			}
+			result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+		}
+		return result, nil
 	}
 	var st store.StoreReader
 	if t.d.batchId != nil {
@@ -179,17 +187,114 @@
 	return impl(st)
 }
 
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		if prefix == "" {
+			data := &tableData{}
+			return util.Update(ctx, call, st, t, data, func() error {
+				data.Perms = perms
+				return nil
+			})
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		// In case there is no permissions object for the given prefix, we need
+		// to add a new node to the prefix permissions tree. We do it by updating
+		// parents for all children of the prefix to the node corresponding to
+		// the prefix.
+		if parent != prefix {
+			if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+				return err
+			}
+		} else {
+			parent = prefixPerms.Parent
+		}
+		stPrefix := t.prefixPermsKey(prefix)
+		stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+		prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+		// Put the (prefix, perms) pair to the database.
+		if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+			return err
+		}
+		return util.PutObject(st, stPrefixLimit, prefixPerms)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
+}
+
 func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
-	return verror.NewErrNotImplemented(ctx)
+	if prefix == "" {
+		return verror.New(verror.ErrBadArg, ctx, prefix)
+	}
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		if parent != prefix {
+			// This can happen only if there is no permissions object for the
+			// given prefix. Since DeletePermissions is idempotent, return nil.
+			return nil
+		}
+		// We need to delete the node corresponding to the prefix from the prefix
+		// permissions tree. We do it by updating parents for all children of the
+		// prefix to the parent of the node corresponding to the prefix.
+		if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+			return err
+		}
+		stPrefix := []byte(t.prefixPermsKey(prefix))
+		stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+		if err := st.Delete(stPrefix); err != nil {
+			return err
+		}
+		return st.Delete(stPrefixLimit)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
 func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
 		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			closeStoreReader()
 			return nil, err
 		}
+		// TODO(rogulenko): Check prefix permissions for children.
 		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
 	var st store.StoreReader
@@ -226,3 +331,145 @@
 func (t *tableReq) stKeyPart() string {
 	return t.name
 }
+
+// updateParentRefs sets Parent to newParent on the direct children of prefix
+// in the prefix permissions tree; deeper descendants keep their nearer parent.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+	stPrefix := t.prefixPermsKey(prefix) // string: the two bounds below cannot alias one backing array
+	it := st.Scan([]byte(stPrefix+"\x00"), []byte(stPrefix+util.PrefixRangeLimitSuffix))
+	for it.Advance() {
+		key, value := it.Key(nil), it.Value(nil)
+		var prefixPerms stPrefixPerms
+		if err := vom.Decode(value, &prefixPerms); err != nil {
+			it.Cancel()
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		if len(prefixPerms.Parent) > len(prefix) {
+			continue // Parent is itself below prefix, so this entry is not a direct child.
+		}
+		prefixPerms.Parent = newParent
+		if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+			it.Cancel()
+			return verror.New(verror.ErrInternal, ctx, err) // same wrapping as util.Put
+		}
+	}
+	if err := it.Err(); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// lock invalidates all concurrent transactions with ErrConcurrentTransaction
+// that have accessed this table.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+	var data tableData
+	if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+		return err
+	}
+	return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+	prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+	if err != nil {
+		return err
+	}
+	if prefix != "" { // a row-prefix permissions object matched; also run the table-level check
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			return err
+		}
+	}
+	auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType()) // error ignored, as in util.Get
+	if err := auth.Authorize(ctx, call.Security()); err != nil {
+		return verror.New(verror.ErrNoAccess, ctx, prefix) // the underlying authorizer error is not propagated
+	}
+	return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has an
+// associated permissions object, together with that object ("" = table level).
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Virtually we represent all prefixes as a forest T, where each vertex maps to
+// a prefix. A parent for a string is the maximum proper prefix of it that
+// belongs to T. Each prefix P from T is represented as a pair of entries with
+// keys P and P~ with values of type stPrefixPerms (parent + perms).
+// High-level outline of how this function works:
+// 1	iter = db.Scan(K, "")
+// 		Here last character of iter.Key() is removed automatically if it is '~'
+// 2	if hasPrefix(K, iter.Key()) return iter.Value()
+// 3	return parent(iter.Key())
+// Short proof:
+// iter returned on line 1 points to one of the following:
+// - a string t that is equal to K;
+// - a string t~: if t is not a prefix of K, then K < t < t~ which
+//   contradicts the property of the iterator returned on line 1 => t is prefix
+//   of K; also t is the largest prefix of K, as all larger prefixes of K are
+//   less than t~; in this case line 2 returns correct result;
+// - a string t that doesn't end with '~': it can't be a prefix of K, as all
+//   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
+//   K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
+//   prefix of K; in this case line 3 returns correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+	it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+	if !it.Advance() {
+		prefixPerms, err := t.permsForPrefix(ctx, st, "")
+		return "", prefixPerms, err
+	}
+	defer it.Cancel()
+	parts := util.SplitKeyParts(string(it.Key(nil)))
+	prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+	value := it.Value(nil)
+	var prefixPerms stPrefixPerms
+	if err := vom.Decode(value, &prefixPerms); err != nil {
+		return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	if strings.HasPrefix(key, prefix) {
+		return prefix, prefixPerms, nil
+	}
+	parentPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+	return prefixPerms.Parent, parentPerms, err // prefixPerms is left intact so Parent still names the returned object
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+	if prefix == "" {
+		var data tableData
+		if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+			return stPrefixPerms{}, err
+		}
+		return stPrefixPerms{Perms: data.Perms}, nil
+	}
+	var prefixPerms stPrefixPerms
+	if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+		return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+	return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
diff --git a/services/syncbase/server/nosql/types.vdl b/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/services/syncbase/server/nosql/types.vdl
+++ b/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
 	Name  string
 	Perms access.Permissions
 }
+
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
diff --git a/services/syncbase/server/nosql/types.vdl.go b/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/services/syncbase/server/nosql/types.vdl.go
+++ b/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
 }) {
 }
 
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
 func init() {
 	vdl.Register((*databaseData)(nil))
 	vdl.Register((*tableData)(nil))
+	vdl.Register((*stPrefixPerms)(nil))
 }
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
 	data := &serviceData{
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, s.st, s, data); err != nil {
+	if err := util.Put(ctx, s.st, s, data); err != nil {
 		return nil, err
 	}
 	if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
 			return err
 		}
 		// Check for "app already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -192,7 +192,7 @@
 			Name:  appName,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, a, data)
+		return util.Put(ctx, st, a, data)
 	}); err != nil {
 		return err
 	}
@@ -218,7 +218,7 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all databases in this app.
-		return util.Delete(ctx, call, st, a)
+		return util.Delete(ctx, st, a)
 	}); err != nil {
 		return err
 	}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
 	DatabasePrefix = "$database"
 	DbInfoPrefix   = "$dbInfo"
 	LogPrefix      = "$log"
+	PermsPrefix    = "$perms"
 	RowPrefix      = "$row"
 	ServicePrefix  = "$service"
 	SyncPrefix     = "$sync"
@@ -27,4 +28,8 @@
 	BatchSep = ":"
 	// Separator for parts of storage engine keys.
 	KeyPartSep = ":"
+	// PrefixRangeLimitSuffix is the key suffix that marks the end of a prefix
+	// range. It must sort after every character allowed in regular keys.
+	// TODO(rogulenko): Change this constant to something out of the UTF8 space.
+	PrefixRangeLimitSuffix = "~"
 )
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
 
 // GetWithoutAuth does st.Get(l.StKey(), v), populating v.
 // Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
 	if err := GetObject(st, l.StKey(), v); err != nil {
 		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
 // Get does GetWithoutAuth followed by an auth check.
 // Returns a VDL-compatible error.
 func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
-	if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+	if err := GetWithoutAuth(ctx, st, l, v); err != nil {
 		return err
 	}
 	auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
 // Put does st.Put(l.StKey(), v).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
 	if err := PutObject(st, l.StKey(), v); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -83,7 +83,7 @@
 // Delete does st.Delete(l.StKey()).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
 	if err := st.Delete([]byte(l.StKey())); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -103,7 +103,7 @@
 	if err := fn(); err != nil {
 		return err
 	}
-	return Put(ctx, call, st, l, v)
+	return Put(ctx, st, l, v)
 }
 
 ////////////////////////////////////////////////////////////
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index df96244..a75b471 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -19,6 +19,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
+	"v.io/x/ref/lib/xrpc"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
@@ -44,26 +45,16 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	s, err := v23.NewServer(ctx)
-	if err != nil {
-		vlog.Fatal("v23.NewServer() failed: ", err)
-	}
-	if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
-		vlog.Fatal("s.Listen() failed: ", err)
-	}
-
 	perms, err := securityflag.PermissionsFromFlag()
 	if err != nil {
 		vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
 	}
-
 	if perms != nil {
 		vlog.Info("Using permissions from command line flag.")
 	} else {
 		vlog.Info("No permissions flag provided. Giving local principal all permissions.")
 		perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
 	}
-
 	service, err := server.NewService(nil, nil, server.ServiceOptions{
 		Perms:   perms,
 		RootDir: *rootDir,
@@ -74,9 +65,8 @@
 	}
 	d := server.NewDispatcher(service)
 
-	// Publish the service in the mount table.
-	if err := s.ServeDispatcher(*name, d); err != nil {
-		vlog.Fatal("s.ServeDispatcher() failed: ", err)
+	if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
+		vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
 	}
 	vlog.Info("Mounted at: ", *name)