Merge "syncbase/vsync: Integrate log and gen vector functionality (Part 2/2)."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 8803897..c990976 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -67,10 +67,7 @@
 	Delete() error {access.Write}
 
 	// Delete deletes all rows in the given half-open range [start, limit). If
-	// limit is "", all rows with keys >= start are included. If the last row that
-	// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
-	// pair is removed.
-	// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+	// limit is "", all rows with keys >= start are included.
 	DeleteRowRange(start, limit []byte) error {access.Write}
 
 	// Scan returns all rows in the given half-open range [start, limit). If limit
@@ -78,6 +75,13 @@
 	// from a consistent snapshot taken at the time of the Scan RPC.
 	Scan(start, limit []byte) stream<_, KeyValue> error {access.Read}
 
+	// GetPermissions returns an array of (prefix, perms) pairs. The array is
+	// sorted from longest prefix to shortest, so element zero is the one that
+	// applies to the row with the given key. The last element is always the
+	// prefix "" which represents the table's permissions -- the array will always
+	// have at least one element.
+	GetPermissions(key string) ([]PrefixPermissions | error) {access.Admin}
+
 	// SetPermissions sets the permissions for all current and future rows with
 	// the given prefix. If the prefix overlaps with an existing prefix, the
 	// longest prefix that matches a row applies. For example:
@@ -85,18 +89,8 @@
 	//     SetPermissions(ctx, Prefix("a/b/c"), perms2)
 	// The permissions for row "a/b/1" are perms1, and the permissions for row
 	// "a/b/c/1" are perms2.
-	//
-	// SetPermissions will fail if called with a prefix that does not match any
-	// rows.
 	SetPermissions(prefix string, perms access.Permissions) error {access.Admin}
 
-	// GetPermissions returns an array of (prefix, perms) pairs. The array is
-	// sorted from longest prefix to shortest, so element zero is the one that
-	// applies to the row with the given key. The last element is always the
-	// prefix "" which represents the table's permissions -- the array will always
-	// have at least one element.
-	GetPermissions(key string) ([]PrefixPermissions | error) {access.Admin}
-
 	// DeletePermissions deletes the permissions for the specified prefix. Any
 	// rows covered by this prefix will use the next longest prefix's permissions
 	// (see the array returned by GetPermissions).
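
For illustration, a hedged sketch of the GetPermissions contract added above; perms1, perms2, and tablePerms are hypothetical values, not part of this change:

    // Hypothetical, assuming the two SetPermissions calls from the doc comment:
    //     SetPermissions(ctx, Prefix("a/b"), perms1)
    //     SetPermissions(ctx, Prefix("a/b/c"), perms2)
    // GetPermissions(ctx, "a/b/c/1") would return, longest prefix first:
    //     []PrefixPermissions{
    //         {Prefix: "a/b/c", Perms: perms2},
    //         {Prefix: "a/b", Perms: perms1},
    //         {Prefix: "", Perms: tablePerms}, // prefix "" is always last
    //     }
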
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 5319420..2af0a60 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -954,15 +954,18 @@
 	// Delete deletes this Table.
 	Delete(*context.T, ...rpc.CallOpt) error
 	// Delete deletes all rows in the given half-open range [start, limit). If
-	// limit is "", all rows with keys >= start are included. If the last row that
-	// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
-	// pair is removed.
-	// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+	// limit is "", all rows with keys >= start are included.
 	DeleteRowRange(ctx *context.T, start []byte, limit []byte, opts ...rpc.CallOpt) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. The returned stream reads
 	// from a consistent snapshot taken at the time of the Scan RPC.
 	Scan(ctx *context.T, start []byte, limit []byte, opts ...rpc.CallOpt) (TableScanClientCall, error)
+	// GetPermissions returns an array of (prefix, perms) pairs. The array is
+	// sorted from longest prefix to shortest, so element zero is the one that
+	// applies to the row with the given key. The last element is always the
+	// prefix "" which represents the table's permissions -- the array will always
+	// have at least one element.
+	GetPermissions(ctx *context.T, key string, opts ...rpc.CallOpt) ([]PrefixPermissions, error)
 	// SetPermissions sets the permissions for all current and future rows with
 	// the given prefix. If the prefix overlaps with an existing prefix, the
 	// longest prefix that matches a row applies. For example:
@@ -970,16 +973,7 @@
 	//     SetPermissions(ctx, Prefix("a/b/c"), perms2)
 	// The permissions for row "a/b/1" are perms1, and the permissions for row
 	// "a/b/c/1" are perms2.
-	//
-	// SetPermissions will fail if called with a prefix that does not match any
-	// rows.
 	SetPermissions(ctx *context.T, prefix string, perms access.Permissions, opts ...rpc.CallOpt) error
-	// GetPermissions returns an array of (prefix, perms) pairs. The array is
-	// sorted from longest prefix to shortest, so element zero is the one that
-	// applies to the row with the given key. The last element is always the
-	// prefix "" which represents the table's permissions -- the array will always
-	// have at least one element.
-	GetPermissions(ctx *context.T, key string, opts ...rpc.CallOpt) ([]PrefixPermissions, error)
 	// DeletePermissions deletes the permissions for the specified prefix. Any
 	// rows covered by this prefix will use the next longest prefix's permissions
 	// (see the array returned by GetPermissions).
@@ -1025,13 +1019,13 @@
 	return
 }
 
-func (c implTableClientStub) SetPermissions(ctx *context.T, i0 string, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "SetPermissions", []interface{}{i0, i1}, nil, opts...)
+func (c implTableClientStub) GetPermissions(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []PrefixPermissions, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetPermissions", []interface{}{i0}, []interface{}{&o0}, opts...)
 	return
 }
 
-func (c implTableClientStub) GetPermissions(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []PrefixPermissions, err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "GetPermissions", []interface{}{i0}, []interface{}{&o0}, opts...)
+func (c implTableClientStub) SetPermissions(ctx *context.T, i0 string, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "SetPermissions", []interface{}{i0, i1}, nil, opts...)
 	return
 }
 
@@ -1121,15 +1115,18 @@
 	// Delete deletes this Table.
 	Delete(*context.T, rpc.ServerCall) error
 	// Delete deletes all rows in the given half-open range [start, limit). If
-	// limit is "", all rows with keys >= start are included. If the last row that
-	// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
-	// pair is removed.
-	// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+	// limit is "", all rows with keys >= start are included.
 	DeleteRowRange(ctx *context.T, call rpc.ServerCall, start []byte, limit []byte) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. The returned stream reads
 	// from a consistent snapshot taken at the time of the Scan RPC.
 	Scan(ctx *context.T, call TableScanServerCall, start []byte, limit []byte) error
+	// GetPermissions returns an array of (prefix, perms) pairs. The array is
+	// sorted from longest prefix to shortest, so element zero is the one that
+	// applies to the row with the given key. The last element is always the
+	// prefix "" which represents the table's permissions -- the array will always
+	// have at least one element.
+	GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
 	// SetPermissions sets the permissions for all current and future rows with
 	// the given prefix. If the prefix overlaps with an existing prefix, the
 	// longest prefix that matches a row applies. For example:
@@ -1137,16 +1134,7 @@
 	//     SetPermissions(ctx, Prefix("a/b/c"), perms2)
 	// The permissions for row "a/b/1" are perms1, and the permissions for row
 	// "a/b/c/1" are perms2.
-	//
-	// SetPermissions will fail if called with a prefix that does not match any
-	// rows.
 	SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error
-	// GetPermissions returns an array of (prefix, perms) pairs. The array is
-	// sorted from longest prefix to shortest, so element zero is the one that
-	// applies to the row with the given key. The last element is always the
-	// prefix "" which represents the table's permissions -- the array will always
-	// have at least one element.
-	GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
 	// DeletePermissions deletes the permissions for the specified prefix. Any
 	// rows covered by this prefix will use the next longest prefix's permissions
 	// (see the array returned by GetPermissions).
@@ -1164,15 +1152,18 @@
 	// Delete deletes this Table.
 	Delete(*context.T, rpc.ServerCall) error
 	// Delete deletes all rows in the given half-open range [start, limit). If
-	// limit is "", all rows with keys >= start are included. If the last row that
-	// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
-	// pair is removed.
-	// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+	// limit is "", all rows with keys >= start are included.
 	DeleteRowRange(ctx *context.T, call rpc.ServerCall, start []byte, limit []byte) error
 	// Scan returns all rows in the given half-open range [start, limit). If limit
 	// is "", all rows with keys >= start are included. The returned stream reads
 	// from a consistent snapshot taken at the time of the Scan RPC.
 	Scan(ctx *context.T, call *TableScanServerCallStub, start []byte, limit []byte) error
+	// GetPermissions returns an array of (prefix, perms) pairs. The array is
+	// sorted from longest prefix to shortest, so element zero is the one that
+	// applies to the row with the given key. The last element is always the
+	// prefix "" which represents the table's permissions -- the array will always
+	// have at least one element.
+	GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
 	// SetPermissions sets the permissions for all current and future rows with
 	// the given prefix. If the prefix overlaps with an existing prefix, the
 	// longest prefix that matches a row applies. For example:
@@ -1180,16 +1171,7 @@
 	//     SetPermissions(ctx, Prefix("a/b/c"), perms2)
 	// The permissions for row "a/b/1" are perms1, and the permissions for row
 	// "a/b/c/1" are perms2.
-	//
-	// SetPermissions will fail if called with a prefix that does not match any
-	// rows.
 	SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error
-	// GetPermissions returns an array of (prefix, perms) pairs. The array is
-	// sorted from longest prefix to shortest, so element zero is the one that
-	// applies to the row with the given key. The last element is always the
-	// prefix "" which represents the table's permissions -- the array will always
-	// have at least one element.
-	GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
 	// DeletePermissions deletes the permissions for the specified prefix. Any
 	// rows covered by this prefix will use the next longest prefix's permissions
 	// (see the array returned by GetPermissions).
@@ -1241,14 +1223,14 @@
 	return s.impl.Scan(ctx, call, i0, i1)
 }
 
-func (s implTableServerStub) SetPermissions(ctx *context.T, call rpc.ServerCall, i0 string, i1 access.Permissions) error {
-	return s.impl.SetPermissions(ctx, call, i0, i1)
-}
-
 func (s implTableServerStub) GetPermissions(ctx *context.T, call rpc.ServerCall, i0 string) ([]PrefixPermissions, error) {
 	return s.impl.GetPermissions(ctx, call, i0)
 }
 
+func (s implTableServerStub) SetPermissions(ctx *context.T, call rpc.ServerCall, i0 string, i1 access.Permissions) error {
+	return s.impl.SetPermissions(ctx, call, i0, i1)
+}
+
 func (s implTableServerStub) DeletePermissions(ctx *context.T, call rpc.ServerCall, i0 string) error {
 	return s.impl.DeletePermissions(ctx, call, i0)
 }
@@ -1285,7 +1267,7 @@
 		},
 		{
 			Name: "DeleteRowRange",
-			Doc:  "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included. If the last row that\n// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)\n// pair is removed.\n// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.",
+			Doc:  "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
 			InArgs: []rpc.ArgDesc{
 				{"start", ``}, // []byte
 				{"limit", ``}, // []byte
@@ -1302,15 +1284,6 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
-			Name: "SetPermissions",
-			Doc:  "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n//     SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n//     SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.\n//\n// SetPermissions will fail if called with a prefix that does not match any\n// rows.",
-			InArgs: []rpc.ArgDesc{
-				{"prefix", ``}, // string
-				{"perms", ``},  // access.Permissions
-			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
-		},
-		{
 			Name: "GetPermissions",
 			Doc:  "// GetPermissions returns an array of (prefix, perms) pairs. The array is\n// sorted from longest prefix to shortest, so element zero is the one that\n// applies to the row with the given key. The last element is always the\n// prefix \"\" which represents the table's permissions -- the array will always\n// have at least one element.",
 			InArgs: []rpc.ArgDesc{
@@ -1322,6 +1295,15 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
 		},
 		{
+			Name: "SetPermissions",
+			Doc:  "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n//     SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n//     SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.",
+			InArgs: []rpc.ArgDesc{
+				{"prefix", ``}, // string
+				{"perms", ``},  // access.Permissions
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
+		},
+		{
 			Name: "DeletePermissions",
 			Doc:  "// DeletePermissions deletes the permissions for the specified prefix. Any\n// rows covered by this prefix will use the next longest prefix's permissions\n// (see the array returned by GetPermissions).",
 			InArgs: []rpc.ArgDesc{
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 435ee50..38f66d2 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -19,6 +19,7 @@
 )
 
 // TODO(sadovsky): Finish writing tests.
+// TODO(rogulenko): Test perms checking for Glob and Exec.
 
 // Tests various Name, FullName, and Key methods.
 func TestNameAndKey(t *testing.T) {
@@ -177,14 +178,160 @@
 
 // Tests that Table.{Set,Get,Delete}Permissions methods work as expected.
 func TestTablePerms(t *testing.T) {
-	// TODO(sadovsky): Implement.
+	clientACtx, sName, cleanup, sp, ctx := tu.SetupOrDieCustom("clientA", "server", nil)
+	defer cleanup()
+	clientBCtx := tu.NewClient("clientB", "server", ctx, sp)
+	a := tu.CreateApp(t, clientACtx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, clientACtx, a, "d")
+	tb := tu.CreateTable(t, clientACtx, d, "tb")
+
+	// Permission objects.
+	aAndB := tu.DefaultPerms("server/clientA", "server/clientB")
+	aOnly := tu.DefaultPerms("server/clientA")
+	bOnly := tu.DefaultPerms("server/clientB")
+
+	// Set initial permissions.
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aAndB); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix"), aAndB); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix"), aAndB); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_a"), aOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_b"), bOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+
+	// Check that A has no access to 'prefix_b' and vice versa.
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_b"), aOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_b_suffix"), aOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_a"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_a_suffix"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
+
+	// Check GetPermissions.
+	wantPerms := []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, ""); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "abc"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	wantPerms = []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_c"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	wantPerms = []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_a"), Perms: aOnly},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_a"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_a_suffix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	wantPerms = []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_b"), Perms: bOnly},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_b"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_b_suffix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+
+	// Delete some prefix permissions and check again.
+	// Check that A can't delete permissions of B.
+	if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix_b")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.DeletePermissions() should have failed: %v", err)
+	}
+	if err := tb.DeletePermissions(clientBCtx, nosql.Prefix("prefix_a")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.DeletePermissions() should have failed: %v", err)
+	}
+	// Delete 'prefix' and 'prefix_a'.
+	if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix")); err != nil {
+		t.Fatalf("tb.DeletePermissions() failed: %v", err)
+	}
+	if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix_a")); err != nil {
+		t.Fatalf("tb.DeletePermissions() failed: %v", err)
+	}
+	// Check DeletePermissions is idempotent.
+	if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix")); err != nil {
+		t.Fatalf("tb.DeletePermissions() failed: %v", err)
+	}
+
+	// Check GetPermissions again.
+	wantPerms = []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, ""); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_a"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_a_suffix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	wantPerms = []nosql.PrefixPermissions{
+		nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_b"), Perms: bOnly},
+		nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_b"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+	if got, _ := tb.GetPermissions(clientACtx, "prefix_b_suffix"); !reflect.DeepEqual(got, wantPerms) {
+		t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+	}
+
+	// Remove B from table-level permissions and check B has no access.
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if _, err := tb.GetPermissions(clientBCtx, ""); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.GetPermissions() should have failed: %v", err)
+	}
+	if _, err := tb.GetPermissions(clientBCtx, "prefix_b"); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.GetPermissions() should have failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix(""), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_b"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+	}
 }
 
 ////////////////////////////////////////
 // Tests involving rows
 
-// TODO(sadovsky): Test perms-checking in all test functions below.
-
 type Foo struct {
 	I int
 	S string
@@ -365,3 +512,68 @@
 		t.Fatalf("r.Get() should have failed: %v", err)
 	}
 }
+
+// Tests permission checking in Row.{Get,Put,Delete} and
+// Table.{Scan,DeleteRowRange}.
+func TestRowPermissions(t *testing.T) {
+	clientACtx, sName, cleanup, sp, ctx := tu.SetupOrDieCustom("clientA", "server", nil)
+	defer cleanup()
+	clientBCtx := tu.NewClient("clientB", "server", ctx, sp)
+	a := tu.CreateApp(t, clientACtx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, clientACtx, a, "d")
+	tb := tu.CreateTable(t, clientACtx, d, "tb")
+
+	// Permission objects.
+	aAndB := tu.DefaultPerms("server/clientA", "server/clientB")
+	aOnly := tu.DefaultPerms("server/clientA")
+	bOnly := tu.DefaultPerms("server/clientB")
+
+	// Set initial permissions.
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aAndB); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("a"), aOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientBCtx, nosql.Prefix("b"), bOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+
+	// Add some key-value pairs.
+	ra := tb.Row("afoo")
+	rb := tb.Row("bfoo")
+	if err := ra.Put(clientACtx, Foo{}); err != nil {
+		t.Fatalf("ra.Put() failed: %v", err)
+	}
+	if err := rb.Put(clientBCtx, Foo{}); err != nil {
+		t.Fatalf("rb.Put() failed: %v", err)
+	}
+
+	// Check that A doesn't have access to 'b'.
+	if err := rb.Get(clientACtx, &Foo{}); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("rb.Get() should have failed: %v", err)
+	}
+	if err := rb.Put(clientACtx, Foo{}); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("rb.Put() should have failed: %v", err)
+	}
+	if err := rb.Delete(clientACtx); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("rb.Delete() should have failed: %v", err)
+	}
+	// Test Table.Delete and Scan.
+	if err := tb.Delete(clientACtx, nosql.Prefix("")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("tb.Delete should have failed: %v", err)
+	}
+	s := tb.Scan(clientACtx, nosql.Prefix(""))
+	if !s.Advance() {
+		t.Fatalf("Stream should have advanced: %v", s.Err())
+	}
+	if s.Key() != "afoo" {
+		t.Fatalf("Unexpected key: got %q, want %q", s.Key(), "afoo")
+	}
+	if s.Advance() {
+		t.Fatalf("Stream advanced unexpectedly")
+	}
+	if err := s.Err(); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+		t.Fatalf("Unexpected stream error: %v", err)
+	}
+}
diff --git a/v23/syncbase/nosql/internal/query/demo/db/db.go b/v23/syncbase/nosql/internal/query/demo/db/db.go
index 71434a2..8a96f0b 100644
--- a/v23/syncbase/nosql/internal/query/demo/db/db.go
+++ b/v23/syncbase/nosql/internal/query/demo/db/db.go
@@ -7,7 +7,6 @@
 import (
 	"errors"
 	"fmt"
-	"strings"
 	"time"
 
 	"v.io/syncbase/v23/syncbase/nosql/query_db"
@@ -43,10 +42,10 @@
 }
 
 type keyValueStreamImpl struct {
-	table        table
-	cursor       int
-	prefixes     []string
-	prefixCursor int
+	table       table
+	cursor      int
+	keyRanges   query_db.KeyRanges
+	rangeCursor int
 }
 
 func (kvs *keyValueStreamImpl) Advance() bool {
@@ -55,16 +54,17 @@
 		if kvs.cursor >= len(kvs.table.rows) {
 			return false
 		}
-		for kvs.prefixCursor < len(kvs.prefixes) {
-			// does it match any prefix
-			if kvs.prefixes[kvs.prefixCursor] == "" || strings.HasPrefix(kvs.table.rows[kvs.cursor].key, kvs.prefixes[kvs.prefixCursor]) {
+		for kvs.rangeCursor < len(kvs.keyRanges) {
+			// does it match any keyRange (or is the keyRange the 0-255 wildcard)?
+			if (kvs.keyRanges[kvs.rangeCursor].Start == string([]byte{0}) && kvs.keyRanges[kvs.rangeCursor].Limit == string([]byte{255})) ||
+				(kvs.table.rows[kvs.cursor].key >= kvs.keyRanges[kvs.rangeCursor].Start && kvs.table.rows[kvs.cursor].key <= kvs.keyRanges[kvs.rangeCursor].Limit) {
 				return true
 			}
-			// Keys and prefixes are both sorted low to high, so we can increment
-			// prefixCursor if the prefix is < the key.
-			if kvs.prefixes[kvs.prefixCursor] < kvs.table.rows[kvs.cursor].key {
-				kvs.prefixCursor++
-				if kvs.prefixCursor >= len(kvs.prefixes) {
+			// Keys and keyRanges are both sorted low to high, so we can increment
+			// rangeCursor if the keyRange.Limit is < the key.
+			if kvs.keyRanges[kvs.rangeCursor].Limit < kvs.table.rows[kvs.cursor].key {
+				kvs.rangeCursor++
+				if kvs.rangeCursor >= len(kvs.keyRanges) {
 					return false
 				}
 			} else {
@@ -86,11 +86,11 @@
 func (kvs *keyValueStreamImpl) Cancel() {
 }
 
-func (t table) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t table) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	var keyValueStreamImpl keyValueStreamImpl
 	keyValueStreamImpl.table = t
 	keyValueStreamImpl.cursor = -1
-	keyValueStreamImpl.prefixes = prefixes
+	keyValueStreamImpl.keyRanges = keyRanges
 	return &keyValueStreamImpl, nil
 }
 
diff --git a/v23/syncbase/nosql/internal/query/demo/demo.go b/v23/syncbase/nosql/internal/query/demo/demo.go
index b19e0cc..e369dc4 100644
--- a/v23/syncbase/nosql/internal/query/demo/demo.go
+++ b/v23/syncbase/nosql/internal/query/demo/demo.go
@@ -24,7 +24,7 @@
 
 var (
 	format = flag.String("format", "table",
-		"Output format.  'table': human-readable table; 'csv': comma-separated values, use -csv-delimiter to control the delimiter.")
+		"Output format.  'table': human-readable table; 'csv': comma-separated values, use -csv-delimiter to control the delimiter; 'json': JSON arrays.")
 	csvDelimiter = flag.String("csv-delimiter", ",", "Delimiter to use when printing data as CSV (e.g. \"\t\", \",\")")
 )
 
@@ -63,10 +63,16 @@
 			if err := writer.WriteTable(os.Stdout, columnNames, rs); err != nil {
 				fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
 			}
-		} else {
+		} else if *format == "csv" {
 			if err := writer.WriteCSV(os.Stdout, columnNames, rs, *csvDelimiter); err != nil {
 				fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
 			}
+		} else if *format == "json" {
+			if err := writer.WriteJson(os.Stdout, columnNames, rs); err != nil {
+				fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
+			}
+		} else {
+			panic(fmt.Sprintf("invalid format flag value: %v", *format))
 		}
 	}
 }
@@ -75,8 +81,8 @@
 	shutdown := db.InitDB()
 	defer shutdown()
 
-	if *format != "table" && *format != "csv" {
-		fmt.Fprintf(os.Stderr, "Unsupported -format %q.  Must be one of 'table' or 'csv'.\n", *format)
+	if *format != "table" && *format != "csv" && *format != "json" {
+		fmt.Fprintf(os.Stderr, "Unsupported -format %q.  Must be one of 'table', 'csv', or 'json'.\n", *format)
 		os.Exit(-1)
 	}
 
@@ -103,10 +109,11 @@
 				break
 			}
 		} else {
-			if q == "" || strings.ToLower(q) == "exit" || strings.ToLower(q) == "quit" {
+			tq := strings.ToLower(strings.TrimSpace(q))
+			if tq == "" || tq == "exit" || tq == "quit" {
 				break
 			}
-			if strings.ToLower(q) == "dump" {
+			if tq == "dump" {
 				dumpDB(d)
 			} else {
 				queryExec(d, q)
diff --git a/v23/syncbase/nosql/internal/query/demo/writer/writer.go b/v23/syncbase/nosql/internal/query/demo/writer/writer.go
index 3e2d997..20d61fe 100644
--- a/v23/syncbase/nosql/internal/query/demo/writer/writer.go
+++ b/v23/syncbase/nosql/internal/query/demo/writer/writer.go
@@ -5,6 +5,7 @@
 package writer
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -138,6 +139,34 @@
 	return str
 }
 
+// WriteJson formats the result as a JSON array of arrays (rows) of values.
+func WriteJson(out io.Writer, columnNames []string, rs query.ResultStream) error {
+	io.WriteString(out, "[\n")
+	io.WriteString(out, "  [")
+	delim := ""
+	for _, cName := range columnNames {
+		str, err := json.Marshal(cName)
+		if err != nil {
+			panic(fmt.Sprintf("JSON marshalling failed for column name: %v", err))
+		}
+		io.WriteString(out, fmt.Sprintf("%s%s", delim, str))
+		delim = ", "
+	}
+	io.WriteString(out, "]\n")
+	for rs.Advance() {
+		io.WriteString(out, fmt.Sprintf(" ,["))
+		delim := ""
+		for _, column := range rs.Result() {
+			str := toJson(column)
+			io.WriteString(out, fmt.Sprintf("%s%s", delim, str))
+			delim = ", "
+		}
+		io.WriteString(out, "]\n")
+	}
+	io.WriteString(out, "]\n")
+	return rs.Err()
+}
+
 // Converts VDL value to readable yet parseable string representation.
 // If nested is not set, strings outside composites are left unquoted.
 // TODO(ivanpi): Handle cycles and improve non-tree DAG handling.
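
A minimal usage sketch of WriteJson, assuming an rs that implements query.ResultStream (the fake newResultStream in writer_test.go below is one way to build one):

    var buf bytes.Buffer
    if err := writer.WriteJson(&buf, []string{"k", "v"}, rs); err != nil {
    	fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error()) // rs.Err() surfaces here
    }
    fmt.Print(buf.String()) // e.g. [\n  ["k", "v"]\n ,[5, "foo"]\n]
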
@@ -248,3 +277,129 @@
 	}
 	return begin + strings.Join(elems, sep) + end
 }
+
+// Converts VDL value to JSON representation.
+func toJson(val *vdl.Value) string {
+	jf := toJsonFriendly(val)
+	jOut, err := json.Marshal(jf)
+	if err != nil {
+		panic(fmt.Sprintf("JSON marshalling failed: %v", err))
+	}
+	return string(jOut)
+}
+
+// Converts VDL value to Go type compatible with json.Marshal().
+func toJsonFriendly(val *vdl.Value) interface{} {
+	switch val.Type() {
+	case vdl.TypeOf(vtime.Time{}), vdl.TypeOf(vtime.Duration{}):
+		s, err := toStringNative(val)
+		if err != nil {
+			panic(fmt.Sprintf("toStringNative failed for builtin time type: %v", err))
+		}
+		return s
+	default:
+		// fall through to Kind switch
+	}
+	switch val.Kind() {
+	case vdl.Bool:
+		return val.Bool()
+	case vdl.Byte:
+		return val.Byte()
+	case vdl.Uint16, vdl.Uint32, vdl.Uint64:
+		return val.Uint()
+	case vdl.Int16, vdl.Int32, vdl.Int64:
+		return val.Int()
+	case vdl.Float32, vdl.Float64:
+		return val.Float()
+	case vdl.Complex64, vdl.Complex128:
+		// Go doesn't support marshalling complex values, so we stringify.
+		c := val.Complex()
+		return fmt.Sprintf("%v+%vi", real(c), imag(c))
+	case vdl.String:
+		return val.RawString()
+	case vdl.Enum:
+		return val.EnumLabel()
+	case vdl.Array, vdl.List:
+		arr := make([]interface{}, val.Len())
+		for i := range arr {
+			arr[i] = toJsonFriendly(val.Index(i))
+		}
+		return arr
+	case vdl.Any, vdl.Optional:
+		if val.IsNil() {
+			return nil
+		}
+		return toJsonFriendly(val.Elem())
+	case vdl.Struct:
+		// TODO(ivanpi): Consider lowercasing field names.
+		return toOrderedMap(val.Type().NumField(), func(i int) (string, interface{}) {
+			return val.Type().Field(i).Name, toJsonFriendly(val.StructField(i))
+		})
+	case vdl.Union:
+		// TODO(ivanpi): Consider lowercasing field name.
+		ui, uv := val.UnionField()
+		return toOrderedMap(1, func(_ int) (string, interface{}) {
+			return val.Type().Field(ui).Name, toJsonFriendly(uv)
+		})
+	case vdl.Set:
+		// TODO(ivanpi): vdl.SortValuesAsString() used for predictable output ordering.
+		// Use a more sensible sort for numbers etc.
+		keys := vdl.SortValuesAsString(val.Keys())
+		return toOrderedMap(len(keys), func(i int) (string, interface{}) {
+			return toString(keys[i], false), true
+		})
+	case vdl.Map:
+		// TODO(ivanpi): vdl.SortValuesAsString() used for predictable output ordering.
+		// Use a more sensible sort for numbers etc.
+		keys := vdl.SortValuesAsString(val.Keys())
+		return toOrderedMap(len(keys), func(i int) (string, interface{}) {
+			return toString(keys[i], false), toJsonFriendly(val.MapIndex(keys[i]))
+		})
+	case vdl.TypeObject:
+		return val.String()
+	default:
+		panic(fmt.Sprintf("unknown Kind %s", val.Kind()))
+	}
+}
+
+// Serializes to JSON object, preserving key order.
+// Native Go map will serialize to JSON object with sorted keys, which is
+// unexpected behaviour for a struct.
+type orderedMap []orderedMapElem
+
+type orderedMapElem struct {
+	Key string
+	Val interface{}
+}
+
+var _ json.Marshaler = (*orderedMap)(nil)
+
+// Builds an orderedMap with n elements, obtaining the key and value of element
+// i using elemToKeyVal(i).
+func toOrderedMap(n int, elemToKeyVal func(i int) (string, interface{})) orderedMap {
+	om := make(orderedMap, n)
+	for i := range om {
+		om[i].Key, om[i].Val = elemToKeyVal(i)
+	}
+	return om
+}
+
+// Serializes orderedMap to JSON object, preserving key order.
+func (om orderedMap) MarshalJSON() (_ []byte, rerr error) {
+	defer func() {
+		if r := recover(); r != nil {
+			rerr = fmt.Errorf("orderedMap: %v", r)
+		}
+	}()
+	return []byte(listToString("{", ",", "}", len(om), func(i int) string {
+		keyJson, err := json.Marshal(om[i].Key)
+		if err != nil {
+			panic(err)
+		}
+		valJson, err := json.Marshal(om[i].Val)
+		if err != nil {
+			panic(err)
+		}
+		return fmt.Sprintf("%s:%s", keyJson, valJson)
+	})), nil
+}
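
A short sketch (inside the writer package, since toOrderedMap is unexported) of why orderedMap exists; output shown as a comment:

    om := toOrderedMap(2, func(i int) (string, interface{}) {
    	return []string{"Zeta", "Alpha"}[i], i // keys deliberately not sorted
    })
    b, _ := json.Marshal(om) // uses orderedMap.MarshalJSON
    fmt.Println(string(b))   // {"Zeta":0,"Alpha":1} -- insertion order kept;
                             // a native Go map would marshal as {"Alpha":1,"Zeta":0}
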
diff --git a/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go b/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
index 3dd5b95..3c8c642 100644
--- a/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
+++ b/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"bytes"
+	"encoding/json"
 	"testing"
 	"time"
 
@@ -348,3 +349,136 @@
 		}
 	}
 }
+
+func TestWriteJson(t *testing.T) {
+	type testCase struct {
+		columns []string
+		rows    [][]interface{}
+		// To make the test cases easier to read, output should have a leading
+		// newline.
+		output string
+	}
+	tests := []testCase{
+		{ // Basic.
+			[]string{"c\n1", "c鶊2"},
+			[][]interface{}{
+				{5, "foo\nbar"},
+				{6, "bar\tfoo"},
+			},
+			`
+[
+  ["c\n1", "c鶊2"]
+ ,[5, "foo\nbar"]
+ ,[6, "bar\tfoo"]
+]
+`,
+		},
+		{ // Numbers.
+			[]string{"byte", "uint16", "uint32", "uint64", "int16", "int32", "int64",
+				"float32", "float64", "complex64", "complex128"},
+			[][]interface{}{
+				{
+					byte(12), uint16(1234), uint32(5678), uint64(999888777666), int16(9876), int32(876543), int64(128),
+					float32(3.14159), float64(2.71828182846), complex64(123.0 + 7.0i), complex128(456.789 + 10.1112i),
+				},
+				{
+					byte(9), uint16(99), uint32(999), uint64(9999999), int16(9), int32(99), int64(88),
+					float32(1.41421356237), float64(1.73205080757), complex64(9.87 + 7.65i), complex128(4.32 + 1.0i),
+				},
+			},
+			`
+[
+  ["byte", "uint16", "uint32", "uint64", "int16", "int32", "int64", "float32", "float64", "complex64", "complex128"]
+ ,[12, 1234, 5678, 999888777666, 9876, 876543, 128, 3.141590118408203, 2.71828182846, "123+7i", "456.789+10.1112i"]
+ ,[9, 99, 999, 9999999, 9, 99, 88, 1.4142135381698608, 1.73205080757, "9.869999885559082+7.650000095367432i", "4.32+1i"]
+]
+`,
+		},
+		{ // Empty result.
+			[]string{},
+			[][]interface{}{},
+			`
+[
+  []
+]
+`,
+		},
+		{ // Empty values.
+			[]string{"blank", "empty", "nil"},
+			[][]interface{}{
+				{struct{}{}, []string{}, nil},
+				{},
+			},
+			`
+[
+  ["blank", "empty", "nil"]
+ ,[{}, [], null]
+ ,[]
+]
+`,
+		},
+		{
+			[]string{"c1"},
+			[][]interface{}{
+				{db.Customer{"John Smith", 1, true, db.AddressInfo{"1 Main St.", "Palo Alto", "CA", "94303"}, db.CreditReport{Agency: db.CreditAgencyEquifax, Report: db.AgencyReportEquifaxReport{db.EquifaxCreditReport{'A'}}}}},
+				{db.Invoice{1, 1000, 42, db.AddressInfo{"1 Main St.", "Palo Alto", "CA", "94303"}}},
+			},
+			`
+[
+  ["c1"]
+ ,[{"Name":"John Smith","Id":1,"Active":true,"Address":{"Street":"1 Main St.","City":"Palo Alto","State":"CA","Zip":"94303"},"Credit":{"Agency":"Equifax","Report":{"EquifaxReport":{"Rating":65}}}}]
+ ,[{"CustId":1,"InvoiceNum":1000,"Amount":42,"ShipTo":{"Street":"1 Main St.","City":"Palo Alto","State":"CA","Zip":"94303"}}]
+]
+`,
+		},
+		{
+			[]string{"nil", "composite", "typeobj"},
+			[][]interface{}{
+				{
+					nil,
+					db.Composite{db.Array2String{"foo", "bar"}, []int32{1, 2}, map[int32]struct{}{1: struct{}{}, 2: struct{}{}}, map[string]int32{"foo": 1, "bar": 2}},
+					vdl.TypeOf(map[string]struct{}{}),
+				},
+			},
+			`
+[
+  ["nil", "composite", "typeobj"]
+ ,[null, {"Arr":["foo","bar"],"ListInt":[1,2],"MySet":{"1":true,"2":true},"Map":{"bar":2,"foo":1}}, "typeobject(set[string])"]
+]
+`,
+		},
+		{
+			[]string{"c1"},
+			[][]interface{}{
+				{
+					db.Recursive{nil, &db.Times{time.Unix(123456789, 42244224), time.Duration(1337)}, map[db.Array2String]db.Recursive{
+						db.Array2String{"a", "棎鶊鵱"}: db.Recursive{},
+						db.Array2String{"x", "y"}:   db.Recursive{vdl.ValueOf(db.CreditReport{Agency: db.CreditAgencyExperian, Report: db.AgencyReportExperianReport{db.ExperianCreditReport{db.ExperianRatingGood}}}), nil, nil},
+					}},
+				},
+			},
+			`
+[
+  ["c1"]
+ ,[{"Any":null,"Maybe":{"Stamp":"1973-11-29 21:33:09.042244224 +0000 UTC","Interval":"1.337µs"},"Rec":{"[\"a\", \"棎鶊鵱\"]":{"Any":null,"Maybe":null,"Rec":{}},"[\"x\", \"y\"]":{"Any":{"Agency":"Experian","Report":{"ExperianReport":{"Rating":"Good"}}},"Maybe":null,"Rec":{}}}}]
+]
+`,
+		},
+	}
+	for _, test := range tests {
+		var b bytes.Buffer
+		if err := writer.WriteJson(&b, test.columns, newResultStream(test.rows)); err != nil {
+			t.Errorf("Unexpected error: %v", err)
+			continue
+		}
+		var decoded interface{}
+		if err := json.Unmarshal(b.Bytes(), &decoded); err != nil {
+			t.Errorf("Got invalid JSON: %v", err)
+		}
+		// Add a leading newline to the output to match the leading newline
+		// in our test cases.
+		if got, want := "\n"+b.String(), test.output; got != want {
+			t.Errorf("Wrong output:\nGOT: %q\nWANT:%q", got, want)
+		}
+	}
+}
diff --git a/v23/syncbase/nosql/internal/query/query.go b/v23/syncbase/nosql/internal/query/query.go
index da87502..bcbb3cf 100644
--- a/v23/syncbase/nosql/internal/query/query.go
+++ b/v23/syncbase/nosql/internal/query/query.go
@@ -65,18 +65,18 @@
 	return projection
 }
 
-// Given a query (i.e.,, select statement), return the key prefixes needed to satisfy the query.
-// A return of a single empty string ([]string{ "" }) means fetch all keys.
-func CompileKeyPrefixes(w *query_parser.WhereClause) []string {
+// Given a query (i.e., select statement), return the key ranges needed to satisfy the query.
+// A return of a single-element array holding the range {string([]byte{0}), string([]byte{255})} means fetch all keys.
+func CompileKeyRanges(w *query_parser.WhereClause) query_db.KeyRanges {
 	// First determine if every key needs to be fetched.  To do this, evaluate the
 	// where clause substituting false for every key expression and true for every
 	// other (type for value) expression.  If the where clause evaluates to true,
 	// it is possible for a row to be selected without any dependence on the contents
 	// of the key.  In that case, all keys must be fetched.
 	if w == nil || CheckIfAllKeysMustBeFetched(w.Expr) {
-		return []string{""}
+		return query_db.KeyRanges{query_db.KeyRange{string([]byte{0}), string([]byte{255})}}
 	} else {
-		return query_checker.CompileKeyPrefixes(w)
+		return query_checker.CompileKeyRanges(w)
 	}
 }
 
@@ -176,8 +176,8 @@
 }
 
 func execSelect(db query_db.Database, s *query_parser.SelectStatement) ([]string, ResultStream, error) {
-	prefixes := CompileKeyPrefixes(s.Where)
-	keyValueStream, err := s.From.Table.DBTable.Scan(prefixes)
+	keyRanges := CompileKeyRanges(s.Where)
+	keyValueStream, err := s.From.Table.DBTable.Scan(keyRanges)
 	if err != nil {
 		return nil, nil, syncql.NewErrScanError(db.GetContext(), s.Off, err)
 	}
diff --git a/v23/syncbase/nosql/internal/query/query_checker/query_checker.go b/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
index ce807c0..81bfea3 100644
--- a/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
+++ b/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
@@ -107,10 +107,18 @@
 		e.Operand2.Prefix = prefix
 		// Compute the regular expression now to check for errors.
 		// Save the regex (testing) and the compiled regex (for later use in evaluation).
-		regex, compRegex, err := computeRegex(db, e.Operand2.Off, e.Operand2.Str)
+		regex, compRegex, foundWildcard, err := computeRegex(db, e.Operand2.Off, e.Operand2.Str)
 		if err != nil {
 			return err
 		}
+		// Optimization: If like argument contains no wildcards, convert the expression to equals.
+		if !foundWildcard {
+			e.Operator.Type = query_parser.Equal
+			// Since this is no longer a like expression, we need to unescape
+			// any escaped chars (i.e., "\\", "\_" and "\%" become
+			// "\", "_" and "%", respectively).
+			e.Operand2.Str = unescapeLikeExpression(e.Operand2.Str)
+		}
 		e.Operand2.Regex = regex
 		e.Operand2.CompRegex = compRegex
 	}
@@ -227,68 +235,85 @@
 // Convert Like expression to a regex.  That is, convert:
 // % to .*?
 // _ to .
+// Unescape '\\', '\%' and '\_' to '\', '%' and '_', respectively.
 // Escape everything that would be incorrectly interpreted as a regex.
-// Note: \% and \_ are used to escape % and _, respectively.
-func computeRegex(db query_db.Database, off int64, s string) (string, *regexp.Regexp, error) {
-	// Escape everything, this will escape too much as like wildcards can
-	// also be escaped by a backslash (\%, \_, \\).
-	escaped := regexp.QuoteMeta(s)
-	// Change all unescaped '%' chars to ".*?" and all unescaped '_' chars to '.'.
-	var buf bytes.Buffer
-	buf.WriteString("^")
-	backslash_level := 0
-	for _, c := range escaped {
-		switch backslash_level {
-		case 0:
-			switch c {
-			case '%':
-				buf.WriteString(".*?")
-			case '_':
-				buf.WriteString(".")
-			case '\\':
-				backslash_level++
-			default:
-				buf.WriteString(string(c))
+//
+// The approach this function takes is to collect characters to be escaped
+// into toBeEscapedBuf.  When a wildcard is encountered, toBeEscapedBuf is
+// first escaped and written to the regex buffer, and then the wildcard is
+// translated to regex (either ".*?" or ".") and written to the regex buffer.
+// At the end, any remaining chars in toBeEscapedBuf are written.
+//
+// Return values are:
+// 1. string: uncompiled regular expression
+// 2. *Regexp: compiled regular expression
+// 3. bool: true if wildcards were found (if false, like is converted to equal)
+// 4. error: non-nil if error encountered
+func computeRegex(db query_db.Database, off int64, s string) (string, *regexp.Regexp, bool, error) {
+	var buf bytes.Buffer            // buffer for return regex
+	var toBeEscapedBuf bytes.Buffer // buffer to hold characters waiting to be escaped
+
+	buf.WriteString("^") // '^<regex_str>$'
+	escapedMode := false
+	foundWildcard := false
+
+	for _, c := range s {
+		switch c {
+		case '%', '_':
+			if escapedMode {
+				toBeEscapedBuf.WriteString(string(c))
+			} else {
+				// Write out any chars waiting to be escaped, then
+				// write ".*?' or '.'.
+				buf.WriteString(regexp.QuoteMeta(toBeEscapedBuf.String()))
+				toBeEscapedBuf.Reset()
+				if c == '%' {
+					buf.WriteString(".*?")
+				} else {
+					buf.WriteString(".")
+				}
+				foundWildcard = true
 			}
-		case 1:
-			switch c {
-			case '\\':
-				// backslashes become double backslashes because of the
-				// QuoteMeta above.  Let's see what's next.
-				backslash_level++
-			default:
-				// In this case, QuoteMeta is escaping a regex character.  We
-				// need to honor the escape (e.g., \*, \[, \]).
-				buf.WriteString("\\")
-				buf.WriteString(string(c))
-				backslash_level = 0
+			escapedMode = false
+		case '\\':
+			if escapedMode {
+				toBeEscapedBuf.WriteString(string(c))
 			}
-		case 2:
-			switch c {
-			case '\\':
-				// We've hit a third backslash.
-				// Write out the first \ (escaped as \\).
-				// Set backslash_level to 1 since we've encountered
-				// another backslash and need to see what follows.
-				buf.WriteString("\\\\")
-				backslash_level = 1
-			default:
-				// The user wrote \% or \_.
-				// It was escaped by QuoteMeta to \\% or \\_.
-				// Since the % or _ was escaped, just write it so regex can
-				// treat it like any character.
-				buf.WriteString(string(c))
-				backslash_level = 0
-			}
+			escapedMode = !escapedMode
+		default:
+			toBeEscapedBuf.WriteString(string(c))
 		}
 	}
-	buf.WriteString("$")
+	// Write any remaining chars in toBeEscapedBuf.
+	buf.WriteString(regexp.QuoteMeta(toBeEscapedBuf.String()))
+	buf.WriteString("$") // '^<regex_str>$'
 	regex := buf.String()
 	compRegex, err := regexp.Compile(regex)
 	if err != nil {
-		return "", nil, syncql.NewErrErrorCompilingRegularExpression(db.GetContext(), off, regex, err)
+		return "", nil, false, syncql.NewErrErrorCompilingRegularExpression(db.GetContext(), off, regex, err)
 	}
-	return regex, compRegex, nil
+	return regex, compRegex, foundWildcard, nil
+}
+
+// Unescape '\\', '\%' and '\_' to '\', '%' and '_', respectively.
+func unescapeLikeExpression(s string) string {
+	var buf bytes.Buffer // buffer for returned unescaped string
+
+	escapedMode := false
+
+	for _, c := range s {
+		switch c {
+		case '\\':
+			if escapedMode {
+				buf.WriteString(string(c))
+			}
+			escapedMode = !escapedMode
+		default:
+			buf.WriteString(string(c))
+			escapedMode = false
+		}
+	}
+	return buf.String()
 }
 
 func IsLogicalOperator(o *query_parser.BinaryOperator) bool {
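
Some illustrative traces of the rewritten computeRegex/unescapeLikeExpression pair (patterns shown as raw character sequences, derived from the logic above):

    //   "abc%"      -> "^abc.*?$"      foundWildcard=true
    //   "a_c"       -> "^a.c$"         foundWildcard=true
    //   "Foo\%Bar"  -> "^Foo%Bar$"     foundWildcard=false -> rewritten to k = "Foo%Bar"
    //   "Foo\\%Bar" -> "^Foo\\.*?Bar$" foundWildcard=true  (escaped backslash, live %)
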
@@ -323,53 +348,97 @@
 	return o.Type == query_parser.TypExpr
 }
 
-// Compile a list of key prefixes to fetch with scan.  Prefixes are returned in sorted order
-// and do not overlap (e.g., prefixes of "ab" and "abc" would be combined as "ab").
-// A single empty string (array len of 1) is returned if all keys are to be fetched.
-// Used by query package.  In query_checker package so prefixes can be tested.
-func CompileKeyPrefixes(where *query_parser.WhereClause) []string {
-	if where == nil {
-		return []string{""}
-	} else {
-		// Collect all key string literal operands.
-		p := collectKeyPrefixes(where.Expr)
-		// Sort
-		sort.Strings(p)
-		// Elminate overlaps
-		var p2 []string
-		for i, s := range p {
-			// Skip over prefixes that are already covered by prior key prefixes.
-			if i != 0 && strings.HasPrefix(s, p2[len(p2)-1]) {
-				continue
-			}
-			p2 = append(p2, s)
+// Function copied from syncbase.
+func computeKeyRangeForPrefix(prefix string) query_db.KeyRange {
+	if prefix == "" {
+		return query_db.KeyRange{string([]byte{0}), string([]byte{255})}
+	}
+	limit := []byte(prefix)
+	for len(limit) > 0 {
+		if limit[len(limit)-1] == 255 {
+			limit = limit[:len(limit)-1] // chop off trailing 0xff
+		} else {
+			limit[len(limit)-1] += 1 // add 1
+			break                    // no carry
 		}
-		return p2
+	}
+	return query_db.KeyRange{prefix, string(limit)}
+}
+
+// The limit for a single value range is simply the start value with a zero byte appended.
+// In this way, only the single 'start' value will be returned (or nothing if that single
+// value is not present).
+func computeKeyRangeForSingleValue(start string) query_db.KeyRange {
+	limit := []byte(start)
+	limit = append(limit, 0)
+	return query_db.KeyRange{start, string(limit)}
+}
+
+// Compute a list of key ranges to be used by query_db's Table.Scan implementation.
+func CompileKeyRanges(where *query_parser.WhereClause) query_db.KeyRanges {
+	if where == nil {
+		return query_db.KeyRanges{computeKeyRangeForPrefix("")}
+	}
+	var keyRanges query_db.KeyRanges
+	collectKeyRanges(where.Expr, &keyRanges)
+	return keyRanges
+}
+
+func collectKeyRanges(expr *query_parser.Expression, keyRanges *query_db.KeyRanges) {
+	if IsKey(expr.Operand1) {
+		if expr.Operator.Type == query_parser.Like {
+			addKeyRange(computeKeyRangeForPrefix(expr.Operand2.Prefix), keyRanges)
+		} else { // OpEqual
+			addKeyRange(computeKeyRangeForSingleValue(expr.Operand2.Str), keyRanges)
+		}
+	}
+	if IsExpr(expr.Operand1) {
+		collectKeyRanges(expr.Operand1.Expr, keyRanges)
+	}
+	if IsExpr(expr.Operand2) {
+		collectKeyRanges(expr.Operand2.Expr, keyRanges)
 	}
 }
 
-// Collect all operand2 string literals where operand1 of the expression is ident "k".
-func collectKeyPrefixes(expr *query_parser.Expression) []string {
-	var prefixes []string
-	if IsKey(expr.Operand1) {
-		if expr.Operator.Type == query_parser.Like {
-			prefixes = append(prefixes, expr.Operand2.Prefix)
-		} else { // OpEqual
-			prefixes = append(prefixes, expr.Operand2.Str)
-		}
-		return prefixes
-	}
-	if IsExpr(expr.Operand1) {
-		for _, p := range collectKeyPrefixes(expr.Operand1.Expr) {
-			prefixes = append(prefixes, p)
+func addKeyRange(keyRange query_db.KeyRange, keyRanges *query_db.KeyRanges) {
+	handled := false
+	// Is there an overlap with an existing range?
+	for i, r := range *keyRanges {
+	// In the following if statement:
+	// the first paren expr is true if the start of the range to be added is contained in r,
+	// the second paren expr is true if the limit of the range to be added is contained in r,
+	// the third paren expr is true if the range to be added entirely contains r.
+		if (keyRange.Start >= r.Start && keyRange.Start <= r.Limit) ||
+			(keyRange.Limit >= r.Start && keyRange.Limit <= r.Limit) ||
+			(keyRange.Start <= r.Start && keyRange.Limit >= r.Limit) {
+			// keyRange overlaps with existing range at keyRanges[i]
+			// set newKeyRange to a range that encompasses both
+			var newKeyRange query_db.KeyRange
+			if keyRange.Start < r.Start {
+				newKeyRange.Start = keyRange.Start
+			} else {
+				newKeyRange.Start = r.Start
+			}
+			if keyRange.Limit > r.Limit {
+				newKeyRange.Limit = keyRange.Limit
+			} else {
+				newKeyRange.Limit = r.Limit
+			}
+			// The new range may overlap with other ranges in keyRanges;
+			// delete the current range and call addKeyRange again.
+			// This recursion continues until no ranges overlap.
+			*keyRanges = append((*keyRanges)[:i], (*keyRanges)[i+1:]...)
+			addKeyRange(newKeyRange, keyRanges)
+			handled = true // we don't want to add keyRange below
+			break
 		}
 	}
-	if IsExpr(expr.Operand2) {
-		for _, p := range collectKeyPrefixes(expr.Operand2.Expr) {
-			prefixes = append(prefixes, p)
-		}
+	// no overlap, just add it
+	if !handled {
+		*keyRanges = append(*keyRanges, keyRange)
 	}
-	return prefixes
+	// sort before returning
+	sort.Sort(*keyRanges)
 }
 
 // Check limit clause.  Limit must be >= 1.
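
And a few hedged traces of the range helpers above (byte escapes shown in Go notation):

    //   computeKeyRangeForPrefix("abc")      -> {"abc", "abd"}     // bump last byte
    //   computeKeyRangeForPrefix("a\xff")    -> {"a\xff", "b"}     // carry past 0xff
    //   computeKeyRangeForSingleValue("Foo") -> {"Foo", "Foo\x00"} // matches only "Foo"
    // addKeyRange keeps the set sorted and merged: adding {"abc", "abd"} when
    // {"ab", "ac"} is already present leaves just {"ab", "ac"}.
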
diff --git a/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go b/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
index 16dd2d7..2ea1196 100644
--- a/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
+++ b/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
@@ -41,11 +41,11 @@
 	defer shutdown()
 }
 
-func (t invoiceTable) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t invoiceTable) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	return nil, errors.New("unimplemented")
 }
 
-func (t customerTable) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t customerTable) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	return nil, errors.New("unimplemented")
 }
 
@@ -66,9 +66,9 @@
 	query string
 }
 
-type keyPrefixesTest struct {
-	query       string
-	keyPrefixes []string
+type keyRangesTest struct {
+	query     string
+	keyRanges query_db.KeyRanges
 }
 
 type regularExpressionsTest struct {
@@ -121,35 +121,69 @@
 	}
 }
 
-func TestKeyPrefixes(t *testing.T) {
-	basic := []keyPrefixesTest{
+func appendZeroByte(start string) string {
+	limit := []byte(start)
+	limit = append(limit, 0)
+	return string(limit)
+}
+
+func TestKeyRanges(t *testing.T) {
+	basic := []keyRangesTest{
 		{
 			"select k, v from Customer",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 		},
 		{
 			"select k, v from Customer where t = \"Foo.Bar\" and k like \"abc%\" limit 100 offset 200",
-			[]string{"abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"abc", "abd"},
+			},
 		},
 		{
 			"select  k,  v from \n  Customer where k like \"002%\" or k like \"001%\" or k like \"%\"",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 		},
 		{
 			"select k, v from Customer where k = \"Foo.Bar\" and k like \"abc%\" limit 100 offset 200",
-			[]string{"Foo.Bar", "abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"Foo.Bar", appendZeroByte("Foo.Bar")},
+				query_db.KeyRange{"abc", "abd"},
+			},
 		},
 		{
+			// Note: 'like "Foo"' is optimized to '= "Foo"
 			"select k, v from Customer where k = \"Foo.Bar\" or k like \"Foo\" or k like \"abc%\" limit 100 offset 200",
-			[]string{"Foo", "abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"Foo", appendZeroByte("Foo")},
+				query_db.KeyRange{"Foo.Bar", appendZeroByte("Foo.Bar")},
+				query_db.KeyRange{"abc", "abd"},
+			},
 		},
 		{
 			"select k, v from Customer where k like \"Foo\\%Bar\" or k like \"abc%\" limit 100 offset 200",
-			[]string{"Foo%Bar", "abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"Foo%Bar", appendZeroByte("Foo%Bar")},
+				query_db.KeyRange{"abc", "abd"},
+			},
 		},
 		{
 			"select k, v from Customer where k like \"Foo\\\\%Bar\" or k like \"abc%\" limit 100 offset 200",
-			[]string{"Foo\\", "abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"Foo\\", "Foo]"},
+				query_db.KeyRange{"abc", "abd"},
+			},
+		},
+		{
+			"select k, v from Customer where k like \"Foo\\\\\\%Bar\" or k like \"abc%\" limit 100 offset 200",
+			query_db.KeyRanges{
+				query_db.KeyRange{"Foo\\%Bar", appendZeroByte("Foo\\%Bar")},
+				query_db.KeyRange{"abc", "abd"},
+			},
 		},
 	}
 
@@ -164,9 +198,9 @@
 		}
 		switch sel := (*s).(type) {
 		case query_parser.SelectStatement:
-			prefixes := query_checker.CompileKeyPrefixes(sel.Where)
-			if !reflect.DeepEqual(test.keyPrefixes, prefixes) {
-				t.Errorf("query: %s;\nGOT  %v\nWANT %v", test.query, prefixes, test.keyPrefixes)
+			keyRanges := query_checker.CompileKeyRanges(sel.Where)
+			if !reflect.DeepEqual(test.keyRanges, keyRanges) {
+				t.Errorf("query: %s;\nGOT  %v\nWANT %v", test.query, keyRanges, test.keyRanges)
 			}
 		default:
 			t.Errorf("query: %s;\nGOT  %v\nWANT query_parser.SelectStatement", test.query, *s)
diff --git a/v23/syncbase/nosql/internal/query/test/query_test.go b/v23/syncbase/nosql/internal/query/test/query_test.go
index 86e395e..00498a6 100644
--- a/v23/syncbase/nosql/internal/query/test/query_test.go
+++ b/v23/syncbase/nosql/internal/query/test/query_test.go
@@ -8,7 +8,6 @@
 	"errors"
 	"fmt"
 	"reflect"
-	"strings"
 	"testing"
 	"time"
 
@@ -36,10 +35,10 @@
 }
 
 type keyValueStreamImpl struct {
-	table        table
-	cursor       int
-	prefixes     []string
-	prefixCursor int
+	table           table
+	cursor          int
+	keyRanges       query_db.KeyRanges
+	keyRangesCursor int
 }
 
 func (kvs *keyValueStreamImpl) Advance() bool {
@@ -48,16 +47,16 @@
 		if kvs.cursor >= len(kvs.table.rows) {
 			return false
 		}
-		for kvs.prefixCursor < len(kvs.prefixes) {
-			// does it match any prefix
-			if kvs.prefixes[kvs.prefixCursor] == "" || strings.HasPrefix(kvs.table.rows[kvs.cursor].key, kvs.prefixes[kvs.prefixCursor]) {
+		for kvs.keyRangesCursor < len(kvs.keyRanges) {
+			// does it match any keyRange (or is the keyRange the 0-255 wildcard)?
+			if (kvs.keyRanges[kvs.keyRangesCursor].Start == string([]byte{0}) && kvs.keyRanges[kvs.keyRangesCursor].Limit == string([]byte{255})) || (kvs.table.rows[kvs.cursor].key >= kvs.keyRanges[kvs.keyRangesCursor].Start && kvs.table.rows[kvs.cursor].key <= kvs.keyRanges[kvs.keyRangesCursor].Limit) {
 				return true
 			}
-			// Keys and prefixes are both sorted low to high, so we can increment
-			// prefixCursor if the prefix is < the key.
-			if kvs.prefixes[kvs.prefixCursor] < kvs.table.rows[kvs.cursor].key {
-				kvs.prefixCursor++
-				if kvs.prefixCursor >= len(kvs.prefixes) {
+			// Keys and keyRanges are both sorted low to high, so we can increment
+			// keyRangesCursor if the keyRange.Limit is < the key.
+			if kvs.keyRanges[kvs.keyRangesCursor].Limit < kvs.table.rows[kvs.cursor].key {
+				kvs.keyRangesCursor++
+				if kvs.keyRangesCursor >= len(kvs.keyRanges) {
 					return false
 				}
 			} else {
@@ -79,11 +78,11 @@
 func (kvs *keyValueStreamImpl) Cancel() {
 }
 
-func (t table) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t table) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	var keyValueStreamImpl keyValueStreamImpl
 	keyValueStreamImpl.table = t
 	keyValueStreamImpl.cursor = -1
-	keyValueStreamImpl.prefixes = prefixes
+	keyValueStreamImpl.keyRanges = keyRanges
 	return &keyValueStreamImpl, nil
 }
 
@@ -221,10 +220,10 @@
 	db.tables = append(db.tables, fooTable)
 }
 
-type keyPrefixesTest struct {
-	query       string
-	keyPrefixes []string
-	err         error
+type keyRangesTest struct {
+	query     string
+	keyRanges query_db.KeyRanges
+	err       error
 }
 
 type evalWhereUsingOnlyKeyTest struct {
@@ -983,120 +982,205 @@
 	}
 }
 
-func TestKeyPrefixes(t *testing.T) {
-	basic := []keyPrefixesTest{
+func appendZeroByte(start string) string {
+	limit := []byte(start)
+	limit = append(limit, 0)
+	return string(limit)
+}
+
+func plusOne(start string) string {
+	limit := []byte(start)
+	for len(limit) > 0 {
+		if limit[len(limit)-1] == 255 {
+			limit = limit[:len(limit)-1] // chop off trailing 0xff; it cannot be incremented
+		} else {
+			limit[len(limit)-1] += 1 // add 1
+			break                    // no carry
+		}
+	}
+	return string(limit)
+}
+
+func TestKeyRanges(t *testing.T) {
+	basic := []keyRangesTest{
 		{
-			// Need all keys (single prefix of "").
+			// Need all keys
 			"select k, v from Customer",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
-			// Need all keys (single prefix of "").
-			"   sElEcT  k,  v from \n  Customer WhErE k lIkE \"002%\" oR k LiKe \"001%\" or k lIkE \"%\"",
-			[]string{""},
+			// Keys 001 and 003
+			"   select  k,  v from Customer where k = \"001\" or k = \"003\"",
+			query_db.KeyRanges{
+				query_db.KeyRange{"001", appendZeroByte("001")},
+				query_db.KeyRange{"003", appendZeroByte("003")},
+			},
+			nil,
+		},
+		{
+			// Need all keys
+			"select  k,  v from Customer where k like \"%\" or k like \"001%\" or k like \"002%\"",
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
+			nil,
+		},
+		{
+			// Need all keys, likes in where clause in different order
+			"select  k,  v from Customer where k like \"002%\" or k like \"001%\" or k like \"%\"",
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
 			// All selected rows will have key prefix of "abc".
 			"select k, v from Customer where t = \"Foo.Bar\" and k like \"abc%\"",
-			[]string{"abc"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"abc", plusOne("abc")},
+			},
 			nil,
 		},
 		{
-			// Need all keys (single prefix of "").
+			// Need all keys
 			"select k, v from Customer where t = \"Foo.Bar\" or k like \"abc%\"",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
-			// Need all keys (single prefix of "").
+			// Need all keys
 			"select k, v from Customer where k like \"abc%\" or v.zip = \"94303\"",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
 			// All selected rows will have key prefix of "foo".
 			"select k, v from Customer where t = \"Foo.Bar\" and k like \"foo_bar\"",
-			[]string{"foo"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo", plusOne("foo")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "baz" or "foo".
+			// All selected rows will have key == "baz" or prefix of "foo".
 			"select k, v from Customer where k like \"foo_bar\" or k = \"baz\"",
-			[]string{"baz", "foo"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"baz", appendZeroByte("baz")},
+				query_db.KeyRange{"foo", plusOne("foo")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "fo".
+			// All selected rows will have key == "fo" or prefix of "foo".
 			"select k, v from Customer where k like \"foo_bar\" or k = \"fo\"",
-			[]string{"fo"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"fo", appendZeroByte("fo")},
+				query_db.KeyRange{"foo", plusOne("foo")},
+			},
+			nil,
+		},
+		{
+			// All selected rows will have prefix of "fo".
+			// k == "foo" is a subset of the above prefix.
+			"select k, v from Customer where k like \"fo_bar\" or k = \"foo\"",
+			query_db.KeyRanges{
+				query_db.KeyRange{"fo", plusOne("fo")},
+			},
 			nil,
 		},
 		{
 			// All selected rows will have key prefix of "foo".
 			"select k, v from Customer where k like \"foo%bar\"",
-			[]string{"foo"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo", plusOne("foo")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foo\bar".
+			// Select "foo\bar" row.
 			"select k, v from Customer where k like \"foo\\\\bar\"",
-			[]string{"foo\\bar"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo\\bar", appendZeroByte("foo\\bar")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foo%bar".
+			// Select "foo%bar" row.
 			"select k, v from Customer where k like \"foo\\%bar\"",
-			[]string{"foo%bar"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo%bar", appendZeroByte("foo%bar")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foo\%bar".
+			// Select "foo\%bar" row.
 			"select k, v from Customer where k like \"foo\\\\\\%bar\"",
-			[]string{"foo\\%bar"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo\\%bar", appendZeroByte("foo\\%bar")},
+			},
 			nil,
 		},
 		{
-			// Need all keys (single prefix of "").
+			// Need all keys
 			"select k, v from Customer where k like \"%foo\"",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
-			// Need all keys (single prefix of "").
+			// Need all keys
 			"select k, v from Customer where k like \"_foo\"",
-			[]string{""},
+			query_db.KeyRanges{
+				query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foo_bar".
+			// Select "foo_bar" row.
 			"select k, v from Customer where k like \"foo\\_bar\"",
-			[]string{"foo_bar"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foo_bar", appendZeroByte("foo_bar")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foobar%".
+			// Select "foobar%" row.
 			"select k, v from Customer where k like \"foobar\\%\"",
-			[]string{"foobar%"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foobar%", appendZeroByte("foobar%")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "foobar_".
+			// Select "foobar_" row.
 			"select k, v from Customer where k like \"foobar\\_\"",
-			[]string{"foobar_"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"foobar_", appendZeroByte("foobar_")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "\%_".
+			// Select "\%_" row.
 			"select k, v from Customer where k like \"\\\\\\%\\_\"",
-			[]string{"\\%_"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"\\%_", appendZeroByte("\\%_")},
+			},
 			nil,
 		},
 		{
-			// All selected rows will have key prefix of "%_abc\".
+			// Select "%_abc\" row.
 			"select k, v from Customer where k = \"%_abc\\\"",
-			[]string{"%_abc\\"},
+			query_db.KeyRanges{
+				query_db.KeyRange{"%_abc\\", appendZeroByte("%_abc\\")},
+			},
 			nil,
 		},
 	}
@@ -1114,9 +1199,9 @@
 			if semErr == nil {
 				switch sel := (*s).(type) {
 				case query_parser.SelectStatement:
-					keyPrefixes := query.CompileKeyPrefixes(sel.Where)
-					if !reflect.DeepEqual(test.keyPrefixes, keyPrefixes) {
-						t.Errorf("query: %s;\nGOT  %v\nWANT %v", test.query, keyPrefixes, test.keyPrefixes)
+					keyRanges := query.CompileKeyRanges(sel.Where)
+					if !reflect.DeepEqual(test.keyRanges, keyRanges) {
+						t.Errorf("query: %s;\nGOT  %v\nWANT %v", test.query, keyRanges, test.keyRanges)
 					}
 				default:
 					t.Errorf("query: %s; got %v, want query_parser.SelectStatement", test.query, reflect.TypeOf(*s))
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index cf3124e..bd0903b 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -151,11 +151,8 @@
 	Put(ctx *context.T, key string, value interface{}) error
 
 	// Delete deletes all rows in the given half-open range [start, limit). If
-	// limit is "", all rows with keys >= start are included. If the last row that
-	// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
-	// pair is removed.
+	// limit is "", all rows with keys >= start are included.
 	// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
-	// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
 	Delete(ctx *context.T, r RowRange) error
 
 	// Scan returns all rows in the given half-open range [start, limit). If limit
@@ -164,6 +161,13 @@
 	// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
 	Scan(ctx *context.T, r RowRange) Stream
 
+	// GetPermissions returns an array of (prefix, perms) pairs. The array is
+	// sorted from longest prefix to shortest, so element zero is the one that
+	// applies to the row with the given key. The last element is always the
+	// prefix "" which represents the table's permissions -- the array will always
+	// have at least one element.
+	GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error)
+
 	// SetPermissions sets the permissions for all current and future rows with
 	// the given prefix. If the prefix overlaps with an existing prefix, the
 	// longest prefix that matches a row applies. For example:
@@ -171,18 +175,8 @@
 	//     SetPermissions(ctx, Prefix("a/b/c"), perms2)
 	// The permissions for row "a/b/1" are perms1, and the permissions for row
 	// "a/b/c/1" are perms2.
-	//
-	// SetPermissions will fail if called with a prefix that does not match any
-	// rows.
 	SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error
 
-	// GetPermissions returns an array of (prefix, perms) pairs. The array is
-	// sorted from longest prefix to shortest, so element zero is the one that
-	// applies to the row with the given key. The last element is always the
-	// prefix "" which represents the table's permissions -- the array will always
-	// have at least one element.
-	GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error)
-
 	// DeletePermissions deletes the permissions for the specified prefix. Any
 	// rows covered by this prefix will use the next longest prefix's permissions
 	// (see the array returned by GetPermissions).
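
Reviewer note (not part of the patch): a worked example of the ordering the
GetPermissions comment above guarantees. After

	SetPermissions(ctx, Prefix("a/b"), perms1)
	SetPermissions(ctx, Prefix("a/b/c"), perms2)

GetPermissions(ctx, "a/b/c/1") returns the pairs longest prefix first, ending
with the table's own permissions (written here as tablePerms):

	[{"a/b/c", perms2}, {"a/b", perms1}, {"", tablePerms}]
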
diff --git a/v23/syncbase/nosql/query_db/query_db.go b/v23/syncbase/nosql/query_db/query_db.go
index d6634dc..c1eba0c 100644
--- a/v23/syncbase/nosql/query_db/query_db.go
+++ b/v23/syncbase/nosql/query_db/query_db.go
@@ -19,7 +19,7 @@
-	// of the prefixes arguments.
-	// Note: an empty string prefix (""), matches all keys.
-	// The prefixes argument will be sorted (low to high).
-	Scan(prefixes []string) (KeyValueStream, error)
+	// of the keyRanges argument.
+	// Note: the KeyRange {string([]byte{0}), string([]byte{255})} matches all keys.
+	// The keyRanges argument will be sorted (low to high).
+	Scan(keyRanges KeyRanges) (KeyValueStream, error)
 }
 
 type KeyValueStream interface {
@@ -50,3 +50,23 @@
 	// return  false.  Cancel does not block.
 	Cancel()
 }
+
+type KeyRange struct {
+	Start string
+	Limit string
+}
+
+type KeyRanges []KeyRange
+
+// Implement sort interface for KeyRanges.
+func (keyRanges KeyRanges) Len() int {
+	return len(keyRanges)
+}
+
+func (keyRanges KeyRanges) Less(i, j int) bool {
+	return keyRanges[i].Start < keyRanges[j].Start
+}
+
+func (keyRanges KeyRanges) Swap(i, j int) {
+	keyRanges[i], keyRanges[j] = keyRanges[j], keyRanges[i]
+}
diff --git a/v23/syncbase/nosql/table.go b/v23/syncbase/nosql/table.go
index c77905c..67aa074 100644
--- a/v23/syncbase/nosql/table.go
+++ b/v23/syncbase/nosql/table.go
@@ -70,11 +70,6 @@
 	return newStream(cancel, call)
 }
 
-// SetPermissions implements Table.SetPermissions.
-func (t *table) SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error {
-	return t.c.SetPermissions(ctx, prefix.Prefix(), perms)
-}
-
 // GetPermissions implements Table.GetPermissions.
 func (t *table) GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error) {
 	wirePermsList, err := t.c.GetPermissions(ctx, key)
@@ -85,6 +80,11 @@
 	return permsList, err
 }
 
+// SetPermissions implements Table.SetPermissions.
+func (t *table) SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error {
+	return t.c.SetPermissions(ctx, prefix.Prefix(), perms)
+}
+
 // DeletePermissions implements Table.DeletePermissions.
 func (t *table) DeletePermissions(ctx *context.T, prefix PrefixRange) error {
 	return t.c.DeletePermissions(ctx, prefix.Prefix())
diff --git a/v23/syncbase/testutil/layer.go b/v23/syncbase/testutil/layer.go
index 14e8497..ccfe62d 100644
--- a/v23/syncbase/testutil/layer.go
+++ b/v23/syncbase/testutil/layer.go
@@ -34,7 +34,7 @@
 	if err := self.Create(ctx, nil); err != nil {
 		t.Fatalf("self.Create() failed: %v", err)
 	}
-	if gotPerms, wantPerms := getPermsOrDie(t, ctx, self), defaultPerms(); !reflect.DeepEqual(gotPerms, wantPerms) {
+	if gotPerms, wantPerms := getPermsOrDie(t, ctx, self), DefaultPerms("server/client"); !reflect.DeepEqual(gotPerms, wantPerms) {
 		t.Errorf("Perms do not match: got %v, want %v", gotPerms, wantPerms)
 	}
 
@@ -60,7 +60,7 @@
 	}
 
 	// Test that create fails if the parent perms disallow access.
-	perms = defaultPerms()
+	perms = DefaultPerms("server/client")
 	perms.Blacklist("server/client", string(access.Write))
 	if err := parent.SetPermissions(ctx, perms, ""); err != nil {
 		t.Fatalf("parent.SetPermissions() failed: %v", err)
@@ -107,7 +107,7 @@
 	if err := self2.Create(ctx, nil); err != nil {
 		t.Fatalf("self2.Create() failed: %v", err)
 	}
-	perms := defaultPerms()
+	perms := DefaultPerms("server/client")
 	perms.Blacklist("server/client", string(access.Write))
 	if err := self2.SetPermissions(ctx, perms, ""); err != nil {
 		t.Fatalf("self2.SetPermissions() failed: %v", err)
@@ -117,7 +117,7 @@
 	}
 
 	// Test that delete succeeds even if the parent perms disallow access.
-	perms = defaultPerms()
+	perms = DefaultPerms("server/client")
 	perms.Blacklist("server/client", string(access.Write))
 	if err := parent.SetPermissions(ctx, perms, ""); err != nil {
 		t.Fatalf("parent.SetPermissions() failed: %v", err)
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 0783e54..c5df00f 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -6,6 +6,7 @@
 package testutil
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"reflect"
@@ -18,13 +19,13 @@
 	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/v23"
 	"v.io/v23/context"
-	"v.io/v23/naming"
-	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/v23/vdl"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
+	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/lib/xrpc"
 	tsecurity "v.io/x/ref/test/testutil"
 )
 
@@ -67,6 +68,12 @@
 }
 
 func SetupOrDieCustom(client, server string, perms access.Permissions) (clientCtx *context.T, serverName string, cleanup func(), sp security.Principal, ctx *context.T) {
+	// TODO(mattr): Instead of SetDefaultHostPort the arguably more correct thing
+	// would be to call v.io/x/ref/test.Init() from the test packages that import
+	// the profile.  Note you should only call that from the package that imports
+	// the profile, not from libraries like this.  Also, it would be better if
+	// v23.Init were test.V23Init().
+	flags.SetDefaultHostPort("127.0.0.1:0")
 	ctx, shutdown := v23.Init()
 	sp = tsecurity.NewPrincipal(server)
 
@@ -77,7 +84,7 @@
 		vlog.Fatal("v23.WithPrincipal() failed: ", err)
 	}
 
-	serverName, stopServer := newServer(serverCtx, perms)
+	serverName, stopServer := newServer(client, server, serverCtx, perms)
 	cleanup = func() {
 		stopServer()
 		shutdown()
@@ -85,10 +92,12 @@
 	return
 }
 
-func defaultPerms() access.Permissions {
+func DefaultPerms(patterns ...string) access.Permissions {
 	perms := access.Permissions{}
 	for _, tag := range access.AllTypicalTags() {
-		perms.Add(security.BlessingPattern("server/client"), string(tag))
+		for _, pattern := range patterns {
+			perms.Add(security.BlessingPattern(pattern), string(tag))
+		}
 	}
 	return perms
 }
@@ -175,18 +184,9 @@
 	return perms
 }
 
-func newServer(ctx *context.T, perms access.Permissions) (string, func()) {
-	s, err := v23.NewServer(ctx)
-	if err != nil {
-		vlog.Fatal("v23.NewServer() failed: ", err)
-	}
-	eps, err := s.Listen(rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}})
-	if err != nil {
-		vlog.Fatal("s.Listen() failed: ", err)
-	}
-
+func newServer(clientName, serverName string, ctx *context.T, perms access.Permissions) (string, func()) {
 	if perms == nil {
-		perms = defaultPerms()
+		perms = DefaultPerms(fmt.Sprintf("%s/%s", serverName, clientName))
 	}
 	rootDir, err := ioutil.TempDir("", "syncbase")
 	if err != nil {
@@ -200,13 +200,11 @@
 	if err != nil {
 		vlog.Fatal("server.NewService() failed: ", err)
 	}
-	d := server.NewDispatcher(service)
-
-	if err := s.ServeDispatcher("", d); err != nil {
-		vlog.Fatal("s.ServeDispatcher() failed: ", err)
+	s, err := xrpc.NewDispatchingServer(ctx, "", server.NewDispatcher(service))
+	if err != nil {
+		vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
 	}
-
-	name := naming.JoinAddressName(eps[0].String(), "")
+	name := s.Status().Endpoints[0].Name()
 	return name, func() {
 		s.Stop()
 		os.RemoveAll(rootDir)
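
Reviewer note (not part of the patch): exporting DefaultPerms with variadic
patterns lets tests grant every typical tag to several blessing patterns at
once. A usage sketch; "server/admin" is a hypothetical extra pattern for
illustration:

	// Grants each typical access tag to both patterns.
	perms := testutil.DefaultPerms("server/client", "server/admin")
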
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 8e173d1..43cc542 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -41,6 +41,7 @@
 import "sync"
 import "time"
 
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
 import "v.io/v23/context"
 import "v.io/v23/verror"
 
@@ -247,22 +248,13 @@
 
 // -----------------------------------------------------------
 
-// A BlockOrFile represents a vector of bytes, and contains either a data block
-// (as a []byte), or a (file name, size, offset) triple.
-type BlockOrFile struct {
-	Block    []byte // If FileName is empty, the bytes represented.
-	FileName string // If non-empty, the name of the file containing the bytes.
-	Size     int64  // If FileName is non-empty, the number of bytes (or -1 for "all")
-	Offset   int64  // If FileName is non-empty, the offset of the relevant bytes within the file.
-}
-
 // addFragment() ensures that the store *fscabs contains a fragment comprising
 // the catenation of the byte vectors named by item[..].block and the contents
 // of the files named by item[..].filename.  The block field is ignored if
 // fileName!="".  The fragment is not physically added if already present.
 // The fragment is added to the fragment list of the descriptor *desc.
 func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
-	desc *blobDesc, item ...BlockOrFile) (fileName string, size int64, err error) {
+	desc *blobDesc, item ...localblobstore.BlockOrFile) (fileName string, size int64, err error) {
 
 	hasher := md5.New()
 	var buf []byte
@@ -543,12 +535,12 @@
 // should not be used concurrently by multiple threads.  The returned handle
 // should be closed with either the Close() or CloseWithoutFinalize() method to
 // avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (bw *BlobWriter, err error) {
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+	var bw *BlobWriter
 	newName := newBlobName()
-	var f *file
 	fileName := filepath.Join(fscabs.rootName, newName)
 	os.MkdirAll(filepath.Dir(fileName), dirPermissions)
-	f, err = newFile(os.Create(fileName))
+	f, err := newFile(os.Create(fileName))
 	if err == nil {
 		bw = new(BlobWriter)
 		bw.fscabs = fscabs
@@ -568,8 +560,9 @@
 
 // ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
 // old, but unfinalized blob name.
-func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (bw *BlobWriter, err error) {
-	bw = new(BlobWriter)
+func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
+	var err error
+	bw := new(BlobWriter)
 	bw.fscabs = fscabs
 	bw.ctx = ctx
 	bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
@@ -639,7 +632,7 @@
 // AppendFragment() appends a fragment to the blob being written by *bw, where
 // the fragment is composed of the byte vectors described by the elements of
 // item[].  The fragment is copied into the blob store.
-func (bw *BlobWriter) AppendFragment(item ...BlockOrFile) (err error) {
+func (bw *BlobWriter) AppendFragment(item ...localblobstore.BlockOrFile) (err error) {
 	if bw.f == nil {
 		panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()")
 	}
@@ -782,7 +775,7 @@
 // NewBlobReader() returns a pointer to a newly allocated BlobReader on the
 // specified blobName.  BlobReaders should not be used concurrently by multiple
 // threads.  Returned handles should be closed with Close().
-func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br *BlobReader, err error) {
+func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br localblobstore.BlobReader, err error) {
 	var desc *blobDesc
 	desc, err = fscabs.getBlob(ctx, blobName)
 	if err == nil {
@@ -954,7 +947,7 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
 	return &FsCasIter{fscabs: fscabs, stack: stack}
@@ -970,7 +963,7 @@
 //    if fscabsi.Err() != nil {
 //      // The loop terminated early due to an error.
 //    }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
 	stack := make([]dirListing, 1)
 	stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
 	return &FsCasIter{fscabs: fscabs, stack: stack}
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index ded5e3c..886f5c4 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -5,345 +5,16 @@
 // A test for fs_cablobstore
 package fs_cablobstore_test
 
-import "bytes"
-import "crypto/md5"
-import "fmt"
-import "io"
 import "io/ioutil"
 import "os"
-import "path/filepath"
 import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
 import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
-import "v.io/v23/context"
-import "v.io/v23/verror"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
 import "v.io/x/ref/test"
 import _ "v.io/x/ref/runtime/factories/generic"
 
-func TestCreate(t *testing.T) {
-	ctx, shutdown := test.V23Init()
-	defer shutdown()
-
-	// Make a temporary directory.
-	var err error
-	var testDirName string
-	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
-	if err != nil {
-		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
-	}
-	defer os.RemoveAll(testDirName)
-
-	// Check that we can create an fs_cablobstore.
-	var fscabs *fs_cablobstore.FsCaBlobStore
-	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
-	if err != nil {
-		t.Errorf("fs_cablobstore.Create failed: %v", err)
-	}
-
-	// Check that there are no files in the newly-created tree.
-	iterator := fscabs.ListBlobIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		t.Errorf("unexpected file %q\n", fileName)
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
-	}
-}
-
-// A blobOrBlockOrFile represents some bytes that may be contained in a named
-// blob, a named file, or in an explicit slice of bytes.
-type blobOrBlockOrFile struct {
-	blob   string // If non-emtpy, the name of the blob containing the bytes.
-	file   string // If non-empty and blob is empty, the name of the file containing the bytes.
-	size   int64  // Size of part of file or blob, or -1 for "everything until EOF".
-	offset int64  // Offset within file or blob.
-	block  []byte // If both blob and file are empty, a slice containing the bytes.
-}
-
-// A testBlob records that some specified content has been stored with a given
-// blob name in the blob store.
-type testBlob struct {
-	content  []byte // content that has been stored.
-	blobName string // the name of the blob.
-}
-
-// removeBlobFromBlobVector() removes the entry named blobName from
-// blobVector[], returning the new vector.
-func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
-	n := len(blobVector)
-	i := 0
-	for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
-	}
-	if i != n {
-		blobVector[i] = blobVector[n-1]
-		blobVector = blobVector[0 : n-1]
-	}
-	return blobVector
-}
-
-// writeBlob() writes a new blob to *fscabs, and returns its name.  The new
-// blob's content is described by the elements of data[].  Any error messages
-// generated include the index of the blob in blobVector and its content; the
-// latter is assumed to be printable.  The expected content of the the blob is
-// "content", so that this routine can check it.  If useResume is true, and data[]
-// has length more than 1, the function artificially uses ResumeBlobWriter(),
-// to test it.
-func writeBlob(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob,
-	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
-	var bw *fs_cablobstore.BlobWriter
-	var err error
-	bw, err = fscabs.NewBlobWriter(ctx)
-	if err != nil {
-		t.Fatalf("fs_cablobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
-	}
-	blobName := bw.Name()
-
-	// Construct the blob from the pieces.
-	// There is a loop within the loop to exercise the possibility of
-	// passing multiple fragments to AppendFragment().
-	for i := 0; i != len(data) && err == nil; {
-		if len(data[i].blob) != 0 {
-			err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
-			if err != nil {
-				t.Errorf("fs_cablobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
-			}
-			i++
-		} else {
-			var pieces []fs_cablobstore.BlockOrFile
-			for ; i != len(data) && len(data[i].blob) == 0; i++ {
-				if len(data[i].file) != 0 {
-					pieces = append(pieces, fs_cablobstore.BlockOrFile{
-						FileName: data[i].file,
-						Size:     data[i].size,
-						Offset:   data[i].offset})
-				} else {
-					pieces = append(pieces, fs_cablobstore.BlockOrFile{Block: data[i].block})
-				}
-			}
-			err = bw.AppendFragment(pieces...)
-			if err != nil {
-				t.Errorf("fs_cablobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
-			}
-		}
-		if useResume && i < len(data)-1 && err == nil {
-			err = bw.CloseWithoutFinalize()
-			if err == nil {
-				bw, err = fscabs.ResumeBlobWriter(ctx, blobName)
-			}
-		}
-	}
-
-	if bw != nil {
-		if bw.Size() != int64(len(content)) {
-			t.Errorf("fs_cablobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
-		}
-		if bw.IsFinalized() {
-			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
-		}
-		err = bw.Close()
-		if err != nil {
-			t.Errorf("fs_cablobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
-		}
-		if !bw.IsFinalized() {
-			t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
-		}
-		if bw.Size() != int64(len(content)) {
-			t.Errorf("fs_cablobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
-		}
-		if bw.Name() != blobName {
-			t.Errorf("fs_cablobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
-		}
-		hasher := md5.New()
-		hasher.Write(content)
-		if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
-			t.Errorf("fs_cablobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
-		}
-	}
-
-	return append(blobVector,
-		testBlob{
-			content:  content,
-			blobName: blobName,
-		})
-}
-
-// readBlob() returns a substring of the content of the blob named blobName in *fscabs.
-// The return values are:
-// - the "size" bytes from the content, starting at the given "offset",
-//   measured from "whence" (as defined by io.Seeker.Seek).
-// - the position to which BlobBeader seeks to,
-// - the md5 hash of the bytes read, and
-// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
-// - and error.
-func readBlob(ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobName string,
-	size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
-
-	var br *fs_cablobstore.BlobReader
-	hasher := md5.New()
-	br, err = fscabs.NewBlobReader(ctx, blobName)
-	if err == nil {
-		buf := make([]byte, 8192, 8192)
-		fullHash = br.Hash()
-		pos, err = br.Seek(offset, whence)
-		if err == nil {
-			var n int
-			first := true // Read at least once, to test reading zero bytes.
-			for err == nil && (size == -1 || int64(len(content)) < size || first) {
-				// Read just what was asked for.
-				var toRead []byte = buf
-				if size >= 0 && int(size)-len(content) < len(buf) {
-					toRead = buf[0 : int(size)-len(content)]
-				}
-				n, err = br.Read(toRead)
-				hasher.Write(toRead[0:n])
-				if size >= 0 && int64(len(content)+n) > size {
-					n = int(size) - len(content)
-				}
-				content = append(content, toRead[0:n]...)
-				first = false
-			}
-		}
-		br.Close()
-	}
-	return content, pos, hasher.Sum(nil), fullHash, err
-}
-
-// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
-// read, and that they contain the appropriate data.
-func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob) {
-	for i := range blobVector {
-		var size int64
-		data := blobVector[i].content
-		dataLen := int64(len(data))
-		blobName := blobVector[i].blobName
-		for size = -1; size != dataLen+1; size++ {
-			var offset int64
-			for offset = -dataLen - 1; offset != dataLen+1; offset++ {
-				for whence := -1; whence != 4; whence++ {
-					content, pos, hash, fullHash, err := readBlob(ctx, fscabs, blobName, size, offset, whence)
-
-					// Compute expected seek position.
-					expectedPos := offset
-					if whence == 2 {
-						expectedPos += dataLen
-					}
-
-					// Computed expected size.
-					expectedSize := size
-					if expectedSize == -1 || expectedPos+expectedSize > dataLen {
-						expectedSize = dataLen - expectedPos
-					}
-
-					// Check that reads behave as expected.
-					if (whence == -1 || whence == 3) &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
-						// Expected error from bad "whence" value.
-					} else if expectedPos < 0 &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
-						// Expected error from negative Seek position.
-					} else if expectedPos > dataLen &&
-						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
-						// Expected error from too high a Seek position.
-					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
-						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
-						pos == expectedPos && expectedPos+expectedSize == dataLen {
-						// Expected success with EOF.
-					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
-						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
-						pos == expectedPos && expectedPos+expectedSize != dataLen {
-						if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
-							t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v  (blob is %q)",
-								string(data), size, offset, whence,
-								hash, fullHash, blobName)
-						} // Else expected success without EOF.
-					} else {
-						t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d yields %q pos %d %v   (blob is %q)",
-							string(data), size, offset, whence,
-							content, pos, err, blobName)
-					}
-				}
-			}
-		}
-	}
-}
-
-// checkAllBlobs() checks all the blobs in *fscabs to ensure they correspond to
-// those in blobVector[].
-func checkAllBlobs(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob, testDirName string) {
-	blobCount := 0
-	iterator := fscabs.ListBlobIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		i := 0
-		for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
-		}
-		if i == len(blobVector) {
-			t.Errorf("fs_cablobstore.ListBlobIds found unexpected file %s", fileName)
-		} else {
-			content, pos, hash, fullHash, err := readBlob(ctx, fscabs, fileName, -1, 0, 0)
-			if err != nil && err != io.EOF {
-				t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
-			} else if bytes.Compare(blobVector[i].content, content) != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
-					filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
-			} else if pos != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds Seek on %q returned %d instead of 0",
-					filepath.Join(testDirName, fileName), pos)
-			}
-			if bytes.Compare(hash, fullHash) != 0 {
-				t.Errorf("fs_cablobstore.ListCAIds read on %q; got hash %v, expected %v",
-					fileName, hash, fullHash)
-			}
-		}
-		blobCount++
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
-	}
-	if blobCount != len(blobVector) {
-		t.Errorf("fs_cablobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
-	}
-}
-
-// checkFragments() checks all the fragments in *fscabs to ensure they
-// correspond to those fragmentMap[].
-func checkFragments(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, fragmentMap map[string]bool, testDirName string) {
-	caCount := 0
-	iterator := fscabs.ListCAIds(ctx)
-	for iterator.Advance() {
-		fileName := iterator.Value()
-		content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
-		if err != nil && err != io.EOF {
-			t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
-		} else if !fragmentMap[string(content)] {
-			t.Errorf("fs_cablobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
-		} else {
-			hasher := md5.New()
-			hasher.Write(content)
-			hash := hasher.Sum(nil)
-			nameFromContent := filepath.Join("cas",
-				fmt.Sprintf("%02x", hash[0]),
-				fmt.Sprintf("%02x", hash[1]),
-				fmt.Sprintf("%02x", hash[2]),
-				fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
-					hash[3],
-					hash[4], hash[5], hash[6], hash[7],
-					hash[8], hash[9], hash[10], hash[11],
-					hash[12], hash[13], hash[14], hash[15]))
-			if nameFromContent != fileName {
-				t.Errorf("fs_cablobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
-			}
-		}
-		caCount++
-	}
-	if iterator.Err() != nil {
-		t.Errorf("fs_cablobstore.ListCAIds iteration failed: %v", iterator.Err())
-	}
-	if caCount != len(fragmentMap) {
-		t.Errorf("fs_cablobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
-	}
-}
-
 // This test case tests adding files, retrieving them and deleting them.  One
 // can't retrieve or delete something that hasn't been created, so it's all one
 // test case.
@@ -354,231 +25,19 @@
 	// Make a temporary directory.
 	var err error
 	var testDirName string
-	testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+	testDirName, err = ioutil.TempDir("", "localblobstore_test")
 	if err != nil {
-		t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+		t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
 	}
 	defer os.RemoveAll(testDirName)
 
 	// Create an fs_cablobstore.
-	var fscabs *fs_cablobstore.FsCaBlobStore
-	fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+	var bs localblobstore.BlobStore
+	bs, err = fs_cablobstore.Create(ctx, testDirName)
 	if err != nil {
 		t.Fatalf("fs_cablobstore.Create failed: %v", err)
 	}
 
-	// Create the strings:  "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
-	womData := []byte("wom")
-	batData := []byte("bat")
-	wombatData := []byte("wombat")
-	batwomData := []byte("batwom")
-	atwoData := []byte("atwo")
-	atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
-
-	// fragmentMap will have an entry per content-addressed fragment.
-	fragmentMap := make(map[string]bool)
-
-	// Create the blobs, by various means.
-
-	var blobVector []testBlob // Accumulate the blobs we create here.
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		womData, false,
-		blobOrBlockOrFile{block: womData})
-	womName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(womData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		batData, false,
-		blobOrBlockOrFile{block: batData})
-	batName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(batData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{block: wombatData})
-	firstWombatName := blobVector[len(blobVector)-1].blobName
-	fragmentMap[string(wombatData)] = true
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, true,
-		blobOrBlockOrFile{block: womData},
-		blobOrBlockOrFile{block: batData})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   -1,
-			offset: 0})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		wombatData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   6,
-			offset: 0})
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		batwomData, false,
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   3,
-			offset: 3},
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   3,
-			offset: 0})
-	batwomName := blobVector[len(blobVector)-1].blobName
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		atwoData, false,
-		blobOrBlockOrFile{
-			blob:   batwomName,
-			size:   4,
-			offset: 1})
-	atwoName := blobVector[len(blobVector)-1].blobName
-
-	blobVector = writeBlob(t, ctx, fscabs, blobVector,
-		atwoatwoombatatwoData, true,
-		blobOrBlockOrFile{
-			blob:   atwoName,
-			size:   -1,
-			offset: 0},
-		blobOrBlockOrFile{
-			blob:   atwoName,
-			size:   4,
-			offset: 0},
-		blobOrBlockOrFile{
-			blob:   firstWombatName,
-			size:   -1,
-			offset: 1},
-		blobOrBlockOrFile{
-			blob:   batName,
-			size:   -1,
-			offset: 1},
-		blobOrBlockOrFile{
-			blob:   womName,
-			size:   2,
-			offset: 0})
-	atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Nothing should change if we garbage collect.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Ensure that deleting non-existent blobs fails.
-	err = fscabs.DeleteBlob(ctx, "../../../../etc/passwd")
-	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
-		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
-	}
-	err = fscabs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
-	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
-		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
-	}
-
-	// -------------------------------------------------
-	// Delete a blob.
-	err = fscabs.DeleteBlob(ctx, batName)
-	if err != nil {
-		t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
-	}
-	blobVector = removeBlobFromBlobVector(blobVector, batName)
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Nothing should change if we garbage collect.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Open a BlobReader on a blob we're about to delete,
-	// so its fragments won't be garbage collected.
-
-	var br *fs_cablobstore.BlobReader
-	br, err = fscabs.NewBlobReader(ctx, atwoatwoombatatwoName)
-	if err != nil {
-		t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
-	}
-
-	// -------------------------------------------------
-	// Delete a blob.  This should be the last on-disc reference to the
-	// content-addressed fragment "bat", but the fragment won't be deleted
-	// until close the reader and garbage collect.
-	err = fscabs.DeleteBlob(ctx, atwoatwoombatatwoName)
-	if err != nil {
-		t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
-	}
-	blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Garbage collection should change nothing; the fragment involved
-	// is still referenced from the open reader *br.
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-
-	// Close the open BlobReader and garbage collect.
-	err = br.Close()
-	if err != nil {
-		t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
-	}
-	delete(fragmentMap, string(batData))
-
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// Delete all blobs.
-	for len(blobVector) != 0 {
-		err = fscabs.DeleteBlob(ctx, blobVector[0].blobName)
-		if err != nil {
-			t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
-		}
-		blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
-	}
-
-	// -------------------------------------------------
-	// Check that the state is as we expect.
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
-	// -------------------------------------------------
-	// The remaining fragments should be removed when we garbage collect.
-	for frag := range fragmentMap {
-		delete(fragmentMap, frag)
-	}
-	fscabs.GC(ctx)
-	checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
-	checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
-	checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+	// Test it.
+	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
 }
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
new file mode 100644
index 0000000..973a9c8
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for localblobstore
+package localblobstore_test
+
+import "io/ioutil"
+import "os"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// This test case tests adding files, retrieving them and deleting them.  One
+// can't retrieve or delete something that hasn't been created, so it's all one
+// test case.
+func TestAddRetrieveAndDelete(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// Make a temporary directory.
+	var err error
+	var testDirName string
+	testDirName, err = ioutil.TempDir("", "localblobstore_test")
+	if err != nil {
+		t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+	}
+	defer os.RemoveAll(testDirName)
+
+	// Create an fs_cablobstore.
+	var bs localblobstore.BlobStore
+	bs, err = fs_cablobstore.Create(ctx, testDirName)
+	if err != nil {
+		t.Fatalf("fs_cablobstore.Create failed: %v", err)
+	}
+
+	// Test it.
+	localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
new file mode 100644
index 0000000..662e964
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -0,0 +1,548 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test library for localblobstores.
+package localblobstore_testlib
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "path/filepath"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+// A blobOrBlockOrFile represents some bytes that may be contained in a named
+// blob, a named file, or in an explicit slice of bytes.
+type blobOrBlockOrFile struct {
+	blob   string // If non-empty, the name of the blob containing the bytes.
+	file   string // If non-empty and blob is empty, the name of the file containing the bytes.
+	size   int64  // Size of part of file or blob, or -1 for "everything until EOF".
+	offset int64  // Offset within file or blob.
+	block  []byte // If both blob and file are empty, a slice containing the bytes.
+}
+
+// A testBlob records that some specified content has been stored with a given
+// blob name in the blob store.
+type testBlob struct {
+	content  []byte // content that has been stored.
+	blobName string // the name of the blob.
+}
+
+// removeBlobFromBlobVector() removes the entry named blobName from
+// blobVector[], returning the new vector.
+func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
+	n := len(blobVector)
+	i := 0
+	for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
+	}
+	if i != n {
+		blobVector[i] = blobVector[n-1]
+		blobVector = blobVector[0 : n-1]
+	}
+	return blobVector
+}
+
+// writeBlob() writes a new blob to bs, and returns its name.  The new
+// blob's content is described by the elements of data[].  Any error messages
+// generated include the index of the blob in blobVector and its content; the
+// latter is assumed to be printable.  The expected content of the blob is
+// "content", so that this routine can check it.  If useResume is true, and data[]
+// has length greater than 1, the function artificially uses ResumeBlobWriter()
+// to test it.
+func writeBlob(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob,
+	content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
+	var bw localblobstore.BlobWriter
+	var err error
+	bw, err = bs.NewBlobWriter(ctx)
+	if err != nil {
+		t.Errorf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
+	}
+	blobName := bw.Name()
+
+	// Construct the blob from the pieces.
+	// There is a loop within the loop to exercise the possibility of
+	// passing multiple fragments to AppendFragment().
+	for i := 0; i != len(data) && err == nil; {
+		if len(data[i].blob) != 0 {
+			err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
+			if err != nil {
+				t.Errorf("localblobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
+			}
+			i++
+		} else {
+			var pieces []localblobstore.BlockOrFile
+			for ; i != len(data) && len(data[i].blob) == 0; i++ {
+				if len(data[i].file) != 0 {
+					pieces = append(pieces, localblobstore.BlockOrFile{
+						FileName: data[i].file,
+						Size:     data[i].size,
+						Offset:   data[i].offset})
+				} else {
+					pieces = append(pieces, localblobstore.BlockOrFile{Block: data[i].block})
+				}
+			}
+			err = bw.AppendFragment(pieces...)
+			if err != nil {
+				t.Errorf("localblobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
+			}
+		}
+		if useResume && i < len(data)-1 && err == nil {
+			err = bw.CloseWithoutFinalize()
+			if err == nil {
+				bw, err = bs.ResumeBlobWriter(ctx, blobName)
+			}
+		}
+	}
+
+	if bw != nil {
+		if bw.Size() != int64(len(content)) {
+			t.Errorf("localblobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+		}
+		if bw.IsFinalized() {
+			t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+		}
+		err = bw.Close()
+		if err != nil {
+			t.Errorf("localblobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
+		}
+		if !bw.IsFinalized() {
+			t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+		}
+		if bw.Size() != int64(len(content)) {
+			t.Errorf("localblobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+		}
+		if bw.Name() != blobName {
+			t.Errorf("localblobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
+		}
+		hasher := md5.New()
+		hasher.Write(content)
+		if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
+			t.Errorf("localblobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
+		}
+	}
+
+	return append(blobVector,
+		testBlob{
+			content:  content,
+			blobName: blobName,
+		})
+}
+
+// readBlob() returns a substring of the content of the blob named blobName in bs.
+// The return values are:
+// - the "size" bytes from the content, starting at the given "offset",
+//   measured from "whence" (as defined by io.Seeker.Seek).
+// - the position to which the BlobReader seeks,
+// - the md5 hash of the bytes read,
+// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(), and
+// - any error encountered.
+func readBlob(ctx *context.T, bs localblobstore.BlobStore, blobName string,
+	size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
+
+	var br localblobstore.BlobReader
+	hasher := md5.New()
+	br, err = bs.NewBlobReader(ctx, blobName)
+	if err == nil {
+		buf := make([]byte, 8192, 8192)
+		fullHash = br.Hash()
+		pos, err = br.Seek(offset, whence)
+		if err == nil {
+			var n int
+			first := true // Read at least once, to test reading zero bytes.
+			for err == nil && (size == -1 || int64(len(content)) < size || first) {
+				// Read just what was asked for.
+				var toRead []byte = buf
+				if size >= 0 && int(size)-len(content) < len(buf) {
+					toRead = buf[0 : int(size)-len(content)]
+				}
+				n, err = br.Read(toRead)
+				hasher.Write(toRead[0:n])
+				if size >= 0 && int64(len(content)+n) > size {
+					n = int(size) - len(content)
+				}
+				content = append(content, toRead[0:n]...)
+				first = false
+			}
+		}
+		br.Close()
+	}
+	return content, pos, hasher.Sum(nil), fullHash, err
+}
+
+// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
+// read, and that they contain the appropriate data.
+func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob) {
+	for i := range blobVector {
+		var size int64
+		data := blobVector[i].content
+		dataLen := int64(len(data))
+		blobName := blobVector[i].blobName
+		for size = -1; size != dataLen+1; size++ {
+			var offset int64
+			for offset = -dataLen - 1; offset != dataLen+1; offset++ {
+				for whence := -1; whence != 4; whence++ {
+					content, pos, hash, fullHash, err := readBlob(ctx, bs, blobName, size, offset, whence)
+
+					// Compute expected seek position.
+					expectedPos := offset
+					if whence == 2 {
+						expectedPos += dataLen
+					}
+
+					// Compute the expected size.
+					expectedSize := size
+					if expectedSize == -1 || expectedPos+expectedSize > dataLen {
+						expectedSize = dataLen - expectedPos
+					}
+
+					// Check that reads behave as expected.
+					if (whence == -1 || whence == 3) &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
+						// Expected error from bad "whence" value.
+					} else if expectedPos < 0 &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
+						// Expected error from negative Seek position.
+					} else if expectedPos > dataLen &&
+						verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
+						// Expected error from too high a Seek position.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
+						pos == expectedPos && expectedPos+expectedSize == dataLen {
+						// Expected success with EOF.
+					} else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+						bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
+						pos == expectedPos && expectedPos+expectedSize != dataLen {
+						if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
+							t.Errorf("localblobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v  (blob is %q)",
+								string(data), size, offset, whence,
+								hash, fullHash, blobName)
+						} // Else expected success without EOF.
+					} else {
+						t.Errorf("localblobstore read test on %q size %d offset %d whence %d yields %q pos %d %v   (blob is %q)",
+							string(data), size, offset, whence,
+							content, pos, err, blobName)
+					}
+				}
+			}
+		}
+	}
+}
+
+// checkAllBlobs() checks all the blobs in bs to ensure they correspond to
+// those in blobVector[].
+func checkAllBlobs(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob, testDirName string) {
+	blobCount := 0
+	iterator := bs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		i := 0
+		for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
+		}
+		if i == len(blobVector) {
+			t.Errorf("localblobstore.ListBlobIds found unexpected file %s", fileName)
+		} else {
+			content, pos, hash, fullHash, err := readBlob(ctx, bs, fileName, -1, 0, 0)
+			if err != nil && err != io.EOF {
+				t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+			} else if bytes.Compare(blobVector[i].content, content) != 0 {
+				t.Errorf("localblobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
+					filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
+			} else if pos != 0 {
+				t.Errorf("localblobstore.ListCAIds Seek on %q returned %d instead of 0",
+					filepath.Join(testDirName, fileName), pos)
+			}
+			if bytes.Compare(hash, fullHash) != 0 {
+				t.Errorf("localblobstore.ListCAIds read on %q; got hash %v, expected %v",
+					fileName, hash, fullHash)
+			}
+		}
+		blobCount++
+	}
+	if iterator.Err() != nil {
+		t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+	if blobCount != len(blobVector) {
+		t.Errorf("localblobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
+	}
+}
+
+// checkFragments() checks all the fragments in bs to ensure they
+// correspond to those in fragmentMap[], iff testDirName is non-empty.
+func checkFragments(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, fragmentMap map[string]bool, testDirName string) {
+	if testDirName != "" {
+		caCount := 0
+		iterator := bs.ListCAIds(ctx)
+		for iterator.Advance() {
+			fileName := iterator.Value()
+			content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
+			if err != nil && err != io.EOF {
+				t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+			} else if !fragmentMap[string(content)] {
+				t.Errorf("localblobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
+			} else {
+				hasher := md5.New()
+				hasher.Write(content)
+				hash := hasher.Sum(nil)
+				nameFromContent := filepath.Join("cas",
+					fmt.Sprintf("%02x", hash[0]),
+					fmt.Sprintf("%02x", hash[1]),
+					fmt.Sprintf("%02x", hash[2]),
+					fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+						hash[3],
+						hash[4], hash[5], hash[6], hash[7],
+						hash[8], hash[9], hash[10], hash[11],
+						hash[12], hash[13], hash[14], hash[15]))
+				if nameFromContent != fileName {
+					t.Errorf("localblobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
+				}
+			}
+			caCount++
+		}
+		if iterator.Err() != nil {
+			t.Errorf("localblobstore.ListCAIds iteration failed: %v", iterator.Err())
+		}
+		if caCount != len(fragmentMap) {
+			t.Errorf("localblobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
+		}
+	}
+}
+
+// AddRetrieveAndDelete() tests adding, retrieving, and deleting blobs from a
+// blobstore bs.  One can't retrieve or delete something that hasn't been
+// created, so it's all done in one routine.  If testDirName is non-empty,
+// the blobstore is assumed to be accessible in the file system, and its
+// files are checked.
+func AddRetrieveAndDelete(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, testDirName string) {
+	var err error
+
+	// Check that there are no files in the blobstore we were given.
+	iterator := bs.ListBlobIds(ctx)
+	for iterator.Advance() {
+		fileName := iterator.Value()
+		t.Errorf("unexpected file %q\n", fileName)
+	}
+	if iterator.Err() != nil {
+		t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+	}
+
+	// Create the strings:  "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
+	womData := []byte("wom")
+	batData := []byte("bat")
+	wombatData := []byte("wombat")
+	batwomData := []byte("batwom")
+	atwoData := []byte("atwo")
+	atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
+
+	// fragmentMap will have an entry per content-addressed fragment.
+	fragmentMap := make(map[string]bool)
+
+	// Create the blobs, by various means.
+
+	var blobVector []testBlob // Accumulate the blobs we create here.
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		womData, false,
+		blobOrBlockOrFile{block: womData})
+	womName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(womData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		batData, false,
+		blobOrBlockOrFile{block: batData})
+	batName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(batData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{block: wombatData})
+	firstWombatName := blobVector[len(blobVector)-1].blobName
+	fragmentMap[string(wombatData)] = true
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, true,
+		blobOrBlockOrFile{block: womData},
+		blobOrBlockOrFile{block: batData})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		wombatData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   6,
+			offset: 0})
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		batwomData, false,
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 3},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   3,
+			offset: 0})
+	batwomName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		atwoData, false,
+		blobOrBlockOrFile{
+			blob:   batwomName,
+			size:   4,
+			offset: 1})
+	atwoName := blobVector[len(blobVector)-1].blobName
+
+	blobVector = writeBlob(t, ctx, bs, blobVector,
+		atwoatwoombatatwoData, true,
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   -1,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   atwoName,
+			size:   4,
+			offset: 0},
+		blobOrBlockOrFile{
+			blob:   firstWombatName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   batName,
+			size:   -1,
+			offset: 1},
+		blobOrBlockOrFile{
+			blob:   womName,
+			size:   2,
+			offset: 0})
+	atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	bs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Ensure that deleting non-existent blobs fails.
+	err = bs.DeleteBlob(ctx, "../../../../etc/passwd")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+	err = bs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
+	if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+		t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.
+	err = bs.DeleteBlob(ctx, batName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, batName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Nothing should change if we garbage collect.
+	bs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Open a BlobReader on a blob we're about to delete,
+	// so its fragments won't be garbage collected.
+
+	var br localblobstore.BlobReader
+	br, err = bs.NewBlobReader(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
+	}
+
+	// -------------------------------------------------
+	// Delete a blob.  This should be the last on-disk reference to the
+	// content-addressed fragment "bat", but the fragment won't be deleted
+	// until we close the reader and garbage collect.
+	err = bs.DeleteBlob(ctx, atwoatwoombatatwoName)
+	if err != nil {
+		t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Garbage collection should change nothing; the fragment involved
+	// is still referenced from the open reader *br.
+	bs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+
+	// Close the open BlobReader and garbage collect.
+	err = br.Close()
+	if err != nil {
+		t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
+	}
+	delete(fragmentMap, string(batData))
+
+	bs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// Delete all blobs.
+	for len(blobVector) != 0 {
+		err = bs.DeleteBlob(ctx, blobVector[0].blobName)
+		if err != nil {
+			t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
+		}
+		blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
+	}
+
+	// -------------------------------------------------
+	// Check that the state is as we expect.
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+	// -------------------------------------------------
+	// The remaining fragments should be removed when we garbage collect.
+	for frag := range fragmentMap {
+		delete(fragmentMap, frag)
+	}
+	bs.GC(ctx)
+	checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+	checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+	checkFragments(t, ctx, bs, fragmentMap, testDirName)
+}
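A minimal sketch of the round trip the test above exercises, assuming an
already-constructed localblobstore.BlobStore bs (e.g. one backed by
fs_cablobstore) and a valid *context.T ctx, with error handling elided:

	bw, _ := bs.NewBlobWriter(ctx) // new blob under a fresh name
	_ = bw.AppendFragment(
		localblobstore.BlockOrFile{Block: []byte("wom")},
		localblobstore.BlockOrFile{Block: []byte("bat")})
	name := bw.Name()
	_ = bw.Close() // finalizes the blob; Hash() is now fixed

	br, _ := bs.NewBlobReader(ctx, name)
	buf := make([]byte, 6)
	n, _ := br.Read(buf)        // a single Read may return fewer than len(buf) bytes
	fmt.Printf("%s\n", buf[:n]) // "wombat" (or a prefix of it)
	_ = br.Close()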
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
new file mode 100644
index 0000000..b1a4565
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -0,0 +1,170 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package localblobstore is the interface to a local blob store.
+// Implementations include fs_cablobstore.
+package localblobstore
+
+import "v.io/v23/context"
+
+// A BlobStore represents a simple, content-addressable store.
+type BlobStore interface {
+	// NewBlobReader() returns a pointer to a newly allocated BlobReader on
+	// the specified blobName.  BlobReaders should not be used concurrently
+	// by multiple threads.  Returned handles should be closed with
+	// Close().
+	NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
+
+	// NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+	// a newly created blob name, which can be found using the Name()
+	// method.  BlobWriters should not be used concurrently by multiple
+	// threads.  The returned handle should be closed with either the
+	// Close() or CloseWithoutFinalize() method to avoid leaking file
+	// handles.
+	NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+
+	// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
+	// an existing but unfinalized blob name.
+	ResumeBlobWriter(ctx *context.T, blobName string) (bw BlobWriter, err error)
+
+	// DeleteBlob() deletes the named blob from the BlobStore.
+	DeleteBlob(ctx *context.T, blobName string) (err error)
+
+	// GC() removes old temp files and content-addressed blocks that are no
+	// longer referenced by any blob.  It may be called concurrently with
+	// other calls to GC(), and with uses of BlobReaders and BlobWriters.
+	GC(ctx *context.T) error
+
+	// ListBlobIds() returns an iterator that can be used to enumerate the
+	// blobs in a BlobStore.  Expected use is:
+	//
+	//	iter := bs.ListBlobIds(ctx)
+	//	for iter.Advance() {
+	//	  // Process iter.Value() here.
+	//	}
+	//	if iter.Err() != nil {
+	//	  // The loop terminated early due to an error.
+	//	}
+	ListBlobIds(ctx *context.T) (iter Iter)
+
+	// ListCAIds() returns an iterator that can be used to enumerate the
+	// content-addressable fragments in a BlobStore.  Expected use is:
+	//
+	//	iter := bs.ListCAIds(ctx)
+	//	for iter.Advance() {
+	//	  // Process iter.Value() here.
+	//	}
+	//	if iter.Err() != nil {
+	//	  // The loop terminated early due to an error.
+	//	}
+	ListCAIds(ctx *context.T) (iter Iter)
+
+	// Root() returns the name of the root directory where the BlobStore is stored.
+	Root() string
+}
+
+// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
+// and Seek() calls.  A BlobReader can be created with NewBlobReader(), and
+// should be closed with the Close() method to avoid leaking file handles.
+type BlobReader interface {
+	// ReadAt() fills b[] with up to len(b) bytes of data starting at
+	// position "at" within the blob that the BlobReader indicates, and
+	// returns the number of bytes read.
+	ReadAt(b []byte, at int64) (n int, err error)
+
+	// Read() fills b[] with up to len(b) bytes of data starting at the
+	// current seek position of the BlobReader within the blob that the
+	// BlobReader indicates, and then both returns the number of bytes read
+	// and advances the BlobReader's seek position by that amount.
+	Read(b []byte) (n int, err error)
+
+	// Seek() sets the seek position of the BlobReader to offset if
+	// whence==0, offset+current_seek_position if whence==1, and
+	// offset+end_of_blob if whence==2, and then returns the current seek
+	// position.
+	Seek(offset int64, whence int) (result int64, err error)
+
+	// Close() indicates that the client will perform no further operations
+	// on the BlobReader.  It releases any resources held by the
+	// BlobReader.
+	Close() error
+
+	// Name() returns the BlobReader's name.
+	Name() string
+
+	// Size() returns the BlobReader's size.
+	Size() int64
+
+	// IsFinalized() returns whether the BlobReader has been finalized.
+	IsFinalized() bool
+
+	// Hash() returns the BlobReader's hash.  It may be nil if the blob is
+	// not finalized.
+	Hash() []byte
+}
+
+// A BlockOrFile represents a vector of bytes, and contains either a data
+// block (as a []byte), or a (file name, size, offset) triple.
+type BlockOrFile struct {
+	Block    []byte // If FileName is empty, the bytes represented.
+	FileName string // If non-empty, the name of the file containing the bytes.
+	Size     int64  // If FileName is non-empty, the number of bytes (or -1 for "all").
+	Offset   int64  // If FileName is non-empty, the offset of the relevant bytes within the file.
+}
+
+// A BlobWriter allows a blob to be written.  If a blob has not yet been
+// finalized, it also allows that blob to be extended.  A BlobWriter may be
+// created with NewBlobWriter(), and should be closed with Close() or
+// CloseWithoutFinalize().
+type BlobWriter interface {
+	// AppendBlob() adds a (substring of a) pre-existing blob to the blob
+	// being written by the BlobWriter.  The fragments of the pre-existing
+	// blob are not physically copied; they are referenced by both blobs.
+	AppendBlob(blobName string, size int64, offset int64) (err error)
+
+	// AppendFragment() appends a fragment to the blob being written by the
+	// BlobWriter, where the fragment is composed of the byte vectors
+	// described by the elements of item[].  The fragment is copied into
+	// the blob store.
+	AppendFragment(item ...BlockOrFile) (err error)
+
+	// Close() finalizes the BlobWriter, and indicates that the client will
+	// perform no further append operations on the BlobWriter.  Any
+	// internal open file handles are closed.
+	Close() (err error)
+
+	// CloseWithoutFinalize() indicates that the client will perform no
+	// further append operations on the BlobWriter, but does not finalize
+	// the blob.  Any internal open file handles are closed.  Clients are
+	// expected to need this operation infrequently.
+	CloseWithoutFinalize() (err error)
+
+	// Name() returns the BlobWriter's name.
+	Name() string
+
+	// Size() returns the BlobWriter's size.
+	Size() int64
+
+	// IsFinalized() returns whether the BlobWriter has been finalized.
+	IsFinalized() bool
+
+	// Hash() returns the BlobWriter's hash, reflecting the bytes written so far.
+	Hash() []byte
+}
+
+// An Iter represents an iterator that allows the client to enumerate
+// the blobs or fragments in a BlobStore.
+type Iter interface {
+	// Advance() stages an item so that it may be retrieved via Value.
+	// Returns true iff there is an item to retrieve.  Advance must be
+	// called before Value is called.
+	Advance() (advanced bool)
+
+	// Value() returns the item that was staged by Advance.  May panic if
+	// Advance returned false or was not called.  Never blocks.
+	Value() (name string)
+
+	// Err() returns any error encountered by Advance.  Never blocks.
+	Err() error
+}
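The CloseWithoutFinalize/ResumeBlobWriter pair above is what the useResume
branch of the test earlier in this change exercises. A hedged sketch of an
interrupted write, under the same assumptions as before (bs and ctx are
constructed elsewhere; chunk1 and chunk2 are hypothetical byte slices):

	bw, _ := bs.NewBlobWriter(ctx)
	_ = bw.AppendFragment(localblobstore.BlockOrFile{Block: chunk1})
	name := bw.Name()
	_ = bw.CloseWithoutFinalize() // blob stays unfinalized and extendable

	// ...later, perhaps after a crash or restart...
	bw, _ = bs.ResumeBlobWriter(ctx, name)
	_ = bw.AppendFragment(localblobstore.BlockOrFile{Block: chunk2})
	_ = bw.Close() // finalize once all chunks have been appended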
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
 			return err
 		}
 		// Check for "database already exists".
-		if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -172,7 +172,7 @@
 		info := &dbInfo{
 			Name: dbName,
 		}
-		return a.putDbInfo(ctx, call, st, dbName, info)
+		return a.putDbInfo(ctx, st, dbName, info)
 	}); err != nil {
 		return err
 	}
@@ -192,7 +192,7 @@
 
 	// 3. Flip dbInfo.Initialized to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Initialized = true
 			return nil
 		})
@@ -234,7 +234,7 @@
 
 	// 2. Flip dbInfo.Deleted to true.
 	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
 			info.Deleted = true
 			return nil
 		})
@@ -251,7 +251,7 @@
 	}
 
 	// 4. Delete dbInfo record.
-	if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+	if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
 		return err
 	}
 
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
-	"v.io/v23/rpc"
 )
 
 type dbInfoLayer struct {
@@ -49,9 +48,9 @@
 
 // getDbInfo reads data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
 	info := &dbInfo{}
-	if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+	if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
 		return nil, err
 	}
 	return info, nil
@@ -59,27 +58,27 @@
 
 // putDbInfo writes data to the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
-	return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+	return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
 }
 
 // delDbInfo deletes data from the storage engine.
 // Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
-	return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+	return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
 }
 
 // updateDbInfo performs a read-modify-write.
 // fn should "modify" v, and should return a VDL-compatible error.
 // Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
 	_ = st.(store.Transaction) // panics on failure, as desired
-	info, err := a.getDbInfo(ctx, call, st, dbName)
+	info, err := a.getDbInfo(ctx, st, dbName)
 	if err != nil {
 		return err
 	}
 	if err := fn(info); err != nil {
 		return err
 	}
-	return a.putDbInfo(ctx, call, st, dbName, info)
+	return a.putDbInfo(ctx, st, dbName, info)
 }
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 0971923..d18ab04 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -15,7 +15,6 @@
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/nosql/query_db"
 	"v.io/syncbase/v23/syncbase/nosql/query_exec"
-	prefixutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -103,7 +102,7 @@
 		Name:  d.name,
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, d.st, d, data); err != nil {
+	if err := util.Put(ctx, d.st, d, data); err != nil {
 		return nil, err
 	}
 	return d, nil
@@ -404,20 +403,20 @@
 	req *tableReq
 }
 
-func (t *tableDb) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
 	return &kvs{
-		t:        t,
-		prefixes: prefixes,
-		curr:     -1,
-		validRow: false,
-		it:       nil,
-		err:      nil,
+		t:         t,
+		keyRanges: keyRanges,
+		curr:      -1,
+		validRow:  false,
+		it:        nil,
+		err:       nil,
 	}, nil
 }
 
 type kvs struct {
 	t         *tableDb
-	prefixes  []string
+	keyRanges query_db.KeyRanges
 	curr      int // current index into prefixes, -1 at start
 	validRow  bool
 	currKey   string
@@ -433,16 +432,22 @@
 	if s.curr == -1 {
 		s.curr++
 	}
-	for s.curr < len(s.prefixes) {
+	for s.curr < len(s.keyRanges) {
 		if s.it == nil {
-			start := prefixutil.PrefixRangeStart(s.prefixes[s.curr])
-			limit := prefixutil.PrefixRangeLimit(s.prefixes[s.curr])
-			s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), string(start), string(limit)))
+			start := s.keyRanges[s.curr].Start
+			limit := s.keyRanges[s.curr].Limit
+			// A start of 0x00 and a limit of 0xff is the sentinel for "examine all rows".
+			if start == string([]byte{0}) && limit == string([]byte{255}) {
+				start = ""
+				limit = ""
+			}
+			s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), start, limit))
 		}
 		if s.it.Advance() {
 			// key
 			keyBytes := s.it.Key(nil)
 			parts := util.SplitKeyParts(string(keyBytes))
+			// TODO(rogulenko): Check access for the key.
 			s.currKey = parts[len(parts)-1]
 			// value
 			valueBytes := s.it.Value(nil)
@@ -489,8 +494,8 @@
 		s.it.Cancel()
 		s.it = nil
 	}
-	// set curr to end of prefixes so Advance will return false
-	s.curr = len(s.prefixes)
+	// set curr to end of keyRanges so Advance will return false
+	s.curr = len(s.keyRanges)
 }
 
 ////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
 }
 
 // checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
 // Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
 func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return util.Get(ctx, call, st, r.t, &tableData{})
+	return r.t.checkAccess(ctx, call, st, r.key)
 }
 
 // get reads data from the storage engine.
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
 package nosql
 
 import (
+	"strings"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 )
 
 // tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
 			return err
 		}
 		// Check for "table already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -54,7 +57,7 @@
 			Name:  t.name,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, t, data)
+		return util.Put(ctx, st, t, data)
 	})
 }
 
@@ -71,20 +74,30 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all rows in this table.
-		return util.Delete(ctx, call, st, t)
+		return util.Delete(ctx, st, t)
 	})
 }
 
 func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReadWriter) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		key := []byte{}
 		for it.Advance() {
 			key = it.Key(key)
+			// Check perms.
+			parts := util.SplitKeyParts(string(key))
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				// TODO(rogulenko): Revisit this behavior. Probably we should
+				// delete all rows that we have access to.
+				it.Cancel()
+				return err
+			}
+			// Delete the key-value pair.
 			if err := st.Delete(key); err != nil {
 				return verror.New(verror.ErrInternal, ctx, err)
 			}
@@ -107,8 +120,8 @@
 
 func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
 	impl := func(st store.StoreReader) error {
-		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		// Check for table-level access before doing a scan.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return err
 		}
 		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
 		key, value := []byte{}, []byte{}
 		for it.Advance() {
 			key, value = it.Key(key), it.Value(value)
+			// Check perms.
 			parts := util.SplitKeyParts(string(key))
-			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+			externalKey := parts[len(parts)-1]
+			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+				it.Cancel()
+				return err
+			}
+			sender.Send(wire.KeyValue{Key: externalKey, Value: value})
 		}
 		if err := it.Err(); err != nil {
 			return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
 	return impl(st)
 }
 
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
-	if prefix != "" {
-		return verror.NewErrNotImplemented(ctx)
-	}
-	impl := func(st store.StoreReadWriter) error {
-		data := &tableData{}
-		return util.Update(ctx, call, st, t, data, func() error {
-			data.Perms = perms
-			return nil
-		})
-	}
-	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
-			return err
-		} else {
-			return impl(st)
-		}
-	} else {
-		return store.RunInTransaction(t.d.st, impl)
-	}
-}
-
 func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
-	if key != "" {
-		return nil, verror.NewErrNotImplemented(ctx)
-	}
 	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
-		data := &tableData{}
-		if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+		// Check permissions only at table level.
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			return nil, err
 		}
-		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+		// Get the most specific permissions object.
+		prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+		if err != nil {
+			return nil, err
+		}
+		result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+		// Collect all parent permissions objects all the way up to the table level.
+		for prefix != "" {
+			prefix = prefixPerms.Parent
+			if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+				return nil, err
+			}
+			result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+		}
+		return result, nil
 	}
 	var st store.StoreReader
 	if t.d.batchId != nil {
@@ -179,17 +187,114 @@
 	return impl(st)
 }
 
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		if prefix == "" {
+			data := &tableData{}
+			return util.Update(ctx, call, st, t, data, func() error {
+				data.Perms = perms
+				return nil
+			})
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		// If there is no permissions object for the given prefix, we need to
+		// add a new node to the prefix permissions tree. We do it by updating
+		// the parent of every child of the prefix to point to the node
+		// corresponding to the prefix.
+		if parent != prefix {
+			if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+				return err
+			}
+		} else {
+			parent = prefixPerms.Parent
+		}
+		stPrefix := t.prefixPermsKey(prefix)
+		stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+		prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+		// Put the (prefix, perms) pair to the database.
+		if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+			return err
+		}
+		return util.PutObject(st, stPrefixLimit, prefixPerms)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
+}
+
 func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
-	return verror.NewErrNotImplemented(ctx)
+	if prefix == "" {
+		return verror.New(verror.ErrBadArg, ctx, prefix)
+	}
+	impl := func(st store.StoreReadWriter) error {
+		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+			return err
+		}
+		// Concurrent transactions that touch this table should fail with
+		// ErrConcurrentTransaction when this transaction commits.
+		if err := t.lock(ctx, st); err != nil {
+			return err
+		}
+		// Get the most specific permissions object.
+		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		if err != nil {
+			return err
+		}
+		if parent != prefix {
+			// This can happen only if there is no permissions object for the
+			// given prefix. Since DeletePermissions is idempotent, return nil.
+			return nil
+		}
+		// We need to delete the node corresponding to the prefix from the prefix
+		// permissions tree. We do it by updating the parent of every child of
+		// the prefix to point to the parent of the node being deleted.
+		if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+			return err
+		}
+		stPrefix := []byte(t.prefixPermsKey(prefix))
+		stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+		if err := st.Delete(stPrefix); err != nil {
+			return err
+		}
+		return st.Delete(stPrefixLimit)
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
 func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
 		// Check perms.
-		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+		if err := t.checkAccess(ctx, call, st, ""); err != nil {
 			closeStoreReader()
 			return nil, err
 		}
+		// TODO(rogulenko): Check prefix permissions for children.
 		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
 	var st store.StoreReader
@@ -226,3 +331,145 @@
 func (t *tableReq) stKeyPart() string {
 	return t.name
 }
+
+// updateParentRefs updates the parent for all children of the given
+// prefix to newParent.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+	stPrefix := []byte(t.prefixPermsKey(prefix))
+	stPrefixStart := append(stPrefix, 0)
+	stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+	it := st.Scan(stPrefixStart, stPrefixLimit)
+	var key, value []byte
+	for it.Advance() {
+		key, value = it.Key(key), it.Value(value)
+		var prefixPerms stPrefixPerms
+		if err := vom.Decode(value, &prefixPerms); err != nil {
+			it.Cancel()
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		prefixPerms.Parent = newParent
+		if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+			it.Cancel()
+			return err
+		}
+	}
+	if err := it.Err(); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// lock causes all concurrent transactions that have accessed this table to
+// fail with ErrConcurrentTransaction.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+	var data tableData
+	if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+		return err
+	}
+	return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+	prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+	if err != nil {
+		return err
+	}
+	if prefix != "" {
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			return err
+		}
+	}
+	auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
+	if err := auth.Authorize(ctx, call.Security()); err != nil {
+		return verror.New(verror.ErrNoAccess, ctx, prefix)
+	}
+	return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has an
+// associated permissions object, along with that permissions object.
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Conceptually we represent all prefixes as a forest T, where each vertex maps
+// to a prefix. The parent of a string is its longest proper prefix that
+// belongs to T. Each prefix P in T is represented as a pair of entries with
+// keys P and P~ with values of type stPrefixPerms (parent + perms).
+// At a high level, this function works as follows:
+// 1	iter = db.Scan(K, "")
+// 		Here the last character of iter.Key() is stripped if it is '~'
+// 2	if hasPrefix(K, iter.Key()) return iter.Value()
+// 3	return parent(iter.Key())
+// Short proof of correctness:
+// The iterator returned on line 1 points to one of the following:
+// - a string t that is equal to K;
+// - a string t~: if t were not a prefix of K, then K < t < t~, which
+//   contradicts the property of the iterator returned on line 1, so t is a
+//   prefix of K; moreover t is the longest prefix of K, as all longer prefixes
+//   of K are less than t~; in this case line 2 returns the correct result;
+// - a string t that doesn't end with '~': t can't be a prefix of K, as all
+//   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
+//   K < parent(t) < t; parent(t) is the longest prefix of K, otherwise t would
+//   be a prefix of K; in this case line 3 returns the correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+	it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+	if !it.Advance() {
+		prefixPerms, err := t.permsForPrefix(ctx, st, "")
+		return "", prefixPerms, err
+	}
+	defer it.Cancel()
+	parts := util.SplitKeyParts(string(it.Key(nil)))
+	prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+	value := it.Value(nil)
+	var prefixPerms stPrefixPerms
+	if err := vom.Decode(value, &prefixPerms); err != nil {
+		return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	if strings.HasPrefix(key, prefix) {
+		return prefix, prefixPerms, nil
+	}
+	prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+	return prefixPerms.Parent, prefixPerms, err
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+	if prefix == "" {
+		var data tableData
+		if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+			return stPrefixPerms{}, err
+		}
+		return stPrefixPerms{Perms: data.Perms}, nil
+	}
+	var prefixPerms stPrefixPerms
+	if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+		return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+	return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
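To make the prefix-permissions walk concrete: suppose (hypothetically) a table
"tbl" with table-level perms tablePerms has seen SetPermissions("a/b", permsAB)
followed by SetPermissions("a/b/c", permsABC). The perms space then looks like:

	$perms:tbl:a/b    -> stPrefixPerms{Parent: "",    Perms: permsAB}
	$perms:tbl:a/b~   -> stPrefixPerms{Parent: "",    Perms: permsAB}
	$perms:tbl:a/b/c  -> stPrefixPerms{Parent: "a/b", Perms: permsABC}
	$perms:tbl:a/b/c~ -> stPrefixPerms{Parent: "a/b", Perms: permsABC}

permsForKey("a/b/c/1") scans forward from "a/b/c/1", lands on "a/b/c~", strips
the "~", sees that "a/b/c" is a prefix of the key, and returns permsABC.
GetPermissions("a/b/c/1") then follows the Parent links, yielding
[("a/b/c", permsABC), ("a/b", permsAB), ("", tablePerms)].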
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl b/x/ref/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl
+++ b/x/ref/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
 	Name  string
 	Perms access.Permissions
 }
+
+// stPrefixPerms describes the internal representation of prefix permissions
+// in the store.
+//
+// Each (key, perms) pair is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" is a reserved char that's lexicographically greater than all
+// chars allowed by clients, %table is the name of the table, and parent is
+// the longest proper prefix of the key that has an associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
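The "~" twin entries exist so that a single forward scan can locate the longest
stored prefix of any key: every extension E of a prefix P sorts inside the
half-open interval [P, P~). A self-contained check of that ordering invariant:

	package main

	import "fmt"

	func main() {
		// '~' is greater than any character allowed in client keys, so
		// every extension E of a prefix P satisfies P <= E < P+"~".
		fmt.Println("a/b" <= "a/b/c" && "a/b/c" < "a/b~") // prints true
	}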
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl.go b/x/ref/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl.go
+++ b/x/ref/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
 }) {
 }
 
+// stPrefixPerms describes the internal representation of prefix permissions
+// in the store.
+//
+// Each (key, perms) pair is stored as two key-value pairs:
+// "$perms:%table:key"  - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" is a reserved char that's lexicographically greater than all
+// chars allowed by clients, %table is the name of the table, and parent is
+// the longest proper prefix of the key that has an associated permissions object.
+type stPrefixPerms struct {
+	Parent string
+	Perms  access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
 func init() {
 	vdl.Register((*databaseData)(nil))
 	vdl.Register((*tableData)(nil))
+	vdl.Register((*stPrefixPerms)(nil))
 }
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
 	data := &serviceData{
 		Perms: opts.Perms,
 	}
-	if err := util.Put(ctx, call, s.st, s, data); err != nil {
+	if err := util.Put(ctx, s.st, s, data); err != nil {
 		return nil, err
 	}
 	if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
 			return err
 		}
 		// Check for "app already exists".
-		if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -192,7 +192,7 @@
 			Name:  appName,
 			Perms: perms,
 		}
-		return util.Put(ctx, call, st, a, data)
+		return util.Put(ctx, st, a, data)
 	}); err != nil {
 		return err
 	}
@@ -218,7 +218,7 @@
 			return err
 		}
 		// TODO(sadovsky): Delete all databases in this app.
-		return util.Delete(ctx, call, st, a)
+		return util.Delete(ctx, st, a)
 	}); err != nil {
 		return err
 	}
diff --git a/x/ref/services/syncbase/server/util/constants.go b/x/ref/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/x/ref/services/syncbase/server/util/constants.go
+++ b/x/ref/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
 	DatabasePrefix = "$database"
 	DbInfoPrefix   = "$dbInfo"
 	LogPrefix      = "$log"
+	PermsPrefix    = "$perms"
 	RowPrefix      = "$row"
 	ServicePrefix  = "$service"
 	SyncPrefix     = "$sync"
@@ -27,4 +28,8 @@
 	BatchSep = ":"
 	// Separator for parts of storage engine keys.
 	KeyPartSep = ":"
+	// PrefixRangeLimitSuffix is the suffix of a key which indicates the end of
+	// a prefix range. Should be more than any regular key in the store.
+	// TODO(rogulenko): Change this constant to something out of the UTF8 space.
+	PrefixRangeLimitSuffix = "~"
 )
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
 
 // GetWithoutAuth does st.Get(l.StKey(), v), populating v.
 // Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
 	if err := GetObject(st, l.StKey(), v); err != nil {
 		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
 // Get does GetWithoutAuth followed by an auth check.
 // Returns a VDL-compatible error.
 func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
-	if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+	if err := GetWithoutAuth(ctx, st, l, v); err != nil {
 		return err
 	}
 	auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
 // Put does st.Put(l.StKey(), v).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
 	if err := PutObject(st, l.StKey(), v); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -83,7 +83,7 @@
 // Delete does st.Delete(l.StKey()).
 // Returns a VDL-compatible error.
 // If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
 	if err := st.Delete([]byte(l.StKey())); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
@@ -103,7 +103,7 @@
 	if err := fn(); err != nil {
 		return err
 	}
-	return Put(ctx, call, st, l, v)
+	return Put(ctx, st, l, v)
 }
 
 ////////////////////////////////////////////////////////////
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index df96244..a75b471 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -19,6 +19,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
+	"v.io/x/ref/lib/xrpc"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
@@ -44,26 +45,16 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	s, err := v23.NewServer(ctx)
-	if err != nil {
-		vlog.Fatal("v23.NewServer() failed: ", err)
-	}
-	if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
-		vlog.Fatal("s.Listen() failed: ", err)
-	}
-
 	perms, err := securityflag.PermissionsFromFlag()
 	if err != nil {
 		vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
 	}
-
 	if perms != nil {
 		vlog.Info("Using permissions from command line flag.")
 	} else {
 		vlog.Info("No permissions flag provided. Giving local principal all permissions.")
 		perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
 	}
-
 	service, err := server.NewService(nil, nil, server.ServiceOptions{
 		Perms:   perms,
 		RootDir: *rootDir,
@@ -74,9 +65,8 @@
 	}
 	d := server.NewDispatcher(service)
 
-	// Publish the service in the mount table.
-	if err := s.ServeDispatcher(*name, d); err != nil {
-		vlog.Fatal("s.ServeDispatcher() failed: ", err)
+	if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
+		vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
 	}
 	vlog.Info("Mounted at: ", *name)