syncbase: implement client batches

Change-Id: I9556f926c32b6e37772d276975285c6bffb3a9fc
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index aa4bfb3..1e1fe1e 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -19,6 +19,7 @@
 	"v.io/v23/verror"
 )
 
+// app is a per-app singleton (i.e. not per-request) that handles App RPCs.
 type app struct {
 	name string
 	s    *service
@@ -66,12 +67,14 @@
 func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := a.s.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index f6a4dde..17fb042 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -48,8 +48,8 @@
 
 	aExists := false
 	var a *app
-	if aint, err := disp.s.App(nil, nil, appName); err == nil {
-		a = aint.(*app) // panics on failure, as desired
+	if aInt, err := disp.s.App(nil, nil, appName); err == nil {
+		a = aInt.(*app) // panics on failure, as desired
 		aExists = true
 	} else {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 1f3cbd3..82a366d 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -5,7 +5,12 @@
 package nosql
 
 import (
+	"math/rand"
 	"path"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
 
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -19,19 +24,41 @@
 	"v.io/x/lib/vlog"
 )
 
+// database is a per-database singleton (i.e. not per-request). It does not
+// directly handle RPCs.
+// Note: If a database does not exist at the time of a database RPC, the
+// dispatcher creates a short-lived database object to service that particular
+// request.
 type database struct {
 	name string
 	a    interfaces.App
 	// The fields below are initialized iff this database exists.
 	st store.Store // stores all data for a single database
+
+	// Active snapshots and transactions corresponding to client batches.
+	// TODO(sadovsky): Add timeouts and GC.
+	mu  sync.Mutex // protects the fields below
+	sns map[uint64]store.Snapshot
+	txs map[uint64]store.Transaction
+}
+
+// databaseReq is a per-request object that handles Database RPCs.
+// It embeds database and tracks request-specific batch state.
+type databaseReq struct {
+	*database
+	// batchId is nil for non-batch requests; otherwise sn or tx is set.
+	batchId *uint64
+	sn      store.Snapshot    // set for "sn" (read-only) batches
+	tx      store.Transaction // set for "tx" (read-write) batches
+}
 
 var (
-	_ wire.DatabaseServerMethods = (*database)(nil)
+	_ wire.DatabaseServerMethods = (*databaseReq)(nil)
 	_ interfaces.Database        = (*database)(nil)
 	_ util.Layer                 = (*database)(nil)
 )
 
+// DatabaseOptions configures a database.
 type DatabaseOptions struct {
 	// Database-level permissions.
 	Perms access.Permissions
@@ -62,6 +89,8 @@
 		name: name,
 		a:    a,
 		st:   st,
+		sns:  make(map[uint64]store.Snapshot),
+		txs:  make(map[uint64]store.Transaction),
 	}
 	data := &databaseData{
 		Name:  d.name,
@@ -76,34 +105,100 @@
 ////////////////////////////////////////
 // RPC methods
 
-func (d *database) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	// This database does not yet exist; d is just an ephemeral handle that holds
 	// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
 	// handle and store it in d.a.dbs[d.name].
 	return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
 }
 
-func (d *database) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
 }
 
-func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
-	return "", verror.NewErrNotImplemented(ctx)
+var rngMu sync.Mutex // guards rng: a *rand.Rand is not safe for concurrent use
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+// BeginBatch creates a snapshot ("sn") or transaction ("tx") batch and returns its name.
+func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+	if d.batchId != nil {
+		return "", wire.NewErrBoundToBatch(ctx)
+	}
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	id, batchType := uint64(0), "tx"
+	for {
+		rngMu.Lock() // d.mu is per-database, so it cannot protect the shared rng
+		id = uint64(rng.Int63())
+		rngMu.Unlock()
+		if bo.ReadOnly {
+			if _, ok := d.sns[id]; !ok {
+				d.sns[id] = d.st.NewSnapshot()
+				batchType = "sn"
+				break
+			}
+		} else if _, ok := d.txs[id]; !ok {
+			d.txs[id] = d.st.NewTransaction()
+			break
+		}
+	}
+	return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
 }
 
-func (d *database) Commit(ctx *context.T, call rpc.ServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId == nil {
+		return wire.NewErrNotBoundToBatch(ctx)
+	}
+	if d.tx == nil {
+		return wire.NewErrReadOnlyBatch(ctx)
+	}
+	var err error
+	if err = d.tx.Commit(); err == nil {
+		d.mu.Lock()
+		delete(d.txs, *d.batchId)
+		d.mu.Unlock()
+	}
+	return err
 }
 
-func (d *database) Abort(ctx *context.T, call rpc.ServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+	if d.batchId == nil {
+		return wire.NewErrNotBoundToBatch(ctx)
+	}
+	var err error
+	if d.tx != nil {
+		if err = d.tx.Abort(); err == nil {
+			d.mu.Lock()
+			delete(d.txs, *d.batchId)
+			d.mu.Unlock()
+		}
+	} else {
+		if err = d.sn.Close(); err == nil {
+			d.mu.Lock()
+			delete(d.sns, *d.batchId)
+			d.mu.Unlock()
+		}
+	}
+	return err
 }
 
-func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
 }
 
-func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+	if d.batchId != nil {
+		return nil, "", wire.NewErrBoundToBatch(ctx)
+	}
 	data := &databaseData{}
 	if err := util.Get(ctx, call, d.st, d, data); err != nil {
 		return nil, "", err
@@ -111,15 +206,20 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-func (d *database) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	// Check perms.
 	sn := d.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
 }
 
 ////////////////////////////////////////
@@ -167,3 +267,26 @@
 func (d *database) StKey() string {
 	return util.DatabasePrefix
 }
+
+////////////////////////////////////////
+// Internal helpers
+
+// batchReader returns the bound batch's snapshot or transaction, or nil if unbound.
+func (d *databaseReq) batchReader() store.StoreReader {
+	if d.batchId == nil {
+		return nil
+	} else if d.sn != nil {
+		return d.sn
+	}
+	return d.tx
+}
+
+// batchReadWriter returns the batch's transaction, (nil, nil) if unbound, or an error if read-only.
+func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+	if d.batchId == nil {
+		return nil, nil
+	} else if d.tx == nil {
+		return nil, wire.NewErrReadOnlyBatch(nil)
+	}
+	return d.tx, nil
+}
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index 768b478..b5797b0 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -16,39 +16,66 @@
 ////////////////////////////////////////
 // SyncGroup RPC methods
 
-func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+func (d *databaseReq) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	return nil, verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	sd := vsync.NewSyncDatabase(d)
 	return sd.CreateSyncGroup(ctx, call, sgName, spec, myInfo)
 }
 
-func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+func (d *databaseReq) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+	if d.batchId != nil {
+		return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
+	}
 	return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+func (d *databaseReq) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+func (d *databaseReq) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+	if d.batchId != nil {
+		return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
+	}
 	return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+	if d.batchId != nil {
+		return nil, wire.NewErrBoundToBatch(ctx)
+	}
 	return nil, verror.NewErrNotImplemented(ctx)
 }
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index d8fe2f0..3bbea26 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -5,12 +5,14 @@
 package nosql
 
 import (
+	"strconv"
 	"strings"
 
 	wire "v.io/syncbase/v23/services/syncbase"
 	nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
 	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
@@ -35,9 +37,15 @@
 		vlog.Fatal("invalid nosql.dispatcher Lookup")
 	}
 
+	dParts := strings.Split(parts[0], util.BatchSep)
+	dName := dParts[0]
+
 	// Validate all key atoms up front, so that we can avoid doing so in all our
 	// method implementations.
-	for _, s := range parts {
+	if !pubutil.ValidName(dName) {
+		return nil, nil, wire.NewErrInvalidName(nil, suffix)
+	}
+	for _, s := range parts[1:] {
 		if !pubutil.ValidName(s) {
 			return nil, nil, wire.NewErrInvalidName(nil, suffix)
 		}
@@ -45,22 +53,28 @@
 
 	dExists := false
 	var d *database
-	if dint, err := disp.a.NoSQLDatabase(nil, nil, parts[0]); err == nil {
-		d = dint.(*database) // panics on failure, as desired
+	if dInt, err := disp.a.NoSQLDatabase(nil, nil, dName); err == nil {
+		d = dInt.(*database) // panics on failure, as desired
 		dExists = true
 	} else {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return nil, nil, err
 		} else {
+			// Database does not exist. Create a short-lived database object to
+			// service this request.
 			d = &database{
-				name: parts[0],
+				name: dName,
 				a:    disp.a,
 			}
 		}
 	}
 
+	dReq := &databaseReq{database: d}
+	if !setBatchFields(dReq, dParts) {
+		return nil, nil, wire.NewErrInvalidName(nil, suffix)
+	}
 	if len(parts) == 1 {
-		return nosqlWire.DatabaseServer(d), nil, nil
+		return nosqlWire.DatabaseServer(dReq), nil, nil
 	}
 
 	// All table and row methods require the database to exist. If it doesn't,
@@ -73,21 +87,50 @@
 	// downstream handling of this request. Depending on the order in which things
 	// execute, the client may not get an error, but in any case ultimately the
 	// store will end up in a consistent state.
-	t := &table{
+	tReq := &tableReq{
 		name: parts[1],
-		d:    d,
+		d:    dReq,
 	}
 	if len(parts) == 2 {
-		return nosqlWire.TableServer(t), nil, nil
+		return nosqlWire.TableServer(tReq), nil, nil
 	}
 
-	r := &row{
+	rReq := &rowReq{
 		key: parts[2],
-		t:   t,
+		t:   tReq,
 	}
 	if len(parts) == 3 {
-		return nosqlWire.RowServer(r), nil, nil
+		return nosqlWire.RowServer(rReq), nil, nil
 	}
 
 	return nil, nil, verror.NewErrNoExist(nil)
 }
+
+// setBatchFields sets the batch-related fields in databaseReq based on dParts,
+// the BatchSep-separated parts of the database name component. It returns
+// false if dParts is malformed or does not name an active batch.
+func setBatchFields(d *databaseReq, dParts []string) bool {
+	if len(dParts) == 1 {
+		return true
+	}
+	if len(dParts) != 3 {
+		return false
+	}
+	batchId, err := strconv.ParseUint(dParts[2], 10, 64) // base 10, as emitted by BeginBatch
+	if err != nil {
+		return false
+	}
+	d.batchId = &batchId
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	var ok bool
+	switch dParts[1] {
+	case "sn":
+		d.sn, ok = d.sns[batchId]
+	case "tx":
+		d.tx, ok = d.txs[batchId]
+	default:
+		return false
+	}
+	return ok
+}
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 057dc10..3397fd1 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -13,67 +13,95 @@
 	"v.io/v23/verror"
 )
 
-// TODO(sadovsky): Extend data layout to support version tracking for sync.
-// See go/vanadium-local-structured-store.
-
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type row struct {
+// rowReq is a per-request object that handles Row RPCs.
+type rowReq struct {
 	key string
-	t   *table
+	t   *tableReq
 }
 
 var (
-	_ wire.RowServerMethods = (*row)(nil)
-	_ util.Layer            = (*row)(nil)
+	_ wire.RowServerMethods = (*rowReq)(nil)
+	_ util.Layer            = (*rowReq)(nil)
 )
 
 ////////////////////////////////////////
 // RPC methods
 
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
-	return r.get(ctx, call, r.t.d.st)
+func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
+	impl := func(st store.StoreReader) ([]byte, error) {
+		return r.get(ctx, call, st)
+	}
+	var st store.StoreReader
+	if r.t.d.batchId != nil {
+		st = r.t.d.batchReader()
+	} else {
+		sn := r.t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
+	}
+	return impl(st)
 }
 
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
-	return r.put(ctx, call, r.t.d.st, value)
+func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
+	impl := func(st store.StoreReadWriter) error {
+		return r.put(ctx, call, st, value)
+	}
+	if r.t.d.batchId != nil {
+		if st, err := r.t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(r.t.d.st, impl)
+	}
 }
 
-func (r *row) Delete(ctx *context.T, call rpc.ServerCall) error {
-	return r.del(ctx, call, r.t.d.st)
+func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	impl := func(st store.StoreReadWriter) error {
+		return r.delete(ctx, call, st)
+	}
+	if r.t.d.batchId != nil {
+		if st, err := r.t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(r.t.d.st, impl)
+	}
 }
 
 ////////////////////////////////////////
 // util.Layer methods
 
-func (r *row) Name() string {
+func (r *rowReq) Name() string {
 	return r.key
 }
 
-func (r *row) StKey() string {
+func (r *rowReq) StKey() string {
 	return util.JoinKeyParts(util.RowPrefix, r.stKeyPart())
 }
 
 ////////////////////////////////////////
 // Internal helpers
 
-func (r *row) stKeyPart() string {
+func (r *rowReq) stKeyPart() string {
 	return util.JoinKeyParts(r.t.stKeyPart(), r.key)
 }
 
-// TODO(sadovsky): Update access checks to use prefix permissions.
-
-// checkAccess checks that this row's table exists in the database and performs
+// checkAccess checks that this row's table exists in the database, and performs
 // an authorization check (currently against the table perms).
 // Returns a VDL-compatible error.
-func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
+// TODO(sadovsky): Use prefix permissions.
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
 	return util.Get(ctx, call, st, r.t, &tableData{})
 }
 
 // get reads data from the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return nil, err
 	}
@@ -90,7 +118,7 @@
 // put writes data to the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
@@ -100,10 +128,10 @@
 	return nil
 }
 
-// del deletes data from the storage engine.
+// delete deletes data from the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) del(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index e73afd2..7619fc4 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -14,22 +14,24 @@
 	"v.io/v23/verror"
 )
 
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type table struct {
+// tableReq is a per-request object that handles Table RPCs.
+type tableReq struct {
 	name string
-	d    *database
+	d    *databaseReq
 }
 
 var (
-	_ wire.TableServerMethods = (*table)(nil)
-	_ util.Layer              = (*table)(nil)
+	_ wire.TableServerMethods = (*tableReq)(nil)
+	_ util.Layer              = (*tableReq)(nil)
 )
 
 ////////////////////////////////////////
 // RPC methods
 
-func (t *table) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if t.d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
 		// Check databaseData perms.
 		dData := &databaseData{}
@@ -56,7 +58,10 @@
 	})
 }
 
-func (t *table) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+	if t.d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
 	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
 		// Read-check-delete tableData.
 		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
@@ -70,8 +75,8 @@
 	})
 }
 
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
+	impl := func(st store.StoreReadWriter) error {
 		// Check perms.
 		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
 			return err
@@ -88,83 +93,136 @@
 			return verror.New(verror.ErrInternal, ctx, err)
 		}
 		return nil
-	})
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
-func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
-	sn := t.d.st.NewSnapshot()
-	defer sn.Close()
-	// Check perms.
-	if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
-		return err
+func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
+	impl := func(st store.StoreReader) error {
+		// Check perms.
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			return err
+		}
+		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		sender := call.SendStream()
+		key, value := []byte{}, []byte{}
+		for it.Advance() {
+			key, value = it.Key(key), it.Value(value)
+			parts := util.SplitKeyParts(string(key))
+			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+		}
+		if err := it.Err(); err != nil {
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		return nil
 	}
-	it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
-	sender := call.SendStream()
-	key, value := []byte{}, []byte{}
-	for it.Advance() {
-		key, value = it.Key(key), it.Value(value)
-		parts := util.SplitKeyParts(string(key))
-		sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+	var st store.StoreReader
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
 	}
-	if err := it.Err(); err != nil {
-		return verror.New(verror.ErrInternal, ctx, err)
-	}
-	return nil
+	return impl(st)
 }
 
-func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
 	if prefix != "" {
 		return verror.NewErrNotImplemented(ctx)
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	impl := func(st store.StoreReadWriter) error {
 		data := &tableData{}
 		return util.Update(ctx, call, st, t, data, func() error {
 			data.Perms = perms
 			return nil
 		})
-	})
+	}
+	if t.d.batchId != nil {
+		if st, err := t.d.batchReadWriter(); err != nil {
+			return err
+		} else {
+			return impl(st)
+		}
+	} else {
+		return store.RunInTransaction(t.d.st, impl)
+	}
 }
 
-func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
+func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
 	if key != "" {
 		return nil, verror.NewErrNotImplemented(ctx)
 	}
-	data := &tableData{}
-	if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
-		return nil, err
+	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+		data := &tableData{}
+		if err := util.Get(ctx, call, st, t, data); err != nil { // read via st (batch or snapshot)
+			return nil, err
+		}
+		return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
 	}
-	return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+	var st store.StoreReader
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		defer sn.Close()
+	}
+	return impl(st)
 }
 
-func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
+func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
-func (t *table) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
-	sn := t.d.st.NewSnapshot()
-	// Check perms.
-	if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
-		sn.Close()
-		return nil, err
+func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+		// Check perms.
+		if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+			closeStoreReader()
+			return nil, err
+		}
+		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+	var st store.StoreReader
+	var closeStoreReader func() error
+	if t.d.batchId != nil {
+		st = t.d.batchReader()
+		closeStoreReader = func() error {
+			return nil
+		}
+	} else {
+		sn := t.d.st.NewSnapshot()
+		st = sn
+		closeStoreReader = func() error {
+			return sn.Close()
+		}
+	}
+	return impl(st, closeStoreReader)
 }
 
 ////////////////////////////////////////
 // util.Layer methods
 
-func (t *table) Name() string {
+func (t *tableReq) Name() string {
 	return t.name
 }
 
-func (t *table) StKey() string {
+func (t *tableReq) StKey() string {
 	return util.JoinKeyParts(util.TablePrefix, t.stKeyPart())
 }
 
 ////////////////////////////////////////
 // Internal helpers
 
-func (t *table) stKeyPart() string {
+func (t *tableReq) stKeyPart() string {
 	return t.name
 }
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index fd3e3b8..86a2a37 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -23,15 +23,7 @@
 	"v.io/v23/verror"
 )
 
-type ServiceOptions struct {
-	// Service-level permissions.
-	Perms access.Permissions
-	// Root dir for data storage.
-	RootDir string
-	// Storage engine to use (for service and per-database engines).
-	Engine string
-}
-
+// service is a singleton (i.e. not per-request) that handles Service RPCs.
 type service struct {
 	st   store.Store // keeps track of which apps and databases exist, etc.
 	sync interfaces.SyncServerMethods
@@ -48,6 +40,16 @@
 	_ util.Layer                = (*service)(nil)
 )
 
+// ServiceOptions configures a service.
+type ServiceOptions struct {
+	// Service-level permissions.
+	Perms access.Permissions
+	// Root dir for data storage.
+	RootDir string
+	// Storage engine to use (for service and per-database engines).
+	Engine string
+}
+
 // NewService creates a new service instance and returns it.
 // Returns a VDL-compatible error.
 func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
@@ -103,12 +105,14 @@
 func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := s.st.NewSnapshot()
+	closeSnapshot := func() error {
+		return sn.Close()
+	}
 	if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
-		sn.Close()
+		closeSnapshot()
 		return nil, err
 	}
-	pattern := "*"
-	return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 88e1115..44d0ff6 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -23,4 +23,8 @@
 const (
 	// Service object name suffix for Syncbase-to-Syncbase RPCs.
 	SyncbaseSuffix = "$internal"
+	// Separator for batch info in database names.
+	BatchSep = ":"
+	// Separator for parts of storage engine keys.
+	KeyPartSep = ":"
 )
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 87a204e..acdd74f 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
 	return res, nil
 }
 
-// Takes ownership of sn.
+// Glob performs a glob. It calls closeStoreReader to close st.
 // TODO(sadovsky): Why do we make developers implement Glob differently from
 // other streaming RPCs? It's confusing that Glob must return immediately and
 // write its results to a channel, while other streaming RPC handlers must block
 // and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
-		sn.Close()
+		closeStoreReader()
 		return nil, verror.New(verror.ErrBadArg, ctx, err)
 	}
 	prefix, err := globPatternToPrefix(pattern)
 	if err != nil {
-		sn.Close()
+		closeStoreReader()
 		if verror.ErrorID(err) == verror.ErrBadArg.ID {
 			return nil, verror.NewErrNotImplemented(ctx)
 		}
 		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
-	it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+	it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
 	ch := make(chan string)
 	go func() {
-		defer sn.Close()
+		defer closeStoreReader()
 		defer close(ch)
 		key := []byte{}
 		for it.Advance() {
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index b469f9e..80a8a6d 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -13,12 +13,12 @@
 // JoinKeyParts builds keys for accessing data in the storage engine.
 func JoinKeyParts(parts ...string) string {
 	// TODO(sadovsky): Figure out which delimiter makes the most sense.
-	return strings.Join(parts, ":")
+	return strings.Join(parts, KeyPartSep)
 }
 
 // SplitKeyParts is the inverse of JoinKeyParts.
 func SplitKeyParts(key string) []string {
-	return strings.Split(key, ":")
+	return strings.Split(key, KeyPartSep)
 }
 
 // ScanPrefixArgs returns args for sn.Scan() for the specified prefix.