internal storage engine: improve compile-time typing

This change makes three internal storage engine API changes.

First, Snapshot.Close() is renamed to Snapshot.Abort(). Before this
change, the Store interface satisfied the Snapshot interface; now it
does not.
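
For example, read-only snapshot cleanup now goes through Abort (a
minimal sketch, assuming st is an open store.Store; the key is a
placeholder):

    sn := st.NewSnapshot()
    defer sn.Abort()
    value, err := sn.Get([]byte("some/key"), nil)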

Second, the SnapshotOrTransaction interface is added. Only Snapshot
and Transaction implement this interface; Transaction does not
implement Snapshot.
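
A single read helper can now accept either one at compile time, e.g.
(a hypothetical helper, not part of this change):

    func readInfo(sntx store.SnapshotOrTransaction, key string) ([]byte, error) {
        return sntx.Get([]byte(key), nil)
    }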

Third, the StoreReadWriter interface becomes private, and its uses
are replaced with Transaction. This was done because in many places
the StoreReadWriter was asserted to be a Transaction at runtime.
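
Transactional callbacks are now typed accordingly, e.g. (a sketch
based on the updated RunInTransaction usage below):

    err := store.RunInTransaction(st, func(tx store.Transaction) error {
        return tx.Put([]byte("k"), []byte("v"))
    })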

This change also updates existing code to the new API and removes
the now-redundant runtime type checks.
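
Previously such helpers asserted the dynamic type at runtime, e.g.:

    _ = st.(store.Transaction) // panics on failure, as desired

With a Transaction parameter the compiler enforces this instead.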

MultiPart: 1/2

Change-Id: Ic213cc0479a6cd8604bc62c29edaf7adb95b260e
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 5dd9cad..957f4ed 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -88,14 +88,11 @@
 	}
 	// Check perms.
 	sn := a.s.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.JoinKeyParts(util.DbInfoPrefix, a.name))
 }
 
 ////////////////////////////////////////
@@ -155,13 +152,13 @@
 	// 1. Check appData perms, create dbInfo record.
 	rootDir, engine := a.rootDirForDb(dbName), a.s.opts.Engine
 	aData := &appData{}
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
 		// Check appData perms.
-		if err := util.GetWithAuth(ctx, call, st, a.stKey(), aData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, a.stKey(), aData); err != nil {
 			return err
 		}
 		// Check for "database already exists".
-		if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if _, err := a.getDbInfo(ctx, tx, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -174,7 +171,7 @@
 			RootDir: rootDir,
 			Engine:  engine,
 		}
-		return a.putDbInfo(ctx, st, dbName, info)
+		return a.putDbInfo(ctx, tx, dbName, info)
 	}); err != nil {
 		return err
 	}
@@ -193,8 +190,8 @@
 	}
 
 	// 3. Flip dbInfo.Initialized to true.
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+		return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
 			info.Initialized = true
 			return nil
 		})
@@ -235,8 +232,8 @@
 	}
 
 	// 2. Flip dbInfo.Deleted to true.
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+		return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
 			info.Deleted = true
 			return nil
 		})
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index 39ffdc8..f750d57 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -24,33 +24,32 @@
 }
 
 // getDbInfo reads data from the storage engine.
-func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, sntx store.SnapshotOrTransaction, dbName string) (*dbInfo, error) {
 	info := &dbInfo{}
-	if err := util.Get(ctx, st, dbInfoStKey(a, dbName), info); err != nil {
+	if err := util.Get(ctx, sntx, dbInfoStKey(a, dbName), info); err != nil {
 		return nil, err
 	}
 	return info, nil
 }
 
 // putDbInfo writes data to the storage engine.
-func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
-	return util.Put(ctx, st, dbInfoStKey(a, dbName), info)
+func (a *app) putDbInfo(ctx *context.T, tx store.Transaction, dbName string, info *dbInfo) error {
+	return util.Put(ctx, tx, dbInfoStKey(a, dbName), info)
 }
 
 // delDbInfo deletes data from the storage engine.
-func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
-	return util.Delete(ctx, st, dbInfoStKey(a, dbName))
+func (a *app) delDbInfo(ctx *context.T, stw store.StoreWriter, dbName string) error {
+	return util.Delete(ctx, stw, dbInfoStKey(a, dbName))
 }
 
 // updateDbInfo performs a read-modify-write. fn should "modify" v.
-func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
-	_ = st.(store.Transaction) // panics on failure, as desired
-	info, err := a.getDbInfo(ctx, st, dbName)
+func (a *app) updateDbInfo(ctx *context.T, tx store.Transaction, dbName string, fn func(info *dbInfo) error) error {
+	info, err := a.getDbInfo(ctx, tx, dbName)
 	if err != nil {
 		return err
 	}
 	if err := fn(info); err != nil {
 		return err
 	}
-	return a.putDbInfo(ctx, st, dbName, info)
+	return a.putDbInfo(ctx, tx, dbName, info)
 }
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 3fe1b96..d4875c0 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -245,7 +245,7 @@
 			d.mu.Unlock()
 		}
 	} else {
-		if err = d.sn.Close(); err == nil {
+		if err = d.sn.Abort(); err == nil {
 			d.mu.Lock()
 			delete(d.sns, *d.batchId)
 			d.mu.Unlock()
@@ -276,13 +276,12 @@
 		}
 		return rs.Err()
 	}
-	var st store.StoreReader
+	var sntx store.SnapshotOrTransaction
 	if d.batchId != nil {
-		st = d.batchReader()
+		sntx = d.batchReader()
 	} else {
-		sn := d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx = d.st.NewSnapshot()
+		defer sntx.Abort()
 	}
 	// queryDb implements query_db.Database
 	// which is needed by the query package's
@@ -291,7 +290,7 @@
 		ctx:  ctx,
 		call: call,
 		req:  d,
-		st:   st,
+		sntx: sntx,
 	}
 
 	return impl(query_exec.Exec(db, q))
@@ -349,14 +348,11 @@
 	}
 	// Check perms.
 	sn := d.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.TablePrefix)
 }
 
 ////////////////////////////////////////
@@ -421,9 +417,9 @@
 	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
-	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d.st, func(tx store.Transaction) error {
 		data := &databaseData{}
-		return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
@@ -446,7 +442,7 @@
 	ctx  *context.T
 	call wire.DatabaseExecServerCall
 	req  *databaseReq
-	st   store.StoreReader
+	sntx store.SnapshotOrTransaction
 }
 
 func (db *queryDb) GetContext() *context.T {
@@ -462,7 +458,7 @@
 		},
 	}
 	// Now that we have a table, we need to check permissions.
-	if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
+	if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
 		return nil, err
 	}
 	return tDb, nil
@@ -478,7 +474,7 @@
 	for _, keyRange := range keyRanges {
 		// TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
 		//               Need a way to Scan multiple ranges at the same state of uncommitted changes.
-		streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
+		streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
 	}
 	return &kvs{
 		t:        t,
@@ -568,7 +564,7 @@
 	return util.DatabasePrefix
 }
 
-func (d *databaseReq) batchReader() store.StoreReader {
+func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
 	if d.batchId == nil {
 		return nil
 	} else if d.sn != nil {
@@ -578,7 +574,7 @@
 	}
 }
 
-func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+func (d *databaseReq) batchTransaction() (store.Transaction, error) {
 	if d.batchId == nil {
 		return nil, nil
 	} else if d.tx != nil {
diff --git a/x/ref/services/syncbase/server/nosql/database_sm.go b/x/ref/services/syncbase/server/nosql/database_sm.go
index 5d1239e..3c87a6b 100644
--- a/x/ref/services/syncbase/server/nosql/database_sm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sm.go
@@ -41,9 +41,9 @@
 	}
 
 	// Check permissions on Database and store schema metadata.
-	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d.st, func(tx store.Transaction) error {
 		dbData := databaseData{}
-		return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, d.stKey(), &dbData, func() error {
 			// NOTE: For now we expect the client to not issue multiple
 			// concurrent SetSchemaMetadata calls.
 			dbData.SchemaMetadata = &metadata
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 62a5f3a..a6fbf9d 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -32,35 +32,33 @@
 }
 
 func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall, schemaVersion int32) ([]byte, error) {
-	impl := func(st store.StoreReader) ([]byte, error) {
+	impl := func(sntx store.SnapshotOrTransaction) ([]byte, error) {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return []byte{}, err
 		}
-		return r.get(ctx, call, st)
+		return r.get(ctx, call, sntx)
 	}
-	var st store.StoreReader
 	if r.t.d.batchId != nil {
-		st = r.t.d.batchReader()
+		return impl(r.t.d.batchReader())
 	} else {
 		sn := r.t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		defer sn.Abort()
+		return impl(sn)
 	}
-	return impl(st)
 }
 
 func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, schemaVersion int32, value []byte) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		return r.put(ctx, call, st, value)
+		return r.put(ctx, call, tx, value)
 	}
 	if r.t.d.batchId != nil {
-		if st, err := r.t.d.batchReadWriter(); err != nil {
+		if tx, err := r.t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(r.t.d.st, impl)
@@ -68,17 +66,17 @@
 }
 
 func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		return r.delete(ctx, call, st)
+		return r.delete(ctx, call, tx)
 	}
 	if r.t.d.batchId != nil {
-		if st, err := r.t.d.batchReadWriter(); err != nil {
+		if tx, err := r.t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(r.t.d.st, impl)
@@ -98,17 +96,17 @@
 
 // checkAccess checks that this row's table exists in the database, and performs
 // an authorization check.
-func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return r.t.checkAccess(ctx, call, st, r.key)
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) error {
+	return r.t.checkAccess(ctx, call, sntx, r.key)
 }
 
 // get reads data from the storage engine.
 // Performs authorization check.
-func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) ([]byte, error) {
+	if err := r.checkAccess(ctx, call, sntx); err != nil {
 		return nil, err
 	}
-	value, err := st.Get([]byte(r.stKey()), nil)
+	value, err := sntx.Get([]byte(r.stKey()), nil)
 	if err != nil {
 		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			return nil, verror.New(verror.ErrNoExist, ctx, r.stKey())
@@ -120,11 +118,11 @@
 
 // put writes data to the storage engine.
 // Performs authorization check.
-func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, tx store.Transaction, value []byte) error {
+	if err := r.checkAccess(ctx, call, tx); err != nil {
 		return err
 	}
-	if err := st.Put([]byte(r.stKey()), value); err != nil {
+	if err := tx.Put([]byte(r.stKey()), value); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
@@ -132,11 +130,11 @@
 
 // delete deletes data from the storage engine.
 // Performs authorization check.
-func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, tx store.Transaction) error {
+	if err := r.checkAccess(ctx, call, tx); err != nil {
 		return err
 	}
-	if err := st.Delete([]byte(r.stKey())); err != nil {
+	if err := tx.Delete([]byte(r.stKey())); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 7b371e0..558a1c1 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -37,14 +37,14 @@
 	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 		return err
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
 		// Check databaseData perms.
 		dData := &databaseData{}
-		if err := util.GetWithAuth(ctx, call, st, t.d.stKey(), dData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, t.d.stKey(), dData); err != nil {
 			return err
 		}
 		// Check for "table already exists".
-		if err := util.Get(ctx, st, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.Get(ctx, tx, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -59,7 +59,7 @@
 			Name:  t.name,
 			Perms: perms,
 		}
-		return util.Put(ctx, st, t.stKey(), data)
+		return util.Put(ctx, tx, t.stKey(), data)
 	})
 }
 
@@ -70,16 +70,16 @@
 	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 		return err
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
 		// Read-check-delete tableData.
-		if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, t.stKey(), &tableData{}); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoExist.ID {
 				return nil // delete is idempotent
 			}
 			return err
 		}
 		// TODO(sadovsky): Delete all rows in this table.
-		return util.Delete(ctx, st, t.stKey())
+		return util.Delete(ctx, tx, t.stKey())
 	})
 }
 
@@ -91,9 +91,9 @@
 }
 
 func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		// Check for table-level access before doing a scan.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, tx, ""); err != nil {
 			return err
 		}
 		// Check if the db schema version and the version provided by client
@@ -101,21 +101,21 @@
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		it := tx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		key := []byte{}
 		for it.Advance() {
 			key = it.Key(key)
 			// Check perms.
 			parts := util.SplitKeyParts(string(key))
 			externalKey := parts[len(parts)-1]
-			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+			if err := t.checkAccess(ctx, call, tx, externalKey); err != nil {
 				// TODO(rogulenko): Revisit this behavior. Probably we should
 				// delete all rows that we have access to.
 				it.Cancel()
 				return err
 			}
 			// Delete the key-value pair.
-			if err := st.Delete(key); err != nil {
+			if err := tx.Delete(key); err != nil {
 				return verror.New(verror.ErrInternal, ctx, err)
 			}
 		}
@@ -125,10 +125,10 @@
 		return nil
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -136,15 +136,15 @@
 }
 
 func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, schemaVersion int32, start, limit []byte) error {
-	impl := func(st store.StoreReader) error {
+	impl := func(sntx store.SnapshotOrTransaction) error {
 		// Check for table-level access before doing a scan.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		sender := call.SendStream()
 		key, value := []byte{}, []byte{}
 		for it.Advance() {
@@ -152,7 +152,7 @@
 			// Check perms.
 			parts := util.SplitKeyParts(string(key))
 			externalKey := parts[len(parts)-1]
-			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+			if err := t.checkAccess(ctx, call, sntx, externalKey); err != nil {
 				it.Cancel()
 				return err
 			}
@@ -163,28 +163,26 @@
 		}
 		return nil
 	}
-	var st store.StoreReader
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
+		return impl(t.d.batchReader())
 	} else {
-		sn := t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx := t.d.st.NewSnapshot()
+		defer sntx.Abort()
+		return impl(sntx)
 	}
-	return impl(st)
 }
 
 func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, key string) ([]wire.PrefixPermissions, error) {
-	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+	impl := func(sntx store.SnapshotOrTransaction) ([]wire.PrefixPermissions, error) {
 		// Check permissions only at table level.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
 			return nil, err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return nil, err
 		}
 		// Get the most specific permissions object.
-		prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+		prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
 		if err != nil {
 			return nil, err
 		}
@@ -192,27 +190,25 @@
 		// Collect all parent permissions objects all the way up to the table level.
 		for prefix != "" {
 			prefix = prefixPerms.Parent
-			if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+			if prefixPerms, err = t.permsForPrefix(ctx, sntx, prefixPerms.Parent); err != nil {
 				return nil, err
 			}
 			result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
 		}
 		return result, nil
 	}
-	var st store.StoreReader
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
+		return impl(t.d.batchReader())
 	} else {
-		sn := t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx := t.d.st.NewSnapshot()
+		defer sntx.Abort()
+		return impl(sntx)
 	}
-	return impl(st)
 }
 
 func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string, perms access.Permissions) error {
-	impl := func(st store.StoreReadWriter) error {
-		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+	impl := func(tx store.Transaction) error {
+		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -220,18 +216,18 @@
 		}
 		// Concurrent transactions that touch this table should fail with
 		// ErrConcurrentTransaction when this transaction commits.
-		if err := t.lock(ctx, st); err != nil {
+		if err := t.lock(ctx, tx); err != nil {
 			return err
 		}
 		if prefix == "" {
 			data := &tableData{}
-			return util.UpdateWithAuth(ctx, call, st, t.stKey(), data, func() error {
+			return util.UpdateWithAuth(ctx, call, tx, t.stKey(), data, func() error {
 				data.Perms = perms
 				return nil
 			})
 		}
 		// Get the most specific permissions object.
-		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
 		if err != nil {
 			return err
 		}
@@ -240,7 +236,7 @@
 		// parents for all children of the prefix to the node corresponding to
 		// the prefix.
 		if parent != prefix {
-			if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+			if err := t.updateParentRefs(ctx, tx, prefix, prefix); err != nil {
 				return err
 			}
 		} else {
@@ -250,16 +246,16 @@
 		stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
 		prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
 		// Put the (prefix, perms) pair to the database.
-		if err := util.Put(ctx, st, stPrefix, prefixPerms); err != nil {
+		if err := util.Put(ctx, tx, stPrefix, prefixPerms); err != nil {
 			return err
 		}
-		return util.Put(ctx, st, stPrefixLimit, prefixPerms)
+		return util.Put(ctx, tx, stPrefixLimit, prefixPerms)
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -270,8 +266,8 @@
 	if prefix == "" {
 		return verror.New(verror.ErrBadArg, ctx, prefix)
 	}
-	impl := func(st store.StoreReadWriter) error {
-		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+	impl := func(tx store.Transaction) error {
+		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -279,11 +275,11 @@
 		}
 		// Concurrent transactions that touch this table should fail with
 		// ErrConcurrentTransaction when this transaction commits.
-		if err := t.lock(ctx, st); err != nil {
+		if err := t.lock(ctx, tx); err != nil {
 			return err
 		}
 		// Get the most specific permissions object.
-		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
 		if err != nil {
 			return err
 		}
@@ -295,21 +291,21 @@
 		// We need to delete the node corresponding to the prefix from the prefix
 		// permissions tree. We do it by updating parents for all children of the
 		// prefix to the parent of the node corresponding to the prefix.
-		if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+		if err := t.updateParentRefs(ctx, tx, prefix, prefixPerms.Parent); err != nil {
 			return err
 		}
 		stPrefix := []byte(t.prefixPermsKey(prefix))
 		stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
-		if err := st.Delete(stPrefix); err != nil {
+		if err := tx.Delete(stPrefix); err != nil {
 			return err
 		}
-		return st.Delete(stPrefixLimit)
+		return tx.Delete(stPrefixLimit)
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -317,30 +313,23 @@
 }
 
 func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
-	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+	impl := func(sntx store.SnapshotOrTransaction, closeSntx func() error) (<-chan string, error) {
 		// Check perms.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
-			closeStoreReader()
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
+			closeSntx()
 			return nil, err
 		}
 		// TODO(rogulenko): Check prefix permissions for children.
-		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
+		return util.Glob(ctx, call, "*", sntx, closeSntx, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
-	var st store.StoreReader
-	var closeStoreReader func() error
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
-		closeStoreReader = func() error {
+		return impl(t.d.batchReader(), func() error {
 			return nil
-		}
+		})
 	} else {
 		sn := t.d.st.NewSnapshot()
-		st = sn
-		closeStoreReader = func() error {
-			return sn.Close()
-		}
+		return impl(sn, sn.Abort)
 	}
-	return impl(st, closeStoreReader)
 }
 
 ////////////////////////////////////////
@@ -356,11 +345,11 @@
 
 // updateParentRefs updates the parent for all children of the given
 // prefix to newParent.
-func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+func (t *tableReq) updateParentRefs(ctx *context.T, tx store.Transaction, prefix, newParent string) error {
 	stPrefix := []byte(t.prefixPermsKey(prefix))
 	stPrefixStart := append(stPrefix, 0)
 	stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
-	it := st.Scan(stPrefixStart, stPrefixLimit)
+	it := tx.Scan(stPrefixStart, stPrefixLimit)
 	var key, value []byte
 	for it.Advance() {
 		key, value = it.Key(key), it.Value(value)
@@ -370,7 +359,7 @@
 			return verror.New(verror.ErrInternal, ctx, err)
 		}
 		prefixPerms.Parent = newParent
-		if err := util.Put(ctx, st, string(key), prefixPerms); err != nil {
+		if err := util.Put(ctx, tx, string(key), prefixPerms); err != nil {
 			it.Cancel()
 			return err
 		}
@@ -393,12 +382,12 @@
 // TODO(rogulenko): Revisit this behavior to provide more granularity.
 // One option is to add a prefix and its parent to the write set of the current
 // transaction when the permissions object for that prefix is updated.
-func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+func (t *tableReq) lock(ctx *context.T, tx store.Transaction) error {
 	var data tableData
-	if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+	if err := util.Get(ctx, tx, t.stKey(), &data); err != nil {
 		return err
 	}
-	return util.Put(ctx, st, t.stKey(), data)
+	return util.Put(ctx, tx, t.stKey(), data)
 }
 
 // checkAccess checks that this table exists in the database, and performs
@@ -407,13 +396,13 @@
 // TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
 // access check to be a check for "Resolve", i.e. also check access to
 // service, app and database.
-func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
-	prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, key string) error {
+	prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
 	if err != nil {
 		return err
 	}
 	if prefix != "" {
-		if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, sntx, t.stKey(), &tableData{}); err != nil {
 			return err
 		}
 	}
@@ -448,10 +437,10 @@
 //   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
 //   K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
 //   prefix of K; in this case line 3 returns correct result.
-func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
-	it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+func (t *tableReq) permsForKey(ctx *context.T, sntx store.SnapshotOrTransaction, key string) (string, stPrefixPerms, error) {
+	it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
 	if !it.Advance() {
-		prefixPerms, err := t.permsForPrefix(ctx, st, "")
+		prefixPerms, err := t.permsForPrefix(ctx, sntx, "")
 		return "", prefixPerms, err
 	}
 	defer it.Cancel()
@@ -465,22 +454,22 @@
 	if strings.HasPrefix(key, prefix) {
 		return prefix, prefixPerms, nil
 	}
-	prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+	prefixPerms, err := t.permsForPrefix(ctx, sntx, prefixPerms.Parent)
 	return prefixPerms.Parent, prefixPerms, err
 }
 
 // permsForPrefix returns the permissions object associated with the
 // provided prefix.
-func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+func (t *tableReq) permsForPrefix(ctx *context.T, sntx store.SnapshotOrTransaction, prefix string) (stPrefixPerms, error) {
 	if prefix == "" {
 		var data tableData
-		if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+		if err := util.Get(ctx, sntx, t.stKey(), &data); err != nil {
 			return stPrefixPerms{}, err
 		}
 		return stPrefixPerms{Perms: data.Perms}, nil
 	}
 	var prefixPerms stPrefixPerms
-	if err := util.Get(ctx, st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+	if err := util.Get(ctx, sntx, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
 		return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
 	}
 	return prefixPerms, nil
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 8ed1480..5b87252 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -139,9 +139,9 @@
 // RPC methods
 
 func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
-	return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		data := &serviceData{}
-		return util.UpdateWithAuth(ctx, call, st, s.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, s.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
@@ -163,14 +163,11 @@
 func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := s.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.AppPrefix)
 }
 
 ////////////////////////////////////////
@@ -225,14 +222,14 @@
 		dbs:    make(map[string]interfaces.Database),
 	}
 
-	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		// Check serviceData perms.
 		sData := &serviceData{}
-		if err := util.GetWithAuth(ctx, call, st, s.stKey(), sData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, s.stKey(), sData); err != nil {
 			return err
 		}
 		// Check for "app already exists".
-		if err := util.Get(ctx, st, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.Get(ctx, tx, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -246,7 +243,7 @@
 			Name:  appName,
 			Perms: perms,
 		}
-		return util.Put(ctx, st, a.stKey(), data)
+		return util.Put(ctx, tx, a.stKey(), data)
 	}); err != nil {
 		return err
 	}
@@ -263,16 +260,16 @@
 		return nil // delete is idempotent
 	}
 
-	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		// Read-check-delete appData.
-		if err := util.GetWithAuth(ctx, call, st, a.stKey(), &appData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, a.stKey(), &appData{}); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoExist.ID {
 				return nil // delete is idempotent
 			}
 			return err
 		}
 		// TODO(sadovsky): Delete all databases in this app.
-		return util.Delete(ctx, st, a.stKey())
+		return util.Delete(ctx, tx, a.stKey())
 	}); err != nil {
 		return err
 	}
@@ -288,9 +285,9 @@
 	if !ok {
 		return verror.New(verror.ErrNoExist, ctx, appName)
 	}
-	return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		data := &appData{}
-		return util.UpdateWithAuth(ctx, call, st, a.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, a.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
diff --git a/x/ref/services/syncbase/server/util/glob.go b/x/ref/services/syncbase/server/util/glob.go
index acdd74f..2cbc3f9 100644
--- a/x/ref/services/syncbase/server/util/glob.go
+++ b/x/ref/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
 	return res, nil
 }
 
-// Glob performs a glob. It calls closeStoreReader to close st.
+// Glob performs a glob. It calls closeSntx to close sntx.
 // TODO(sadovsky): Why do we make developers implement Glob differently from
 // other streaming RPCs? It's confusing that Glob must return immediately and
 // write its results to a channel, while other streaming RPC handlers must block
 // and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sntx store.SnapshotOrTransaction, closeSntx func() error, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
-		closeStoreReader()
+		closeSntx()
 		return nil, verror.New(verror.ErrBadArg, ctx, err)
 	}
 	prefix, err := globPatternToPrefix(pattern)
 	if err != nil {
-		closeStoreReader()
+		closeSntx()
 		if verror.ErrorID(err) == verror.ErrBadArg.ID {
 			return nil, verror.NewErrNotImplemented(ctx)
 		}
 		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
-	it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+	it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
 	ch := make(chan string)
 	go func() {
-		defer closeStoreReader()
+		defer closeSntx()
 		defer close(ch)
 		key := []byte{}
 		for it.Advance() {
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index 38fa8ce..b8f1905 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -64,21 +64,21 @@
 	return nil
 }
 
-// Put does st.Put(k, v) and wraps the returned error.
-func Put(ctx *context.T, st store.StoreWriter, k string, v interface{}) error {
+// Put does stw.Put(k, v) and wraps the returned error.
+func Put(ctx *context.T, stw store.StoreWriter, k string, v interface{}) error {
 	bytes, err := vom.Encode(v)
 	if err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
-	if err = st.Put([]byte(k), bytes); err != nil {
+	if err = stw.Put([]byte(k), bytes); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
 }
 
-// Delete does st.Delete(k, v) and wraps the returned error.
-func Delete(ctx *context.T, st store.StoreWriter, k string) error {
-	if err := st.Delete([]byte(k)); err != nil {
+// Delete does stw.Delete(k) and wraps the returned error.
+func Delete(ctx *context.T, stw store.StoreWriter, k string) error {
+	if err := stw.Delete([]byte(k)); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
@@ -87,15 +87,14 @@
 // UpdateWithAuth performs a read-modify-write.
 // Input v is populated by the "read" step. fn should "modify" v.
 // Performs an auth check as part of the "read" step.
-func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, k string, v Permser, fn func() error) error {
-	_ = st.(store.Transaction) // panics on failure, as desired
-	if err := GetWithAuth(ctx, call, st, k, v); err != nil {
+func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, tx store.Transaction, k string, v Permser, fn func() error) error {
+	if err := GetWithAuth(ctx, call, tx, k, v); err != nil {
 		return err
 	}
 	if err := fn(); err != nil {
 		return err
 	}
-	return Put(ctx, st, k, v)
+	return Put(ctx, tx, k, v)
 }
 
 // Wraps a call to Get and returns true if Get found the object, false
diff --git a/x/ref/services/syncbase/server/watchable/snapshot.go b/x/ref/services/syncbase/server/watchable/snapshot.go
index ac3694f..37af4e1 100644
--- a/x/ref/services/syncbase/server/watchable/snapshot.go
+++ b/x/ref/services/syncbase/server/watchable/snapshot.go
@@ -9,6 +9,7 @@
 )
 
 type snapshot struct {
+	store.SnapshotSpecImpl
 	isn store.Snapshot
 	st  *wstore
 }
@@ -22,9 +23,9 @@
 	}
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
-	return s.isn.Close()
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
+	return s.isn.Abort()
 }
 
 // Get implements the store.StoreReader interface.
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 2b0037a..010643d 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -79,7 +79,7 @@
 		return st.ist.Get(key, valbuf)
 	}
 	sn := newSnapshot(st)
-	defer sn.Close()
+	defer sn.Abort()
 	return sn.Get(key, valbuf)
 }
 
@@ -95,16 +95,16 @@
 // Put implements the store.StoreWriter interface.
 func (st *wstore) Put(key, value []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
 func (st *wstore) Delete(key []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Delete(key)
 	})
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/stream.go b/x/ref/services/syncbase/server/watchable/stream.go
index 9c8c392..26502e1 100644
--- a/x/ref/services/syncbase/server/watchable/stream.go
+++ b/x/ref/services/syncbase/server/watchable/stream.go
@@ -13,7 +13,7 @@
 // stream streams keys and values for versioned records.
 type stream struct {
 	iit      store.Stream
-	st       store.StoreReader
+	sntx     store.SnapshotOrTransaction
 	mu       sync.Mutex
 	err      error
 	hasValue bool
@@ -25,11 +25,10 @@
 
 // newStreamVersioned creates a new stream. It assumes all records in range
 // [start, limit) are managed, i.e. versioned.
-func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
-	checkTransactionOrSnapshot(st)
+func newStreamVersioned(sntx store.SnapshotOrTransaction, start, limit []byte) *stream {
 	return &stream{
-		iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
-		st:  st,
+		iit:  sntx.Scan(makeVersionKey(start), makeVersionKey(limit)),
+		sntx: sntx,
 	}
 }
 
@@ -46,7 +45,7 @@
 	}
 	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
 	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
-	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	s.value, s.err = s.sntx.Get(makeAtVersionKey(s.key, version), nil)
 	if s.err != nil {
 		return false
 	}
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 93b6b33..fb82f72 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -177,7 +177,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -195,7 +195,7 @@
 // current keys and their versions to use when initializing the sync metadata
 // at the point in the timeline when these keys become syncable (at commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -227,10 +227,10 @@
 // GetVersion returns the current version of a managed key. This method is used
 // by the Sync module when the initiator is attempting to add new versions of
 // objects. Reading the version key is used for optimistic concurrency
-// control. At minimum, an object implementing the StoreReader interface is
+// control. At minimum, an object implementing the Transaction interface is
 // required since this is a Get operation.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
-	switch w := st.(type) {
+func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
+	switch w := tx.(type) {
 	case *transaction:
 		w.mu.Lock()
 		defer w.mu.Unlock()
@@ -238,8 +238,6 @@
 			return nil, convertError(w.err)
 		}
 		return getVersion(w.itx, key)
-	case *wstore:
-		return getVersion(w.ist, key)
 	}
 	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
 }
@@ -266,8 +264,8 @@
 // PutAtVersion puts a value for the managed key at the requested version. This
 // method is used by the Sync module exclusively when the initiator adds objects
 // with versions created on other Syncbases. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
@@ -285,8 +283,8 @@
 // version. This method is used by the Sync module exclusively when the
 // initiator selects which of the already stored versions (via PutAtVersion
 // calls) becomes the current version. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 43b7b94..8b95536 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -33,17 +33,17 @@
 	updateVal: "val-b2",
 }
 
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+func checkAndUpdate(tx store.Transaction, data testData) error {
 	// check and update data1
 	keyBytes := []byte(data.key)
-	val, err := st.Get(keyBytes, nil)
+	val, err := tx.Get(keyBytes, nil)
 	if err != nil {
 		return fmt.Errorf("can't get key %q: %v", data.key, err)
 	}
 	if !bytes.Equal(val, []byte(data.createVal)) {
 		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
 	}
-	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+	if err := tx.Put(keyBytes, []byte(data.updateVal)); err != nil {
 		return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
 	}
 	return nil
@@ -79,13 +79,13 @@
 	setMockSystemClock(wst1, mockClock)
 
 	// Create data in store
-	if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
 		// add data1
-		if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+		if err := tx.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
 		}
 		// add data2
-		if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+		if err := tx.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
 		}
 		return nil
@@ -110,11 +110,11 @@
 		t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
 	}
 
-	if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
-		if err := checkAndUpdate(st, data1); err != nil {
+	if err := store.RunInTransaction(wst2, func(tx store.Transaction) error {
+		if err := checkAndUpdate(tx, data1); err != nil {
 			return err
 		}
-		if err := checkAndUpdate(st, data2); err != nil {
+		if err := checkAndUpdate(tx, data2); err != nil {
 			return err
 		}
 		return nil
@@ -144,33 +144,33 @@
 		t.Fatalf("Wrap failed: %v", err)
 	}
 
-	if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst, func(tx store.Transaction) error {
 		putKey, putVal := []byte("foo"), []byte("bar")
-		if err := st.Put(putKey, putVal); err != nil {
+		if err := tx.Put(putKey, putVal); err != nil {
 			return err
 		}
 		getKey := []byte("foo")
-		if getVal, err := st.Get(getKey, nil); err != nil {
+		if getVal, err := tx.Get(getKey, nil); err != nil {
 			return err
 		} else {
 			eq(t, getVal, putVal)
 		}
 		start, limit := []byte("aaa"), []byte("bbb")
-		st.Scan(start, limit)
+		tx.Scan(start, limit)
 		delKey := []byte("foo")
-		if err := st.Delete(delKey); err != nil {
+		if err := tx.Delete(delKey); err != nil {
 			return err
 		}
 		sgPrefixes := []string{"sga", "sgb"}
-		if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+		if err := AddSyncGroupOp(nil, tx, sgPrefixes, false); err != nil {
 			return err
 		}
 		snKey, snVersion := []byte("aa"), []byte("123")
-		if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+		if err := AddSyncSnapshotOp(nil, tx, snKey, snVersion); err != nil {
 			return err
 		}
 		pvKey, pvVersion := []byte("pv"), []byte("456")
-		if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+		if err := PutVersion(nil, tx, pvKey, pvVersion); err != nil {
 			return err
 		}
 		for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 3d08129..64f8eeb 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -50,21 +50,20 @@
 	return []byte(join(string(key), string(version)))
 }
 
-func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
-	return st.Get(makeVersionKey(key), nil)
+func getVersion(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
+	return sntx.Get(makeVersionKey(key), nil)
 }
 
 func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
 	return st.Get(makeAtVersionKey(key, version), valbuf)
 }
 
-func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
-	checkTransactionOrSnapshot(st)
-	version, err := getVersion(st, key)
+func getVersioned(sntx store.SnapshotOrTransaction, key, valbuf []byte) ([]byte, error) {
+	version, err := getVersion(sntx, key)
 	if err != nil {
 		return valbuf, err
 	}
-	return getAtVersion(st, key, valbuf, version)
+	return getAtVersion(sntx, key, valbuf, version)
 }
 
 func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
@@ -82,14 +81,6 @@
 	return tx.Delete(makeVersionKey(key))
 }
 
-func checkTransactionOrSnapshot(st store.StoreReader) {
-	_, isTransaction := st.(store.Transaction)
-	_, isSnapshot := st.(store.Snapshot)
-	if !isTransaction && !isSnapshot {
-		panic("neither a Transaction nor a Snapshot")
-	}
-}
-
 func getLogEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
diff --git a/x/ref/services/syncbase/store/constants.go b/x/ref/services/syncbase/store/constants.go
index f97f2fd..26551fa 100644
--- a/x/ref/services/syncbase/store/constants.go
+++ b/x/ref/services/syncbase/store/constants.go
@@ -6,10 +6,10 @@
 
 // TODO(sadovsky): Maybe define verrors for these.
 const (
-	ErrMsgClosedStore    = "closed store"
-	ErrMsgClosedSnapshot = "closed snapshot"
-	ErrMsgCanceledStream = "canceled stream"
-	ErrMsgCommittedTxn   = "already called commit"
-	ErrMsgAbortedTxn     = "already called abort"
-	ErrMsgExpiredTxn     = "expired transaction"
+	ErrMsgClosedStore     = "closed store"
+	ErrMsgAbortedSnapshot = "aborted snapshot"
+	ErrMsgCanceledStream  = "canceled stream"
+	ErrMsgCommittedTxn    = "already called commit"
+	ErrMsgAbortedTxn      = "already called abort"
+	ErrMsgExpiredTxn      = "expired transaction"
 )
diff --git a/x/ref/services/syncbase/store/invalid_types.go b/x/ref/services/syncbase/store/invalid_types.go
index bb008e2..a230684 100644
--- a/x/ref/services/syncbase/store/invalid_types.go
+++ b/x/ref/services/syncbase/store/invalid_types.go
@@ -10,6 +10,7 @@
 
 // InvalidSnapshot is a Snapshot for which all methods return errors.
 type InvalidSnapshot struct {
+	SnapshotSpecImpl
 	Error error // returned by all methods
 }
 
@@ -32,8 +33,8 @@
 ////////////////////////////////////////////////////////////
 // InvalidSnapshot
 
-// Close implements the store.Snapshot interface.
-func (s *InvalidSnapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Abort() error {
 	return convertError(s.Error)
 }
 
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index 765103f..4f1ba6c 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -166,7 +166,7 @@
 	d.mu.RLock()
 	defer d.mu.RUnlock()
 	if d.err != nil {
-		return &store.InvalidSnapshot{d.err}
+		return &store.InvalidSnapshot{Error: d.err}
 	}
 	return newSnapshot(d, d.node)
 }
diff --git a/x/ref/services/syncbase/store/leveldb/snapshot.go b/x/ref/services/syncbase/store/leveldb/snapshot.go
index e43d67f..a07be02 100644
--- a/x/ref/services/syncbase/store/leveldb/snapshot.go
+++ b/x/ref/services/syncbase/store/leveldb/snapshot.go
@@ -16,6 +16,7 @@
 // snapshot is a wrapper around LevelDB snapshot that implements
 // the store.Snapshot interface.
 type snapshot struct {
+	store.SnapshotSpecImpl
 	// mu protects the state of the snapshot.
 	mu        sync.RWMutex
 	node      *store.ResourceNode
@@ -39,13 +40,13 @@
 		cOpts:     cOpts,
 	}
 	parent.AddChild(s.node, func() {
-		s.Close()
+		s.Abort()
 	})
 	return s
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
@@ -56,7 +57,7 @@
 	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
 	s.cSnapshot = nil
-	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/store/memstore/snapshot.go b/x/ref/services/syncbase/store/memstore/snapshot.go
index 43beffd..128d127 100644
--- a/x/ref/services/syncbase/store/memstore/snapshot.go
+++ b/x/ref/services/syncbase/store/memstore/snapshot.go
@@ -12,6 +12,7 @@
 )
 
 type snapshot struct {
+	store.SnapshotSpecImpl
 	mu   sync.Mutex
 	node *store.ResourceNode
 	data map[string][]byte
@@ -31,20 +32,20 @@
 		data: dataCopy,
 	}
 	parent.AddChild(s.node, func() {
-		s.Close()
+		s.Abort()
 	})
 	return s
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
 		return convertError(s.err)
 	}
 	s.node.Close()
-	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/store/memstore/store.go b/x/ref/services/syncbase/store/memstore/store.go
index 7e3f816..f4ef644 100644
--- a/x/ref/services/syncbase/store/memstore/store.go
+++ b/x/ref/services/syncbase/store/memstore/store.go
@@ -73,15 +73,15 @@
 
 // Put implements the store.StoreWriter interface.
 func (st *memstore) Put(key, value []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
 func (st *memstore) Delete(key []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Delete(key)
 	})
 }
 
@@ -101,7 +101,7 @@
 	st.mu.Lock()
 	defer st.mu.Unlock()
 	if st.err != nil {
-		return &store.InvalidSnapshot{st.err}
+		return &store.InvalidSnapshot{Error: st.err}
 	}
 	return newSnapshot(st, st.node)
 }
diff --git a/x/ref/services/syncbase/store/model.go b/x/ref/services/syncbase/store/model.go
index a2c48f0..be7265d 100644
--- a/x/ref/services/syncbase/store/model.go
+++ b/x/ref/services/syncbase/store/model.go
@@ -37,15 +37,15 @@
 	Delete(key []byte) error
 }
 
-// StoreReadWriter combines StoreReader and StoreWriter.
-type StoreReadWriter interface {
+// storeReadWriter combines StoreReader and StoreWriter.
+type storeReadWriter interface {
 	StoreReader
 	StoreWriter
 }
 
 // Store is a CRUD-capable storage engine that supports transactions.
 type Store interface {
-	StoreReadWriter
+	storeReadWriter
 
 	// Close closes the store.
 	Close() error
@@ -59,6 +59,29 @@
 	NewSnapshot() Snapshot
 }
 
+// SnapshotOrTransaction represents a Snapshot or a Transaction.
+type SnapshotOrTransaction interface {
+	StoreReader
+
+	// Abort closes the snapshot or transaction.
+	// Any subsequent method calls will fail.
+	// NOTE: this method is also used to distinguish between StoreReader and
+	// SnapshotOrTransaction.
+	Abort() error
+}
+
+// Snapshot is a handle to a particular state in time of a Store.
+//
+// All read operations are executed against a consistent view of Store commit
+// history. Snapshots don't acquire locks and thus don't block transactions.
+type Snapshot interface {
+	SnapshotOrTransaction
+
+	// __snapshotSpec is a utility method to distinguish between Snapshot and
+	// SnapshotOrTransaction. This is a no-op.
+	__snapshotSpec()
+}
+
 // Transaction provides a mechanism for atomic reads and writes. Instead of
 // calling this function directly, clients are encouraged to use the
 // RunInTransaction() helper function, which detects "concurrent transaction"
@@ -77,27 +100,13 @@
 // Once a transaction has been committed or aborted, subsequent method calls
 // will fail with no effect.
 type Transaction interface {
-	StoreReadWriter
+	SnapshotOrTransaction
+	StoreWriter
 
 	// Commit commits the transaction.
 	// Fails if writes from outside this transaction conflict with reads from
 	// within this transaction.
 	Commit() error
-
-	// Abort aborts the transaction.
-	Abort() error
-}
-
-// Snapshot is a handle to particular state in time of a Store.
-//
-// All read operations are executed against a consistent view of Store commit
-// history. Snapshots don't acquire locks and thus don't block transactions.
-type Snapshot interface {
-	StoreReader
-
-	// Close closes the snapshot.
-	// Any subsequent method calls will fail.
-	Close() error
 }
 
 // Stream is an interface for iterating through a collection of key-value pairs.
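
With this hierarchy, read-only helpers can be written once against SnapshotOrTransaction and accept either kind of handle. A minimal sketch under that assumption; the helper name is hypothetical:

```go
package example

import "v.io/syncbase/x/ref/services/syncbase/store"

// readRaw accepts both a Snapshot and a Transaction, but not a bare Store
// or a plain StoreReader, because only SnapshotOrTransaction carries the
// Abort() method used to distinguish the two.
func readRaw(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
	return sntx.Get(key, nil)
}

// Typical call sites (sketch):
//
//	sn := st.NewSnapshot()
//	defer sn.Abort()
//	v, err := readRaw(sn, key)
//
//	store.RunInTransaction(st, func(tx store.Transaction) error {
//		v, err := readRaw(tx, key)
//		...
//		return nil
//	})
```
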
diff --git a/x/ref/services/syncbase/store/test/snapshot.go b/x/ref/services/syncbase/store/test/snapshot.go
index 4dbee5f..04dee18 100644
--- a/x/ref/services/syncbase/store/test/snapshot.go
+++ b/x/ref/services/syncbase/store/test/snapshot.go
@@ -26,12 +26,12 @@
 	verifyAdvance(t, s, key1, value1)
 	verifyAdvance(t, s, nil, nil)
 
-	// Test functions after Close.
-	if err := snapshot.Close(); err != nil {
-		t.Fatalf("can't close the snapshot: %v", err)
+	// Test functions after Abort.
+	if err := snapshot.Abort(); err != nil {
+		t.Fatalf("can't abort the snapshot: %v", err)
 	}
-	expectedErrMsg := store.ErrMsgClosedSnapshot
-	verifyError(t, snapshot.Close(), verror.ErrCanceled.ID, expectedErrMsg)
+	expectedErrMsg := store.ErrMsgAbortedSnapshot
+	verifyError(t, snapshot.Abort(), verror.ErrCanceled.ID, expectedErrMsg)
 
 	_, err := snapshot.Get(key1, nil)
 	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
diff --git a/x/ref/services/syncbase/store/test/store.go b/x/ref/services/syncbase/store/test/store.go
index 2d6d9a1..48022f9 100644
--- a/x/ref/services/syncbase/store/test/store.go
+++ b/x/ref/services/syncbase/store/test/store.go
@@ -137,7 +137,7 @@
 	s.verify(t, st)
 	for i := 0; i < len(states); i++ {
 		states[i].verify(t, snapshots[i])
-		snapshots[i].Close()
+		snapshots[i].Abort()
 	}
 }
 
@@ -230,7 +230,7 @@
 	}
 	for _, snapshot := range snapshots {
 		_, err := snapshot.Get(key1, nil)
-		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgClosedSnapshot)
+		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgAbortedSnapshot)
 	}
 	for _, tx := range transactions {
 		_, err := tx.Get(key1, nil)
diff --git a/x/ref/services/syncbase/store/test/transaction.go b/x/ref/services/syncbase/store/test/transaction.go
index 3e1039b..6cf26e8 100644
--- a/x/ref/services/syncbase/store/test/transaction.go
+++ b/x/ref/services/syncbase/store/test/transaction.go
@@ -155,7 +155,7 @@
 		go func(idx int) {
 			rnd := rand.New(rand.NewSource(239017 * int64(idx)))
 			perm := rnd.Perm(n)
-			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+			if err := store.RunInTransaction(st, func(tx store.Transaction) error {
 				for j := 0; j <= m; j++ {
 					var keystr string
 					if j < m {
@@ -164,7 +164,7 @@
 						keystr = fmt.Sprintf("%05d", n)
 					}
 					key := []byte(keystr)
-					val, err := st.Get(key, nil)
+					val, err := tx.Get(key, nil)
 					if err != nil {
 						return fmt.Errorf("can't get key %q: %v", key, err)
 					}
@@ -178,7 +178,7 @@
 					} else {
 						newValue = intValue + int64(m)
 					}
-					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+					if err := tx.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
 						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
 					}
 				}
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index aac934b..72c4beb 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -8,9 +8,13 @@
 	"v.io/v23/verror"
 )
 
+type SnapshotSpecImpl struct{}
+
+func (s *SnapshotSpecImpl) __snapshotSpec() {}
+
 // RunInTransaction runs the given fn in a transaction, managing retries and
 // commit/abort.
-func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
+func RunInTransaction(st Store, fn func(tx Transaction) error) error {
 	// TODO(rogulenko): Make the number of attempts configurable.
 	// TODO(rogulenko): Change the default number of attempts to 3. Currently,
 	// some storage engine tests fail when the number of attempts is that low.
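
Under the new signature the callback receives a full Transaction, so write access no longer requires a type assertion. A hypothetical read-modify-write sketch:

```go
package example

import (
	"strconv"

	"v.io/syncbase/x/ref/services/syncbase/store"
)

// increment is a hypothetical counter update; RunInTransaction retries it
// if a concurrent commit conflicts with the Get below.
func increment(st store.Store, key []byte) error {
	return store.RunInTransaction(st, func(tx store.Transaction) error {
		n := 0
		if v, err := tx.Get(key, nil); err == nil {
			n, _ = strconv.Atoi(string(v))
		}
		// Treating any Get error as "not set yet" is a simplification for
		// this sketch; real code distinguishes "no such key" from other
		// failures.
		return tx.Put(key, []byte(strconv.Itoa(n+1)))
	})
}
```
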
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 3850d5b..121f594 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -222,9 +222,7 @@
 
 // endBatch marks the end of a given batch.  The batch information is persisted
 // to the store and removed from the temporary in-memory entry.
-func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) endBatch(ctx *context.T, tx store.Transaction, btid, count uint64) error {
 	s.batchesLock.Lock()
 	defer s.batchesLock.Unlock()
 
@@ -271,9 +269,7 @@
 //
 // The grafting structure is not needed when nodes are being added locally by
 // the Watcher, passing a nil grafting structure.
-func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) addNode(ctx *context.T, tx store.Transaction, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
 	if parents != nil {
 		if len(parents) > 2 {
 			return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
@@ -380,9 +376,7 @@
 // to track DAG attachments during a sync operation.  It is not needed if the
 // parent linkage is due to a local change (from conflict resolution selecting
 // an existing version).
-func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) addParent(ctx *context.T, tx store.Transaction, oid, version, parent string, graft graftMap) error {
 	if version == parent {
 		return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
 	}
@@ -447,9 +441,7 @@
 }
 
 // moveHead moves the object head node in the DAG.
-func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error {
-	_ = tx.(store.Transaction)
-
+func moveHead(ctx *context.T, tx store.Transaction, oid, head string) error {
 	// Verify that the node exists.
 	if ok, err := hasNode(ctx, tx, oid, head); err != nil {
 		return err
@@ -565,7 +557,7 @@
 
 // getObjectGraftInfo returns the graftInfo for an object ID.  If the graftMap is
 // nil, a nil graftInfo is returned because grafting is not being tracked.
-func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo {
+func getObjectGraftInfo(ctx *context.T, sntx store.SnapshotOrTransaction, graft graftMap, oid string) *graftInfo {
 	if graft == nil {
 		return nil
 	}
@@ -580,7 +572,7 @@
 	}
 
 	// If the object has a head node, include it in the set of new heads.
-	if head, err := getHead(ctx, st, oid); err == nil {
+	if head, err := getHead(ctx, sntx, oid); err == nil {
 		info.newHeads[head] = true
 		info.oldHeadSnap = head
 	}
@@ -636,9 +628,7 @@
 // The batch set passed is used to track batches affected by the deletion of DAG
 // objects across multiple calls to prune().  It is later given to pruneDone()
 // to do GC on these batches.
-func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error {
-	_ = tx.(store.Transaction)
-
+func prune(ctx *context.T, tx store.Transaction, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.Transaction, logrec string) error) error {
 	if batches == nil {
 		return verror.New(verror.ErrInternal, ctx, "missing batch set")
 	}
@@ -690,9 +680,7 @@
 // pruneDone is called when object pruning is finished within a single pass of
 // the sync garbage collector.  It updates the batch sets affected by objects
 // deleted by prune().
-func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error {
-	_ = tx.(store.Transaction)
-
+func pruneDone(ctx *context.T, tx store.Transaction, batches batchSet) error {
 	// Update batch sets by removing the pruned objects from them.
 	for btid, pruneInfo := range batches {
 		info, err := getBatch(ctx, tx, btid)
@@ -734,9 +722,7 @@
 }
 
 // setNode stores the DAG node entry.
-func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error {
-	_ = tx.(store.Transaction)
-
+func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
 	}
@@ -759,9 +745,7 @@
 }
 
 // delNode deletes the DAG node entry.
-func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
-	_ = tx.(store.Transaction)
-
+func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
 	}
@@ -787,9 +771,7 @@
 }
 
 // setHead stores version as the DAG object head.
-func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
-	_ = tx.(store.Transaction)
-
+func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
 	}
@@ -808,8 +790,7 @@
 }
 
 // delHead deletes the DAG object head.
-func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
-	_ = tx.(store.Transaction)
+func delHead(ctx *context.T, tx store.Transaction, oid string) error {
 	return util.Delete(ctx, tx, headKey(oid))
 }
 
@@ -819,9 +800,7 @@
 }
 
 // setBatch stores the DAG batch entry.
-func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error {
-	_ = tx.(store.Transaction)
-
+func setBatch(ctx *context.T, tx store.Transaction, btid uint64, info *batchInfo) error {
 	if btid == NoBatchId {
 		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
 	}
@@ -844,9 +823,7 @@
 }
 
 // delBatch deletes the DAG batch entry.
-func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error {
-	_ = tx.(store.Transaction)
-
+func delBatch(ctx *context.T, tx store.Transaction, btid uint64) error {
 	if btid == NoBatchId {
 		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
 	}
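
Each helper above dropped its `_ = tx.(store.Transaction)` runtime assertion by taking store.Transaction directly. The shape of that change, with a hypothetical helper:

```go
package example

import "v.io/syncbase/x/ref/services/syncbase/store"

// putEntry can only be called with a real Transaction; handing it a
// Snapshot or a bare Store is now a compile error rather than a runtime
// panic from the removed `_ = tx.(store.Transaction)` assertion.
func putEntry(tx store.Transaction, key, val []byte) error {
	return tx.Put(key, val)
}
```
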
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 3728635..9ef43f7 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -790,7 +790,7 @@
 		tx := st.NewTransaction()
 		del := 0
 		err := prune(nil, tx, oid, version, batches,
-			func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+			func(ctx *context.T, tx store.Transaction, lr string) error {
 				del++
 				return nil
 			})
@@ -872,7 +872,7 @@
 	batches := newBatchPruning()
 	tx := st.NewTransaction()
 	err := prune(nil, tx, oid, version, batches,
-		func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+		func(ctx *context.T, tx store.Transaction, lr string) error {
 			del++
 			if lr == "logrec-03" {
 				return fmt.Errorf("refuse to delete %s", lr)
@@ -1574,7 +1574,7 @@
 			t.Errorf("cannot getHead() on object %s: %v", oid, err)
 		}
 		err = prune(nil, tx, oid, head, batches,
-			func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+			func(ctx *context.T, itx store.Transaction, lr string) error {
 				return nil
 			})
 		if err != nil {
@@ -1614,7 +1614,7 @@
 
 	batches = newBatchPruning()
 	err = prune(nil, tx, oid_c, "3", batches,
-		func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+		func(ctx *context.T, itx store.Transaction, lr string) error {
 			return nil
 		})
 	if err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 2211da4..8d5b384 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -386,9 +386,9 @@
 
 // TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
 // loop.
-func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
 	for i := r.cur; i <= r.max; i++ {
-		rec, err := getLogRec(ctx, sn, dev, i)
+		rec, err := getLogRec(ctx, st, dev, i)
 		if err == nil {
 			r.cur = i + 1
 			return rec, nil
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index 6b017ee..ebb55f6 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -19,7 +19,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
-
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
@@ -119,16 +119,19 @@
 	}
 
 	data := &syncData{}
-	if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil {
-		if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			return nil, err
+	if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
+		if err := util.Get(ctx, tx, s.stKey(), data); err != nil {
+			if verror.ErrorID(err) != verror.ErrNoExist.ID {
+				return err
+			}
+			// First invocation of vsync.New().
+			// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
+			data.Id = rand64()
+			return util.Put(ctx, tx, s.stKey(), data)
 		}
-		// First invocation of vsync.New().
-		// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
-		data.Id = rand64()
-		if err := util.Put(ctx, sv.St(), s.stKey(), data); err != nil {
-			return nil, err
-		}
+		return nil
+	}); err != nil {
+		return nil, err
 	}
 
 	// data.Id is now guaranteed to be initialized.
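
Wrapping the id initialization in a transaction makes the read and the conditional write atomic. A simplified sketch of that initialize-if-missing pattern; the key, value, and error handling are hypothetical and reduced:

```go
package example

import "v.io/syncbase/x/ref/services/syncbase/store"

func ensureInitialized(st store.Store, key, freshValue []byte) error {
	return store.RunInTransaction(st, func(tx store.Transaction) error {
		if _, err := tx.Get(key, nil); err == nil {
			return nil // already initialized; nothing to write
		}
		// Because the Get above went through tx, a concurrent initializer
		// that commits first invalidates this read and forces a retry, so
		// exactly one fresh value wins. (Any Get error is treated as
		// "missing" here for brevity.)
		return tx.Put(key, freshValue)
	})
}
```
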
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 0c1adce..8195559 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -269,8 +269,7 @@
 }
 
 // putDbSyncState persists the sync state object for a given Database.
-func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
-	_ = tx.(store.Transaction)
+func putDbSyncState(ctx *context.T, tx store.Transaction, ds *dbSyncState) error {
 	return util.Put(ctx, tx, dbSyncStateKey(), ds)
 }
 
@@ -331,8 +330,7 @@
 }
 
 // putLogRec stores the log record.
-func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
-	_ = tx.(store.Transaction)
+func putLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
 	return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
 }
 
@@ -346,7 +344,6 @@
 }
 
 // delLogRec deletes the log record for a given (devid, gen).
-func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
-	_ = tx.(store.Transaction)
+func delLogRec(ctx *context.T, tx store.Transaction, id, gen uint64) error {
 	return util.Delete(ctx, tx, logRecKey(id, gen))
 }
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 1b158f6..2f341e4 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -110,9 +110,7 @@
 }
 
 // addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *interfaces.SyncGroup) error {
-	_ = tx.(store.Transaction)
-
+func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
 	// Verify SyncGroup before storing it since it may have been received
 	// from a remote peer.
 	if err := verifySyncGroup(ctx, sg); err != nil {
@@ -170,9 +168,7 @@
 }
 
 // delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
-
+func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
 	sg, err := getSyncGroupById(ctx, tx, gid)
 	if err != nil {
 		return err
@@ -184,9 +180,7 @@
 }
 
 // delSyncGroupByName deletes the SyncGroup given its name.
-func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
-	_ = tx.(store.Transaction)
-
+func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
 	gid, err := getSyncGroupId(ctx, tx, name)
 	if err != nil {
 		return err
@@ -218,7 +212,7 @@
 		// For each database, fetch its SyncGroup data entries by scanning their
 		// prefix range.  Use a database snapshot for the scan.
 		sn := st.NewSnapshot()
-		defer sn.Close()
+		defer sn.Abort()
 		name := appDbName(appName, dbName)
 
 		forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
@@ -317,10 +311,10 @@
 }
 
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var sg interfaces.SyncGroup
-	if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
+	if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
 		if verror.ErrorID(err) == verror.ErrNoExist.ID {
 			err = nil
 		}
@@ -330,10 +324,10 @@
 }
 
 // hasSGNameEntry returns true if the SyncGroup name entry exists.
-func hasSGNameEntry(st store.StoreReader, name string) (bool, error) {
+func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var gid interfaces.GroupId
-	if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
+	if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
 		if verror.ErrorID(err) == verror.ErrNoExist.ID {
 			err = nil
 		}
@@ -343,14 +337,12 @@
 }
 
 // setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
-	_ = tx.(store.Transaction)
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
 	return util.Put(ctx, tx, sgDataKey(gid), sg)
 }
 
 // setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
+func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error {
 	return util.Put(ctx, tx, sgNameKey(name), gid)
 }
 
@@ -373,14 +365,12 @@
 }
 
 // delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
 	return util.Delete(ctx, tx, sgDataKey(gid))
 }
 
 // delSGNameEntry deletes the SyncGroup name to ID mapping.
-func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
-	_ = tx.(store.Transaction)
+func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
 }
 
@@ -392,7 +382,7 @@
 	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
 	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -452,7 +442,7 @@
 	var sg *interfaces.SyncGroup
 	nullSpec := wire.SyncGroupSpec{}
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -509,7 +499,7 @@
 		return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
 	}
 
-	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 
 		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
 
@@ -540,7 +530,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -573,7 +563,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -598,7 +588,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -622,7 +612,7 @@
 	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
 	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -674,7 +664,7 @@
 
 	// Publish rejected. Persist that to avoid retrying in the
 	// future and to remember the split universe scenario.
-	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Ensure SG still exists.
 		sg, err := getSyncGroupByName(ctx, tx, sgName)
 		if err != nil {
@@ -695,7 +685,7 @@
 // be time consuming.  Consider doing it asynchronously and letting the server
 // reply to the client earlier.  However it must happen within the scope of this
 // transaction (and its snapshot view).
-func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.StoreReadWriter, prefixes []string) error {
+func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error {
 	if len(prefixes) == 0 {
 		return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
 	}
@@ -790,7 +780,7 @@
 		return err
 	}
 
-	err = store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(st, func(tx store.Transaction) error {
 		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
 
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
@@ -857,7 +847,7 @@
 	}
 
 	var sg *interfaces.SyncGroup
-	err = store.RunInTransaction(dbSt, func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
 		sg, err = getSyncGroupById(ctx, tx, gid)
 		if err != nil {
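
Helpers like hasSGDataEntry and hasSGNameEntry now take store.SnapshotOrTransaction, so the snapshot-based Get* methods and the transactional Create/Join paths above share the same lookup code. A reduced sketch of that style of helper; the name and error handling are hypothetical:

```go
package example

import "v.io/syncbase/x/ref/services/syncbase/store"

// exists reports whether key is present. The real helpers map the store's
// "key does not exist" error to (false, nil) and propagate everything
// else; that distinction is collapsed here.
func exists(sntx store.SnapshotOrTransaction, key []byte) (bool, error) {
	if _, err := sntx.Get(key, nil); err != nil {
		return false, err
	}
	return true, nil
}

// Read path (sketch):  sn := st.NewSnapshot(); defer sn.Abort(); exists(sn, k)
// Write path (sketch): store.RunInTransaction(st, func(tx store.Transaction) error {
//	ok, err := exists(tx, k)
//	...
//	return nil
// })
```
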
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 8f5f60c..6d80317 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -169,7 +169,7 @@
 	// Transactional processing of the batch: convert these syncable log
 	// records to a batch of sync log records, filling their parent versions
 	// from the DAG head nodes.
-	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(st, func(tx store.Transaction) error {
 		batch := make([]*localLogRec, 0, len(logs))
 		for _, entry := range logs {
 			if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
@@ -193,9 +193,7 @@
 
 // processBatch applies a single batch of changes (object mutations) received
 // from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.StoreReadWriter) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.Transaction) error {
 	count := uint64(len(batch))
 	if count == 0 {
 		return nil
@@ -247,7 +245,7 @@
 
 // processLocalLogRec processes a local log record by adding to the Database and
 // suitably updating the DAG metadata.
-func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
 	// Insert the new log record into the log.
 	if err := putLogRec(ctx, tx, rec); err != nil {
 		return err
@@ -361,8 +359,7 @@
 // simplify the store-to-sync interaction.  A deleted key would still have a
 // version and its value entry would encode the "deleted" flag, either in the
 // key or probably in a value wrapper that would contain other metadata.
-func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) (*localLogRec, error) {
-	_ = tx.(store.Transaction)
+func convertLogRecord(ctx *context.T, tx store.Transaction, logEnt *watchable.LogEntry) (*localLogRec, error) {
 	var rec *localLogRec
 	timestamp := logEnt.CommitTimestamp
 
@@ -404,9 +401,7 @@
 // newLocalLogRec creates a local sync log record given its information: key,
 // version, deletion flag, and timestamp.  It retrieves the current DAG head
 // for the key (if one exists) to use as its parent (previous) version.
-func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec {
-	_ = tx.(store.Transaction)
-
+func newLocalLogRec(ctx *context.T, tx store.Transaction, key, version []byte, deleted bool, timestamp int64) *localLogRec {
 	rec := localLogRec{}
 	oid := string(key)
 
@@ -484,8 +479,7 @@
 }
 
 // setResMark stores the watcher resume marker for a database.
-func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
-	_ = tx.(store.Transaction)
+func setResMark(ctx *context.T, tx store.Transaction, resMark string) error {
 	return util.Put(ctx, tx, resMarkKey(), resMark)
 }