syncbase: implement client batches

Change-Id: I9556f926c32b6e37772d276975285c6bffb3a9fc
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index a906176..bee1209 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -188,4 +188,5 @@
 error (
 	BoundToBatch() {"en": "bound to batch"}
 	NotBoundToBatch() {"en": "not bound to batch"}
+	ReadOnlyBatch() {"en": "batch is read-only"}
 )
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index ed109d8..15f6af7 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -26,11 +26,13 @@
 var (
 	ErrBoundToBatch    = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.BoundToBatch", verror.NoRetry, "{1:}{2:} bound to batch")
 	ErrNotBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.NotBoundToBatch", verror.NoRetry, "{1:}{2:} not bound to batch")
+	ErrReadOnlyBatch   = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ReadOnlyBatch", verror.NoRetry, "{1:}{2:} batch is read-only")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBoundToBatch.ID), "{1:}{2:} bound to batch")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotBoundToBatch.ID), "{1:}{2:} not bound to batch")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrReadOnlyBatch.ID), "{1:}{2:} batch is read-only")
 }
 
 // NewErrBoundToBatch returns an error with the ErrBoundToBatch ID.
@@ -43,6 +45,11 @@
 	return verror.New(ErrNotBoundToBatch, ctx)
 }
 
+// NewErrReadOnlyBatch returns an error with the ErrReadOnlyBatch ID.
+func NewErrReadOnlyBatch(ctx *context.T) error {
+	return verror.New(ErrReadOnlyBatch, ctx)
+}
+
 // SyncGroupManagerClientMethods is the client interface
 // containing SyncGroupManager methods.
 //
diff --git a/v23/syncbase/client_test.go b/v23/syncbase/client_test.go
index 43dbfc6..eee2a29 100644
--- a/v23/syncbase/client_test.go
+++ b/v23/syncbase/client_test.go
@@ -9,18 +9,24 @@
 
 	"v.io/syncbase/v23/syncbase"
 	tu "v.io/syncbase/v23/syncbase/testutil"
+	"v.io/v23/naming"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
-////////////////////////////////////////
-// Test cases
+// Tests various Name and FullName methods.
+func TestName(t *testing.T) {
+	s := syncbase.NewService("s")
+	a := s.App("a")
 
-func TestNameAndKey(t *testing.T) {
-	a := syncbase.NewService("s").App("a")
-
+	if s.FullName() != "s" {
+		t.Errorf("Wrong full name: %q", s.FullName())
+	}
 	if a.Name() != "a" {
 		t.Errorf("Wrong name: %q", a.Name())
 	}
+	if a.FullName() != naming.Join("s", "a") {
+		t.Errorf("Wrong name: %q", a.FullName())
+	}
 }
 
 // Tests that Service.ListApps works as expected.
diff --git a/v23/syncbase/nosql/batch.go b/v23/syncbase/nosql/batch.go
index a49d726..7655a6c 100644
--- a/v23/syncbase/nosql/batch.go
+++ b/v23/syncbase/nosql/batch.go
@@ -5,11 +5,28 @@
 package nosql
 
 import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/v23/context"
 )
 
+type batch struct {
+	database
+}
+
+var _ BatchDatabase = (*batch)(nil)
+
+// Commit implements BatchDatabase.Commit.
+func (b *batch) Commit(ctx *context.T) error {
+	return b.c.Commit(ctx)
+}
+
+// Abort implements BatchDatabase.Abort.
+func (b *batch) Abort(ctx *context.T) {
+	b.c.Abort(ctx)
+}
+
 // TODO(sadovsky): Add retry loop.
-func RunInBatch(ctx *context.T, d Database, opts BatchOptions, fn func(b BatchDatabase) error) error {
+func RunInBatch(ctx *context.T, d Database, opts wire.BatchOptions, fn func(b BatchDatabase) error) error {
 	b, err := d.BeginBatch(ctx, opts)
 	if err != nil {
 		return err
diff --git a/v23/syncbase/nosql/batch_test.go b/v23/syncbase/nosql/batch_test.go
new file mode 100644
index 0000000..291ddd6
--- /dev/null
+++ b/v23/syncbase/nosql/batch_test.go
@@ -0,0 +1,275 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(sadovsky): Once the storage engine layer is made public, implement a
+// Syncbase-based storage engine so that we can run all the storage engine tests
+// against Syncbase itself.
+
+package nosql_test
+
+import (
+	"testing"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase"
+	"v.io/syncbase/v23/syncbase/nosql"
+	tu "v.io/syncbase/v23/syncbase/testutil"
+	"v.io/v23/naming"
+	"v.io/v23/verror"
+	_ "v.io/x/ref/runtime/factories/generic"
+)
+
+// Tests various Name and FullName methods.
+func TestName(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+	b, err := d.BeginBatch(ctx, wire.BatchOptions{})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+
+	if d.Name() != "d" {
+		t.Errorf("Wrong name: %q", d.Name())
+	}
+	if d.FullName() != naming.Join(sName, "a", "d") {
+		t.Errorf("Wrong full name: %q", d.FullName())
+	}
+	if b.Name() == d.Name() {
+		t.Errorf("Names should not match: %q", b.Name())
+	}
+	if b.FullName() == d.FullName() {
+		t.Errorf("Full names should not match: %q", b.FullName())
+	}
+}
+
+// Tests basic functionality of BatchDatabase.
+func TestBatchBasics(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+
+	var b1, b2 nosql.BatchDatabase
+	var b1tb, b2tb nosql.Table
+	var err error
+
+	// Test that the effects of a transaction are not visible until commit.
+	b1, err = d.BeginBatch(ctx, wire.BatchOptions{})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb = b1.Table("tb")
+
+	if err := b1tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+
+	// Check that foo is not yet visible.
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+
+	if err := b1.Commit(ctx); err != nil {
+		t.Fatalf("b1.Commit() failed: %v", err)
+	}
+
+	// Check that foo is now visible.
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"fooKey"}, []interface{}{"fooValue"})
+
+	// Test that concurrent transactions are isolated.
+	if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	if b2, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb, b2tb = b1.Table("tb"), b2.Table("tb")
+
+	if err := b1tb.Put(ctx, "barKey", "barValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+	if err := b1tb.Put(ctx, "bazKey", "bazValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+
+	var got string
+	if err := b2tb.Get(ctx, "barKey", &got); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		t.Fatalf("Get() should have failed: %v", err)
+	}
+	if err := b2tb.Put(ctx, "rabKey", "rabValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+
+	if err := b1.Commit(ctx); err != nil {
+		t.Fatalf("b1.Commit() failed: %v", err)
+	}
+	if err := b2.Commit(ctx); err == nil {
+		t.Fatalf("b2.Commit() should have failed: %v", err)
+	}
+
+	// Check that foo, bar, and baz (but not rab) are now visible.
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"barKey", "bazKey", "fooKey"}, []interface{}{"barValue", "bazValue", "fooValue"})
+}
+
+// Tests enforcement of BatchOptions.ReadOnly.
+func TestReadOnlyBatch(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+
+	if err := tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+
+	b1, err := d.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb := b1.Table("tb")
+
+	if err := b1tb.Put(ctx, "barKey", "barValue"); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+		t.Fatalf("Put() should have failed: %v", err)
+	}
+	if err := b1tb.Delete(ctx, nosql.Prefix("fooKey")); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+		t.Fatalf("Table.Delete() should have failed: %v", err)
+	}
+	if err := b1tb.Row("fooKey").Delete(ctx); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+		t.Fatalf("Row.Delete() should have failed: %v", err)
+	}
+}
+
+// Tests that all ops fail after attempted commit or abort.
+func TestOpAfterFinalize(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+
+	// TODO(sadovsky): Add some sort of "op after finalize" error type and check
+	// for it specifically below.
+	checkOpsFail := func(b nosql.BatchDatabase) {
+		btb := b.Table("tb")
+		var got string
+		if err := btb.Get(ctx, "fooKey", &got); err == nil {
+			tu.Fatal(t, "Get() should have failed")
+		}
+		it := btb.Scan(ctx, nosql.Prefix(""))
+		it.Advance()
+		if it.Err() == nil {
+			tu.Fatal(t, "Scan() should have failed")
+		}
+		if err := btb.Put(ctx, "barKey", "barValue"); err == nil {
+			tu.Fatal(t, "Put() should have failed")
+		}
+		if err := btb.Delete(ctx, nosql.Prefix("fooKey")); err == nil {
+			tu.Fatal(t, "Table.Delete() should have failed: %v", err)
+		}
+		if err := btb.Row("fooKey").Delete(ctx); err == nil {
+			tu.Fatal(t, "Row.Delete() should have failed: %v", err)
+		}
+		if err := b.Commit(ctx); err == nil {
+			tu.Fatal(t, "Commit() should have failed")
+		}
+	}
+
+	// Commit a transaction, check that subsequent ops fail.
+	b1, err := d.BeginBatch(ctx, wire.BatchOptions{})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb := b1.Table("tb")
+
+	if err := b1tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+		t.Fatalf("Put() failed: %v", err)
+	}
+	if err := b1.Commit(ctx); err != nil {
+		t.Fatalf("b1.Commit() failed: %v", err)
+	}
+	checkOpsFail(b1)
+
+	// Create a transaction with a conflict, check that the commit fails, then
+	// check that subsequent ops fail.
+	if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb = b1.Table("tb")
+
+	// Conflicts with future b1tb.Get().
+	if err := tb.Put(ctx, "fooKey", "v2"); err != nil {
+		t.Fatalf("tb.Put() failed: %v", err)
+	}
+
+	var got string
+	if err := b1tb.Get(ctx, "fooKey", &got); err != nil {
+		t.Fatalf("Get() failed: %v", err)
+	}
+	want := "fooValue"
+	if got != want {
+		t.Fatalf("Get() returned unexpected value: got %q, want %q", got, want)
+	}
+	if err := b1.Commit(ctx); err == nil {
+		t.Fatalf("b1.Commit() should have failed: %v", err)
+	}
+	checkOpsFail(b1)
+
+	// Create a transaction and immediately abort it, then check that subsequent
+	// ops fail.
+	if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+	b1tb = b1.Table("tb")
+	b1.Abort(ctx)
+	checkOpsFail(b1)
+}
+
+// Tests that batch methods called on non-batch return ErrNotBoundToBatch and
+// that non-batch methods called on batch return ErrBoundToBatch.
+func TestDisallowedMethods(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	b, err := d.BeginBatch(ctx, wire.BatchOptions{})
+	if err != nil {
+		t.Fatalf("d.BeginBatch() failed: %v", err)
+	}
+
+	// Batch methods on non-batch.
+	dc := wire.DatabaseClient(d.FullName())
+	if err := dc.Commit(ctx); verror.ErrorID(err) != wire.ErrNotBoundToBatch.ID {
+		t.Fatalf("dc.Commit() should have failed: %v", err)
+	}
+	if err := dc.Abort(ctx); verror.ErrorID(err) != wire.ErrNotBoundToBatch.ID {
+		t.Fatalf("dc.Abort() should have failed: %v", err)
+	}
+
+	// Non-batch methods on batch.
+	bc := wire.DatabaseClient(b.FullName())
+	if err := bc.Create(ctx, nil); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.Create() should have failed: %v", err)
+	}
+	if err := bc.Delete(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.Delete() should have failed: %v", err)
+	}
+	if _, err := bc.BeginBatch(ctx, wire.BatchOptions{}); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.BeginBatch() should have failed: %v", err)
+	}
+	if _, _, err := bc.GetPermissions(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.GetPermissions() should have failed: %v", err)
+	}
+	if err := bc.SetPermissions(ctx, nil, ""); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.SetPermissions() should have failed: %v", err)
+	}
+	// TODO(sadovsky): Test all other SyncGroupManager methods.
+	if _, err := bc.GetSyncGroupNames(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+		t.Fatalf("bc.GetSyncGroupNames() should have failed: %v", err)
+	}
+}
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 77d565e..87d0581 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -11,31 +11,37 @@
 	"v.io/syncbase/v23/syncbase"
 	"v.io/syncbase/v23/syncbase/nosql"
 	tu "v.io/syncbase/v23/syncbase/testutil"
-	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/verror"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
-////////////////////////////////////////
-// Test cases
-
 // TODO(sadovsky): Finish writing tests.
 
+// Tests various Name, FullName, and Key methods.
 func TestNameAndKey(t *testing.T) {
-	a := syncbase.NewService("s").App("a")
-	d := a.NoSQLDatabase("d")
+	d := syncbase.NewService("s").App("a").NoSQLDatabase("d")
 	tb := d.Table("tb")
 	r := tb.Row("r")
 
 	if d.Name() != "d" {
 		t.Errorf("Wrong name: %q", d.Name())
 	}
+	if d.FullName() != naming.Join("s", "a", "d") {
+		t.Errorf("Wrong full name: %q", d.FullName())
+	}
 	if tb.Name() != "tb" {
 		t.Errorf("Wrong name: %q", tb.Name())
 	}
+	if tb.FullName() != naming.Join("s", "a", "d", "tb") {
+		t.Errorf("Wrong full name: %q", tb.FullName())
+	}
 	if r.Key() != "r" {
 		t.Errorf("Wrong key: %q", r.Key())
 	}
+	if r.FullName() != naming.Join("s", "a", "d", "tb", "r") {
+		t.Errorf("Wrong full name: %q", r.FullName())
+	}
 }
 
 // Tests that Database.Create works as expected.
@@ -110,42 +116,6 @@
 	S string
 }
 
-func checkScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
-	if len(wantKeys) != len(wantValues) {
-		panic("bad input args")
-	}
-	it := tb.Scan(ctx, r)
-	gotKeys := []string{}
-	for it.Advance() {
-		gotKey := it.Key()
-		gotKeys = append(gotKeys, gotKey)
-		i := len(gotKeys) - 1
-		if i >= len(wantKeys) {
-			continue
-		}
-		// Check key.
-		wantKey := wantKeys[i]
-		if gotKey != wantKey {
-			tu.Fatalf(t, "Keys do not match: got %q, want %q", gotKey, wantKey)
-		}
-		// Check value.
-		wantValue := wantValues[i]
-		gotValue := reflect.Zero(reflect.TypeOf(wantValue)).Interface()
-		if err := it.Value(&gotValue); err != nil {
-			tu.Fatalf(t, "it.Value() failed: %v", err)
-		}
-		if !reflect.DeepEqual(gotValue, wantValue) {
-			tu.Fatalf(t, "Values do not match: got %v, want %v", gotValue, wantValue)
-		}
-	}
-	if err := it.Err(); err != nil {
-		tu.Fatalf(t, "tb.Scan() failed: %v", err)
-	}
-	if len(gotKeys) != len(wantKeys) {
-		tu.Fatalf(t, "Unmatched keys: got %v, want %v", gotKeys, wantKeys)
-	}
-}
-
 // Tests that Table.Scan works as expected.
 func TestTableScan(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
@@ -154,7 +124,7 @@
 	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
 	tb := tu.CreateTable(t, ctx, d, "tb")
 
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
 
 	fooWant := Foo{I: 4, S: "f"}
 	if err := tb.Put(ctx, "foo", &fooWant); err != nil {
@@ -166,29 +136,29 @@
 	}
 
 	// Match all keys.
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
-	checkScan(t, ctx, tb, nosql.Range("", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
-	checkScan(t, ctx, tb, nosql.Range("", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
-	checkScan(t, ctx, tb, nosql.Range("a", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
-	checkScan(t, ctx, tb, nosql.Range("a", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("a", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("a", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
 
 	// Match "bar" only.
-	checkScan(t, ctx, tb, nosql.Prefix("b"), []string{"bar"}, []interface{}{&barWant})
-	checkScan(t, ctx, tb, nosql.Prefix("bar"), []string{"bar"}, []interface{}{&barWant})
-	checkScan(t, ctx, tb, nosql.Range("bar", "baz"), []string{"bar"}, []interface{}{&barWant})
-	checkScan(t, ctx, tb, nosql.Range("bar", "foo"), []string{"bar"}, []interface{}{&barWant})
-	checkScan(t, ctx, tb, nosql.Range("", "foo"), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix("b"), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix("bar"), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("bar", "baz"), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("bar", "foo"), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("", "foo"), []string{"bar"}, []interface{}{&barWant})
 
 	// Match "foo" only.
-	checkScan(t, ctx, tb, nosql.Prefix("f"), []string{"foo"}, []interface{}{&fooWant})
-	checkScan(t, ctx, tb, nosql.Prefix("foo"), []string{"foo"}, []interface{}{&fooWant})
-	checkScan(t, ctx, tb, nosql.Range("foo", "fox"), []string{"foo"}, []interface{}{&fooWant})
-	checkScan(t, ctx, tb, nosql.Range("foo", ""), []string{"foo"}, []interface{}{&fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix("f"), []string{"foo"}, []interface{}{&fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix("foo"), []string{"foo"}, []interface{}{&fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("foo", "fox"), []string{"foo"}, []interface{}{&fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Range("foo", ""), []string{"foo"}, []interface{}{&fooWant})
 
 	// Match nothing.
-	checkScan(t, ctx, tb, nosql.Range("a", "bar"), []string{}, []interface{}{})
-	checkScan(t, ctx, tb, nosql.Range("bar", "bar"), []string{}, []interface{}{})
-	checkScan(t, ctx, tb, nosql.Prefix("z"), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Range("a", "bar"), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Range("bar", "bar"), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix("z"), []string{}, []interface{}{})
 }
 
 // Tests that Table.Delete works as expected.
@@ -199,7 +169,7 @@
 	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
 	tb := tu.CreateTable(t, ctx, d, "tb")
 
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
 
 	// Put foo and bar.
 	fooWant := Foo{I: 4, S: "f"}
@@ -210,25 +180,25 @@
 	if err := tb.Put(ctx, "bar", &barWant); err != nil {
 		t.Fatalf("tb.Put() failed: %v", err)
 	}
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
 
 	// Delete foo.
 	if err := tb.Delete(ctx, nosql.Prefix("f")); err != nil {
 		t.Fatalf("tb.Delete() failed: %v", err)
 	}
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar"}, []interface{}{&barWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar"}, []interface{}{&barWant})
 
 	// Restore foo.
 	if err := tb.Put(ctx, "foo", &fooWant); err != nil {
 		t.Fatalf("tb.Put() failed: %v", err)
 	}
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
 
 	// Delete everything.
 	if err := tb.Delete(ctx, nosql.Prefix("")); err != nil {
 		t.Fatalf("tb.Delete() failed: %v", err)
 	}
-	checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+	tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
 }
 
 // Tests that Table.{Get,Put,Delete} work as expected.
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 1a95499..d778702 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -10,22 +10,23 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/security/access"
-	"v.io/v23/verror"
 )
 
-func NewDatabase(parentFullName, relativeName string) Database {
+func NewDatabase(parentFullName, relativeName string) *database {
 	fullName := naming.Join(parentFullName, relativeName)
 	return &database{
-		c:        wire.DatabaseClient(fullName),
-		fullName: fullName,
-		name:     relativeName,
+		c:              wire.DatabaseClient(fullName),
+		parentFullName: parentFullName,
+		fullName:       fullName,
+		name:           relativeName,
 	}
 }
 
 type database struct {
-	c        wire.DatabaseClientMethods
-	fullName string
-	name     string
+	c              wire.DatabaseClientMethods
+	parentFullName string
+	fullName       string
+	name           string
 }
 
 var _ Database = (*database)(nil)
@@ -73,8 +74,12 @@
 }
 
 // BeginBatch implements Database.BeginBatch.
-func (d *database) BeginBatch(ctx *context.T, opts BatchOptions) (BatchDatabase, error) {
-	return nil, verror.NewErrNotImplemented(ctx)
+func (d *database) BeginBatch(ctx *context.T, opts wire.BatchOptions) (BatchDatabase, error) {
+	relativeName, err := d.c.BeginBatch(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+	return &batch{database: *NewDatabase(d.parentFullName, relativeName)}, nil
 }
 
 // SetPermissions implements Database.SetPermissions.
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 8b473d8..3faa52e 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -19,6 +19,12 @@
 // It allows clients to pass the handle to helper methods that are
 // batch-agnostic.
 type DatabaseHandle interface {
+	// Name returns the relative name of this DatabaseHandle.
+	Name() string
+
+	// FullName returns the full name (object name) of this DatabaseHandle.
+	FullName() string
+
 	// Table returns the Table with the given name.
 	// relativeName must not contain slashes.
 	Table(relativeName string) Table
@@ -27,8 +33,6 @@
 	ListTables(ctx *context.T) ([]string, error)
 }
 
-type BatchOptions wire.BatchOptions
-
 // Database represents a collection of Tables. Batches, queries, sync, watch,
 // etc. all operate at the Database level.
 //
@@ -36,12 +40,6 @@
 type Database interface {
 	DatabaseHandle
 
-	// Name returns the relative name of this Database.
-	Name() string
-
-	// FullName returns the full name (object name) of this Database.
-	FullName() string
-
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	Create(ctx *context.T, perms access.Permissions) error
@@ -73,7 +71,7 @@
 	//
 	// Concurrency semantics can be configured using BatchOptions.
 	// TODO(sadovsky): Maybe use varargs for options.
-	BeginBatch(ctx *context.T, opts BatchOptions) (BatchDatabase, error)
+	BeginBatch(ctx *context.T, opts wire.BatchOptions) (BatchDatabase, error)
 
 	// SetPermissions and GetPermissions are included from the AccessController
 	// interface.
diff --git a/v23/syncbase/nosql/stream.go b/v23/syncbase/nosql/stream.go
index fa75e60..9cd2870 100644
--- a/v23/syncbase/nosql/stream.go
+++ b/v23/syncbase/nosql/stream.go
@@ -86,11 +86,7 @@
 	if s.err == nil {
 		return nil
 	}
-	idAction := verror.ErrInternal
-	if verror.ErrorID(s.err) == verror.ErrCanceled.ID {
-		idAction = verror.ErrCanceled
-	}
-	return verror.New(idAction, nil, s.err)
+	return verror.Convert(verror.IDAction{}, nil, s.err)
 }
 
 // Cancel implements Stream.Cancel.
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 7445f6b..fbcd391 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -8,6 +8,7 @@
 import (
 	"io/ioutil"
 	"os"
+	"reflect"
 	"runtime/debug"
 	"testing"
 
@@ -102,6 +103,42 @@
 	return perms
 }
 
+func CheckScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
+	if len(wantKeys) != len(wantValues) {
+		panic("bad input args")
+	}
+	it := tb.Scan(ctx, r)
+	gotKeys := []string{}
+	for it.Advance() {
+		gotKey := it.Key()
+		gotKeys = append(gotKeys, gotKey)
+		i := len(gotKeys) - 1
+		if i >= len(wantKeys) {
+			continue
+		}
+		// Check key.
+		wantKey := wantKeys[i]
+		if gotKey != wantKey {
+			Fatalf(t, "Keys do not match: got %q, want %q", gotKey, wantKey)
+		}
+		// Check value.
+		wantValue := wantValues[i]
+		gotValue := reflect.Zero(reflect.TypeOf(wantValue)).Interface()
+		if err := it.Value(&gotValue); err != nil {
+			Fatalf(t, "it.Value() failed: %v", err)
+		}
+		if !reflect.DeepEqual(gotValue, wantValue) {
+			Fatalf(t, "Values do not match: got %v, want %v", gotValue, wantValue)
+		}
+	}
+	if err := it.Err(); err != nil {
+		Fatalf(t, "tb.Scan() failed: %v", err)
+	}
+	if len(gotKeys) != len(wantKeys) {
+		Fatalf(t, "Unmatched keys: got %v, want %v", gotKeys, wantKeys)
+	}
+}
+
 ////////////////////////////////////////
 // Internal helpers
 
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index aa4bfb3..1e1fe1e 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -19,6 +19,7 @@
 	"v.io/v23/verror"
 )
 
+// app is a per-app singleton (i.e. not per-request) that handles App RPCs.
 type app struct {
 	name string
 	s    *service
@@ -66,12 +67,14 @@
 func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := a.s.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
 }
 
 ////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/dispatcher.go b/x/ref/services/syncbase/server/dispatcher.go
index f6a4dde..17fb042 100644
--- a/x/ref/services/syncbase/server/dispatcher.go
+++ b/x/ref/services/syncbase/server/dispatcher.go
@@ -48,8 +48,8 @@
 
 	aExists := false
 	var a *app
-	if aint, err := disp.s.App(nil, nil, appName); err == nil {
-		a = aint.(*app) // panics on failure, as desired
+	if aInt, err := disp.s.App(nil, nil, appName); err == nil {
+		a = aInt.(*app) // panics on failure, as desired
 		aExists = true
 	} else {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 1f3cbd3..82a366d 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -5,7 +5,12 @@
 package nosql
 
 import (
+	"math/rand"
 	"path"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -19,19 +24,41 @@
 	"v.io/x/lib/vlog"
 )
 
+// database is a per-database singleton (i.e. not per-request). It does not
+// directly handle RPCs.
+// Note: If a database does not exist at the time of a database RPC, the
+// dispatcher creates a short-lived database object to service that particular
+// request.
 type database struct {
 	name string
 	a    interfaces.App
 	// The fields below are initialized iff this database exists.
 	st store.Store // stores all data for a single database
+
+	// Active snapshots and transactions corresponding to client batches.
+	// TODO(sadovsky): Add timeouts and GC.
+	mu  sync.Mutex // protects the fields below
+	sns map[uint64]store.Snapshot
+	txs map[uint64]store.Transaction
+}
+
+// databaseReq is a per-request object that handles Database RPCs.
+// It embeds database and tracks request-specific batch state.
+type databaseReq struct {
+	*database
+	// If non-nil, sn or tx will be non-nil.
+	batchId *uint64
+	sn      store.Snapshot
+	tx      store.Transaction
 }
 
 var (
-	_ wire.DatabaseServerMethods = (*database)(nil)
+	_ wire.DatabaseServerMethods = (*databaseReq)(nil)
 	_ interfaces.Database        = (*database)(nil)
 	_ util.Layer                 = (*database)(nil)
 )
 
+// DatabaseOptions configures a database.
 type DatabaseOptions struct {
 	// Database-level permissions.
 	Perms access.Permissions
@@ -62,6 +89,8 @@
 		name: name,
 		a:    a,
 		st:   st,
+		sns:  make(map[uint64]store.Snapshot),
+		txs:  make(map[uint64]store.Transaction),
 	}
 	data := &databaseData{
 		Name:  d.name,
@@ -76,34 +105,100 @@
 ////////////////////////////////////////
 // RPC methods
 
-func (d *database) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	// This database does not yet exist; d is just an ephemeral handle that holds
 	// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
 	// handle and store it in d.a.dbs[d.name].
 	return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
 }
 
-func (d *database) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
 }
 
-func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
-	return "", verror.NewErrNotImplemented(ctx)
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+	if d.batchId != nil {
+		return "", wire.NewErrBoundToBatch(ctx)
+	}
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	var id uint64
+	var batchType string
+	for {
+		id = uint64(rng.Int63())
+		if bo.ReadOnly {
+			if _, ok := d.sns[id]; !ok {
+				d.sns[id] = d.st.NewSnapshot()
+				batchType = "sn"
+				break
+			}
+		} else {
+			if _, ok := d.txs[id]; !ok {
+				d.txs[id] = d.st.NewTransaction()
+				batchType = "tx"
+				break
+			}
+		}
+	}
+	return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
 }
 
-func (d *database) Commit(ctx *context.T, call rpc.ServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId == nil {
+		return wire.NewErrNotBoundToBatch(ctx)
+	}
+	if d.tx == nil {
+		return wire.NewErrReadOnlyBatch(ctx)
+	}
+	var err error
+	if err = d.tx.Commit(); err == nil {
+		d.mu.Lock()
+		delete(d.txs, *d.batchId)
+		d.mu.Unlock()
+	}
+	return err
 }
 
-func (d *database) Abort(ctx *context.T, call rpc.ServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId == nil {
+		return wire.NewErrNotBoundToBatch(ctx)
+	}
+	var err error
+	if d.tx != nil {
+		if err = d.tx.Abort(); err == nil {
+			d.mu.Lock()
+			delete(d.txs, *d.batchId)
+			d.mu.Unlock()
+		}
+	} else {
+		if err = d.sn.Close(); err == nil {
+			d.mu.Lock()
+			delete(d.sns, *d.batchId)
+			d.mu.Unlock()
+		}
+	}
+	return err
 }
 
-func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
 }
 
-func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+	if d.batchId != nil {
+		return nil, "", wire.NewErrBoundToBatch(ctx)
+	}
 	data := &databaseData{}
 	if err := util.Get(ctx, call, d.st, d, data); err != nil {
 		return nil, "", err
@@ -111,15 +206,20 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-func (d *database) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	// Check perms.
 	sn := d.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
 }
 
 ////////////////////////////////////////
@@ -167,3 +267,26 @@
 func (d *database) StKey() string {
 	return util.DatabasePrefix
 }
+
+////////////////////////////////////////
+// Internal helpers
+
+func (d *databaseReq) batchReader() store.StoreReader {
+	if d.batchId == nil {
+		return nil
+	} else if d.sn != nil {
+		return d.sn
+	} else {
+		return d.tx
+	}
+}
+
+func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+	if d.batchId == nil {
+		return nil, nil
+	} else if d.tx != nil {
+		return d.tx, nil
+	} else {
+		return nil, wire.NewErrReadOnlyBatch(nil)
+	}
+}
diff --git a/x/ref/services/syncbase/server/nosql/database_sgm.go b/x/ref/services/syncbase/server/nosql/database_sgm.go
index 768b478..b5797b0 100644
--- a/x/ref/services/syncbase/server/nosql/database_sgm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sgm.go
@@ -16,39 +16,66 @@
 ////////////////////////////////////////
 // SyncGroup RPC methods
 
-func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+func (d *databaseReq) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	return nil, verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	sd := vsync.NewSyncDatabase(d)
 	return sd.CreateSyncGroup(ctx, call, sgName, spec, myInfo)
 }
 
-func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+func (d *databaseReq) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+	if d.batchId != nil {
+		return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
+	}
 	return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+func (d *databaseReq) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+func (d *databaseReq) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+	if d.batchId != nil {
+		return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
+	}
 	return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	return nil, verror.NewErrNotImplemented(ctx)
 }
diff --git a/x/ref/services/syncbase/server/nosql/dispatcher.go b/x/ref/services/syncbase/server/nosql/dispatcher.go
index d8fe2f0..3bbea26 100644
--- a/x/ref/services/syncbase/server/nosql/dispatcher.go
+++ b/x/ref/services/syncbase/server/nosql/dispatcher.go
@@ -5,12 +5,14 @@
 package nosql
 
 import (
+	"strconv"
 	"strings"
 
 	wire "v.io/syncbase/v23/services/syncbase"
 	nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
 	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
@@ -35,9 +37,15 @@
 		vlog.Fatal("invalid nosql.dispatcher Lookup")
 	}
 
+	dParts := strings.Split(parts[0], util.BatchSep)
+	dName := dParts[0]
+
 	// Validate all key atoms up front, so that we can avoid doing so in all our
 	// method implementations.
-	for _, s := range parts {
+	if !pubutil.ValidName(dName) {
+		return nil, nil, wire.NewErrInvalidName(nil, suffix)
+	}
+	for _, s := range parts[1:] {
 		if !pubutil.ValidName(s) {
 			return nil, nil, wire.NewErrInvalidName(nil, suffix)
 		}
@@ -45,22 +53,28 @@
 
 	dExists := false
 	var d *database
-	if dint, err := disp.a.NoSQLDatabase(nil, nil, parts[0]); err == nil {
-		d = dint.(*database) // panics on failure, as desired
+	if dInt, err := disp.a.NoSQLDatabase(nil, nil, dName); err == nil {
+		d = dInt.(*database) // panics on failure, as desired
 		dExists = true
 	} else {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return nil, nil, err
 		} else {
+			// Database does not exist. Create a short-lived database object to
+			// service this request.
 			d = &database{
-				name: parts[0],
+				name: dName,
 				a:    disp.a,
 			}
 		}
 	}
 
+	dReq := &databaseReq{database: d}
+	if !setBatchFields(dReq, dParts) {
+		return nil, nil, wire.NewErrInvalidName(nil, suffix)
+	}
 	if len(parts) == 1 {
-		return nosqlWire.DatabaseServer(d), nil, nil
+		return nosqlWire.DatabaseServer(dReq), nil, nil
 	}
 
 	// All table and row methods require the database to exist. If it doesn't,
@@ -73,21 +87,50 @@
 	// downstream handling of this request. Depending on the order in which things
 	// execute, the client may not get an error, but in any case ultimately the
 	// store will end up in a consistent state.
-	t := &table{
+	tReq := &tableReq{
 		name: parts[1],
-		d:    d,
+		d:    dReq,
 	}
 	if len(parts) == 2 {
-		return nosqlWire.TableServer(t), nil, nil
+		return nosqlWire.TableServer(tReq), nil, nil
 	}
 
-	r := &row{
+	rReq := &rowReq{
 		key: parts[2],
-		t:   t,
+		t:   tReq,
 	}
 	if len(parts) == 3 {
-		return nosqlWire.RowServer(r), nil, nil
+		return nosqlWire.RowServer(rReq), nil, nil
 	}
 
 	return nil, nil, verror.NewErrNoExist(nil)
 }
+
+// setBatchFields sets the batch-related fields in databaseReq based on the
+// value of dParts, the parts of the database name component. It returns false
+// if dParts is malformed.
+func setBatchFields(d *databaseReq, dParts []string) bool {
+	if len(dParts) == 1 {
+		return true
+	}
+	if len(dParts) != 3 {
+		return false
+	}
+	batchId, err := strconv.ParseUint(dParts[2], 0, 64)
+	if err != nil {
+		return false
+	}
+	d.batchId = &batchId
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	var ok bool
+	switch dParts[1] {
+	case "sn":
+		d.sn, ok = d.sns[batchId]
+	case "tx":
+		d.tx, ok = d.txs[batchId]
+	default:
+		return false
+	}
+	return ok
+}
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 057dc10..3397fd1 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -13,67 +13,95 @@
 	"v.io/v23/verror"
 )
 
-// TODO(sadovsky): Extend data layout to support version tracking for sync.
-// See go/vanadium-local-structured-store.
-
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type row struct {
+// rowReq is a per-request object that handles Row RPCs.
+type rowReq struct {
 	key string
-	t   *table
+	t   *tableReq
 }
 
 var (
-	_ wire.RowServerMethods = (*row)(nil)
-	_ util.Layer            = (*row)(nil)
+	_ wire.RowServerMethods = (*rowReq)(nil)
+	_ util.Layer            = (*rowReq)(nil)
 )
 
 ////////////////////////////////////////
 // RPC methods
 
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
-	return r.get(ctx, call, r.t.d.st)
+func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
+	impl := func(st store.StoreReader) ([]byte, error) {
+		return r.get(ctx, call, st)
+	}
+	var st store.StoreReader
+	if r.t.d.batchId != nil {
+		st = r.t.d.batchReader()
+	} else {
+		sn := r.t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
+	}
+	return impl(st)
 }
 
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
-	return r.put(ctx, call, r.t.d.st, value)
+func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
+	impl := func(st store.StoreReadWriter) error {
+		return r.put(ctx, call, st, value)
+	}
+	if r.t.d.batchId != nil {
+		if st, err := r.t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(r.t.d.st, impl)
+	}
 }
 
-func (r *row) Delete(ctx *context.T, call rpc.ServerCall) error {
-	return r.del(ctx, call, r.t.d.st)
+func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	impl := func(st store.StoreReadWriter) error {
+		return r.delete(ctx, call, st)
+	}
+	if r.t.d.batchId != nil {
+		if st, err := r.t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(r.t.d.st, impl)
+	}
 }
 
 ////////////////////////////////////////
 // util.Layer methods
 
-func (r *row) Name() string {
+func (r *rowReq) Name() string {
 	return r.key
 }
 
-func (r *row) StKey() string {
+func (r *rowReq) StKey() string {
 	return util.JoinKeyParts(util.RowPrefix, r.stKeyPart())
 }
 
 ////////////////////////////////////////
 // Internal helpers
 
-func (r *row) stKeyPart() string {
+func (r *rowReq) stKeyPart() string {
 	return util.JoinKeyParts(r.t.stKeyPart(), r.key)
 }
 
-// TODO(sadovsky): Update access checks to use prefix permissions.
-
-// checkAccess checks that this row's table exists in the database and performs
+// checkAccess checks that this row's table exists in the database, and performs
 // an authorization check (currently against the table perms).
 // Returns a VDL-compatible error.
-func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
+// TODO(sadovsky): Use prefix permissions.
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	return util.Get(ctx, call, st, r.t, &tableData{})
 }
 
 // get reads data from the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return nil, err
 	}
@@ -90,7 +118,7 @@
 // put writes data to the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
@@ -100,10 +128,10 @@
 	return nil
 }
 
-// del deletes data from the storage engine.
+// delete deletes data from the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) del(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index e73afd2..7619fc4 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -14,22 +14,24 @@
 	"v.io/v23/verror"
 )
 
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type table struct {
+// tableReq is a per-request object that handles Table RPCs.
+type tableReq struct {
 	name string
-	d    *database
+	d    *databaseReq
 }
 
 var (
-	_ wire.TableServerMethods = (*table)(nil)
-	_ util.Layer              = (*table)(nil)
+	_ wire.TableServerMethods = (*tableReq)(nil)
+	_ util.Layer              = (*tableReq)(nil)
 )
 
 ////////////////////////////////////////
 // RPC methods
 
-func (t *table) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if t.d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
 		// Check databaseData perms.
 		dData := &databaseData{}
@@ -56,7 +58,10 @@
 	})
 }
 
-func (t *table) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	if t.d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
 		// Read-check-delete tableData.
 		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
@@ -70,8 +75,8 @@
 	})
 }
 
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
+	impl := func(st store.StoreReadWriter) error {
 		// Check perms.
 		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
 			return err
@@ -88,83 +93,136 @@
 			return verror.New(verror.ErrInternal, ctx, err)
 		}
 		return nil
-	})
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
-func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
-	sn := t.d.st.NewSnapshot()
-	defer sn.Close()
-	// Check perms.
-	if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
-		return err
+func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
+	impl := func(st store.StoreReader) error {
+		// Check perms.
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			return err
+		}
+		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		sender := call.SendStream()
+		key, value := []byte{}, []byte{}
+		for it.Advance() {
+			key, value = it.Key(key), it.Value(value)
+			parts := util.SplitKeyParts(string(key))
+			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+		}
+		if err := it.Err(); err != nil {
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		return nil
 	}
-	it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
-	sender := call.SendStream()
-	key, value := []byte{}, []byte{}
-	for it.Advance() {
-		key, value = it.Key(key), it.Value(value)
-		parts := util.SplitKeyParts(string(key))
-		sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+	var st store.StoreReader
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
 	}
-	if err := it.Err(); err != nil {
-		return verror.New(verror.ErrInternal, ctx, err)
-	}
-	return nil
+	return impl(st)
 }
 
-func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
 	if prefix != "" {
 		return verror.NewErrNotImplemented(ctx)
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	impl := func(st store.StoreReadWriter) error {
 		data := &tableData{}
 		return util.Update(ctx, call, st, t, data, func() error {
 			data.Perms = perms
 			return nil
 		})
-	})
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
-func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
+func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
 	if key != "" {
 		return nil, verror.NewErrNotImplemented(ctx)
 	}
-	data := &tableData{}
-	if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
-		return nil, err
+	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+		data := &tableData{}
+		if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+			return nil, err
+		}
+		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
 	}
-	return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+	var st store.StoreReader
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
+	}
+	return impl(st)
 }
 
-func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
+func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (t *table) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
-	sn := t.d.st.NewSnapshot()
-	// Check perms.
-	if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
-		sn.Close()
-		return nil, err
+func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+		// Check perms.
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			closeStoreReader()
+			return nil, err
+		}
+		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+	var st store.StoreReader
+	var closeStoreReader func() error
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+		closeStoreReader = func() error {
+			return nil
+		}
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		closeStoreReader = func() error {
+			return sn.Close()
+		}
+	}
+	return impl(st, closeStoreReader)
 }
 
 ////////////////////////////////////////
 // util.Layer methods
 
-func (t *table) Name() string {
+func (t *tableReq) Name() string {
 	return t.name
 }
 
-func (t *table) StKey() string {
+func (t *tableReq) StKey() string {
 	return util.JoinKeyParts(util.TablePrefix, t.stKeyPart())
 }
 
 ////////////////////////////////////////
 // Internal helpers
 
-func (t *table) stKeyPart() string {
+func (t *tableReq) stKeyPart() string {
 	return t.name
 }
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index fd3e3b8..86a2a37 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -23,15 +23,7 @@
 	"v.io/v23/verror"
 )
 
-type ServiceOptions struct {
-	// Service-level permissions.
-	Perms access.Permissions
-	// Root dir for data storage.
-	RootDir string
-	// Storage engine to use (for service and per-database engines).
-	Engine string
-}
-
+// service is a singleton (i.e. not per-request) that handles Service RPCs.
 type service struct {
 	st   store.Store // keeps track of which apps and databases exist, etc.
 	sync interfaces.SyncServerMethods
@@ -48,6 +40,16 @@
 	_ util.Layer                = (*service)(nil)
 )
 
+// ServiceOptions configures a service.
+type ServiceOptions struct {
+	// Service-level permissions.
+	Perms access.Permissions
+	// Root dir for data storage.
+	RootDir string
+	// Storage engine to use (for service and per-database engines).
+	Engine string
+}
+
 // NewService creates a new service instance and returns it.
 // Returns a VDL-compatible error.
 func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
@@ -103,12 +105,14 @@
 func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := s.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
 }
 
 ////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/util/constants.go b/x/ref/services/syncbase/server/util/constants.go
index 88e1115..44d0ff6 100644
--- a/x/ref/services/syncbase/server/util/constants.go
+++ b/x/ref/services/syncbase/server/util/constants.go
@@ -23,4 +23,8 @@
 const (
 	// Service object name suffix for Syncbase-to-Syncbase RPCs.
 	SyncbaseSuffix = "$internal"
+	// Separator for batch info in database names.
+	BatchSep = ":"
+	// Separator for parts of storage engine keys.
+	KeyPartSep = ":"
 )
diff --git a/x/ref/services/syncbase/server/util/glob.go b/x/ref/services/syncbase/server/util/glob.go
index 87a204e..acdd74f 100644
--- a/x/ref/services/syncbase/server/util/glob.go
+++ b/x/ref/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
 	return res, nil
 }
 
-// Takes ownership of sn.
+// Glob performs a glob. It calls closeStoreReader to close st.
 // TODO(sadovsky): Why do we make developers implement Glob differently from
 // other streaming RPCs? It's confusing that Glob must return immediately and
 // write its results to a channel, while other streaming RPC handlers must block
 // and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
-		sn.Close()
+		closeStoreReader()
 		return nil, verror.New(verror.ErrBadArg, ctx, err)
 	}
 	prefix, err := globPatternToPrefix(pattern)
 	if err != nil {
-		sn.Close()
+		closeStoreReader()
 		if verror.ErrorID(err) == verror.ErrBadArg.ID {
 			return nil, verror.NewErrNotImplemented(ctx)
 		}
 		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
-	it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+	it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
 	ch := make(chan string)
 	go func() {
-		defer sn.Close()
+		defer closeStoreReader()
 		defer close(ch)
 		key := []byte{}
 		for it.Advance() {
diff --git a/x/ref/services/syncbase/server/util/key_util.go b/x/ref/services/syncbase/server/util/key_util.go
index b469f9e..80a8a6d 100644
--- a/x/ref/services/syncbase/server/util/key_util.go
+++ b/x/ref/services/syncbase/server/util/key_util.go
@@ -13,12 +13,12 @@
 // JoinKeyParts builds keys for accessing data in the storage engine.
 func JoinKeyParts(parts ...string) string {
 	// TODO(sadovsky): Figure out which delimiter makes the most sense.
-	return strings.Join(parts, ":")
+	return strings.Join(parts, KeyPartSep)
 }
 
 // SplitKeyParts is the inverse of JoinKeyParts.
 func SplitKeyParts(key string) []string {
-	return strings.Split(key, ":")
+	return strings.Split(key, KeyPartSep)
 }
 
 // ScanPrefixArgs returns args for sn.Scan() for the specified prefix.