Merge "internal storage engine: improve compile-time typing"
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 5dd9cad..957f4ed 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -88,14 +88,11 @@
 	}
 	// Check perms.
 	sn := a.s.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.JoinKeyParts(util.DbInfoPrefix, a.name))
 }
 
 ////////////////////////////////////////
@@ -155,13 +152,13 @@
 	// 1. Check appData perms, create dbInfo record.
 	rootDir, engine := a.rootDirForDb(dbName), a.s.opts.Engine
 	aData := &appData{}
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
 		// Check appData perms.
-		if err := util.GetWithAuth(ctx, call, st, a.stKey(), aData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, a.stKey(), aData); err != nil {
 			return err
 		}
 		// Check for "database already exists".
-		if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if _, err := a.getDbInfo(ctx, tx, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -174,7 +171,7 @@
 			RootDir: rootDir,
 			Engine:  engine,
 		}
-		return a.putDbInfo(ctx, st, dbName, info)
+		return a.putDbInfo(ctx, tx, dbName, info)
 	}); err != nil {
 		return err
 	}
@@ -193,8 +190,8 @@
 	}
 
 	// 3. Flip dbInfo.Initialized to true.
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+		return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
 			info.Initialized = true
 			return nil
 		})
@@ -235,8 +232,8 @@
 	}
 
 	// 2. Flip dbInfo.Deleted to true.
-	if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
-		return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+	if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+		return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
 			info.Deleted = true
 			return nil
 		})
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index 39ffdc8..f750d57 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -24,33 +24,32 @@
 }
 
 // getDbInfo reads data from the storage engine.
-func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, sntx store.SnapshotOrTransaction, dbName string) (*dbInfo, error) {
 	info := &dbInfo{}
-	if err := util.Get(ctx, st, dbInfoStKey(a, dbName), info); err != nil {
+	if err := util.Get(ctx, sntx, dbInfoStKey(a, dbName), info); err != nil {
 		return nil, err
 	}
 	return info, nil
 }
 
 // putDbInfo writes data to the storage engine.
-func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
-	return util.Put(ctx, st, dbInfoStKey(a, dbName), info)
+func (a *app) putDbInfo(ctx *context.T, tx store.Transaction, dbName string, info *dbInfo) error {
+	return util.Put(ctx, tx, dbInfoStKey(a, dbName), info)
 }
 
 // delDbInfo deletes data from the storage engine.
-func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
-	return util.Delete(ctx, st, dbInfoStKey(a, dbName))
+func (a *app) delDbInfo(ctx *context.T, stw store.StoreWriter, dbName string) error {
+	return util.Delete(ctx, stw, dbInfoStKey(a, dbName))
 }
 
 // updateDbInfo performs a read-modify-write. fn should "modify" v.
-func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
-	_ = st.(store.Transaction) // panics on failure, as desired
-	info, err := a.getDbInfo(ctx, st, dbName)
+func (a *app) updateDbInfo(ctx *context.T, tx store.Transaction, dbName string, fn func(info *dbInfo) error) error {
+	info, err := a.getDbInfo(ctx, tx, dbName)
 	if err != nil {
 		return err
 	}
 	if err := fn(info); err != nil {
 		return err
 	}
-	return a.putDbInfo(ctx, st, dbName, info)
+	return a.putDbInfo(ctx, tx, dbName, info)
 }
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 3fe1b96..d4875c0 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -245,7 +245,7 @@
 			d.mu.Unlock()
 		}
 	} else {
-		if err = d.sn.Close(); err == nil {
+		if err = d.sn.Abort(); err == nil {
 			d.mu.Lock()
 			delete(d.sns, *d.batchId)
 			d.mu.Unlock()
@@ -276,13 +276,12 @@
 		}
 		return rs.Err()
 	}
-	var st store.StoreReader
+	var sntx store.SnapshotOrTransaction
 	if d.batchId != nil {
-		st = d.batchReader()
+		sntx = d.batchReader()
 	} else {
-		sn := d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx = d.st.NewSnapshot()
+		defer sntx.Abort()
 	}
 	// queryDb implements query_db.Database
 	// which is needed by the query package's
@@ -291,7 +290,7 @@
 		ctx:  ctx,
 		call: call,
 		req:  d,
-		st:   st,
+		sntx: sntx,
 	}
 
 	return impl(query_exec.Exec(db, q))
@@ -349,14 +348,11 @@
 	}
 	// Check perms.
 	sn := d.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.TablePrefix)
 }
 
 ////////////////////////////////////////
@@ -421,9 +417,9 @@
 	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
-	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d.st, func(tx store.Transaction) error {
 		data := &databaseData{}
-		return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
@@ -446,7 +442,7 @@
 	ctx  *context.T
 	call wire.DatabaseExecServerCall
 	req  *databaseReq
-	st   store.StoreReader
+	sntx store.SnapshotOrTransaction
 }
 
 func (db *queryDb) GetContext() *context.T {
@@ -462,7 +458,7 @@
 		},
 	}
 	// Now that we have a table, we need to check permissions.
-	if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
+	if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
 		return nil, err
 	}
 	return tDb, nil
@@ -478,7 +474,7 @@
 	for _, keyRange := range keyRanges {
 		// TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
 		//               Need a way to Scan multiple ranges at the same state of uncommitted changes.
-		streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
+		streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
 	}
 	return &kvs{
 		t:        t,
@@ -568,7 +564,7 @@
 	return util.DatabasePrefix
 }
 
-func (d *databaseReq) batchReader() store.StoreReader {
+func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
 	if d.batchId == nil {
 		return nil
 	} else if d.sn != nil {
@@ -578,7 +574,7 @@
 	}
 }
 
-func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+func (d *databaseReq) batchTransaction() (store.Transaction, error) {
 	if d.batchId == nil {
 		return nil, nil
 	} else if d.tx != nil {
diff --git a/x/ref/services/syncbase/server/nosql/database_sm.go b/x/ref/services/syncbase/server/nosql/database_sm.go
index 5d1239e..3c87a6b 100644
--- a/x/ref/services/syncbase/server/nosql/database_sm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sm.go
@@ -41,9 +41,9 @@
 	}
 
 	// Check permissions on Database and store schema metadata.
-	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d.st, func(tx store.Transaction) error {
 		dbData := databaseData{}
-		return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, d.stKey(), &dbData, func() error {
 			// NOTE: For now we expect the client to not issue multiple
 			// concurrent SetSchemaMetadata calls.
 			dbData.SchemaMetadata = &metadata
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 62a5f3a..a6fbf9d 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -32,35 +32,33 @@
 }
 
 func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall, schemaVersion int32) ([]byte, error) {
-	impl := func(st store.StoreReader) ([]byte, error) {
+	impl := func(sntx store.SnapshotOrTransaction) ([]byte, error) {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return []byte{}, err
 		}
-		return r.get(ctx, call, st)
+		return r.get(ctx, call, sntx)
 	}
-	var st store.StoreReader
 	if r.t.d.batchId != nil {
-		st = r.t.d.batchReader()
+		return impl(r.t.d.batchReader())
 	} else {
 		sn := r.t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		defer sn.Abort()
+		return impl(sn)
 	}
-	return impl(st)
 }
 
 func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, schemaVersion int32, value []byte) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		return r.put(ctx, call, st, value)
+		return r.put(ctx, call, tx, value)
 	}
 	if r.t.d.batchId != nil {
-		if st, err := r.t.d.batchReadWriter(); err != nil {
+		if tx, err := r.t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(r.t.d.st, impl)
@@ -68,17 +66,17 @@
 }
 
 func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		return r.delete(ctx, call, st)
+		return r.delete(ctx, call, tx)
 	}
 	if r.t.d.batchId != nil {
-		if st, err := r.t.d.batchReadWriter(); err != nil {
+		if tx, err := r.t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(r.t.d.st, impl)
@@ -98,17 +96,17 @@
 
 // checkAccess checks that this row's table exists in the database, and performs
 // an authorization check.
-func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return r.t.checkAccess(ctx, call, st, r.key)
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) error {
+	return r.t.checkAccess(ctx, call, sntx, r.key)
 }
 
 // get reads data from the storage engine.
 // Performs authorization check.
-func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) ([]byte, error) {
+	if err := r.checkAccess(ctx, call, sntx); err != nil {
 		return nil, err
 	}
-	value, err := st.Get([]byte(r.stKey()), nil)
+	value, err := sntx.Get([]byte(r.stKey()), nil)
 	if err != nil {
 		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			return nil, verror.New(verror.ErrNoExist, ctx, r.stKey())
@@ -120,11 +118,11 @@
 
 // put writes data to the storage engine.
 // Performs authorization check.
-func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, tx store.Transaction, value []byte) error {
+	if err := r.checkAccess(ctx, call, tx); err != nil {
 		return err
 	}
-	if err := st.Put([]byte(r.stKey()), value); err != nil {
+	if err := tx.Put([]byte(r.stKey()), value); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
@@ -132,11 +130,11 @@
 
 // delete deletes data from the storage engine.
 // Performs authorization check.
-func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
-	if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, tx store.Transaction) error {
+	if err := r.checkAccess(ctx, call, tx); err != nil {
 		return err
 	}
-	if err := st.Delete([]byte(r.stKey())); err != nil {
+	if err := tx.Delete([]byte(r.stKey())); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 7b371e0..558a1c1 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -37,14 +37,14 @@
 	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 		return err
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
 		// Check databaseData perms.
 		dData := &databaseData{}
-		if err := util.GetWithAuth(ctx, call, st, t.d.stKey(), dData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, t.d.stKey(), dData); err != nil {
 			return err
 		}
 		// Check for "table already exists".
-		if err := util.Get(ctx, st, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.Get(ctx, tx, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -59,7 +59,7 @@
 			Name:  t.name,
 			Perms: perms,
 		}
-		return util.Put(ctx, st, t.stKey(), data)
+		return util.Put(ctx, tx, t.stKey(), data)
 	})
 }
 
@@ -70,16 +70,16 @@
 	if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 		return err
 	}
-	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
 		// Read-check-delete tableData.
-		if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, t.stKey(), &tableData{}); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoExist.ID {
 				return nil // delete is idempotent
 			}
 			return err
 		}
 		// TODO(sadovsky): Delete all rows in this table.
-		return util.Delete(ctx, st, t.stKey())
+		return util.Delete(ctx, tx, t.stKey())
 	})
 }
 
@@ -91,9 +91,9 @@
 }
 
 func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
-	impl := func(st store.StoreReadWriter) error {
+	impl := func(tx store.Transaction) error {
 		// Check for table-level access before doing a scan.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, tx, ""); err != nil {
 			return err
 		}
 		// Check if the db schema version and the version provided by client
@@ -101,21 +101,21 @@
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		it := tx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		key := []byte{}
 		for it.Advance() {
 			key = it.Key(key)
 			// Check perms.
 			parts := util.SplitKeyParts(string(key))
 			externalKey := parts[len(parts)-1]
-			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+			if err := t.checkAccess(ctx, call, tx, externalKey); err != nil {
 				// TODO(rogulenko): Revisit this behavior. Probably we should
 				// delete all rows that we have access to.
 				it.Cancel()
 				return err
 			}
 			// Delete the key-value pair.
-			if err := st.Delete(key); err != nil {
+			if err := tx.Delete(key); err != nil {
 				return verror.New(verror.ErrInternal, ctx, err)
 			}
 		}
@@ -125,10 +125,10 @@
 		return nil
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -136,15 +136,15 @@
 }
 
 func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, schemaVersion int32, start, limit []byte) error {
-	impl := func(st store.StoreReader) error {
+	impl := func(sntx store.SnapshotOrTransaction) error {
 		// Check for table-level access before doing a scan.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return err
 		}
-		it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+		it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
 		sender := call.SendStream()
 		key, value := []byte{}, []byte{}
 		for it.Advance() {
@@ -152,7 +152,7 @@
 			// Check perms.
 			parts := util.SplitKeyParts(string(key))
 			externalKey := parts[len(parts)-1]
-			if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+			if err := t.checkAccess(ctx, call, sntx, externalKey); err != nil {
 				it.Cancel()
 				return err
 			}
@@ -163,28 +163,26 @@
 		}
 		return nil
 	}
-	var st store.StoreReader
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
+		return impl(t.d.batchReader())
 	} else {
-		sn := t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx := t.d.st.NewSnapshot()
+		defer sntx.Abort()
+		return impl(sntx)
 	}
-	return impl(st)
 }
 
 func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, key string) ([]wire.PrefixPermissions, error) {
-	impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+	impl := func(sntx store.SnapshotOrTransaction) ([]wire.PrefixPermissions, error) {
 		// Check permissions only at table level.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
 			return nil, err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
 			return nil, err
 		}
 		// Get the most specific permissions object.
-		prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+		prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
 		if err != nil {
 			return nil, err
 		}
@@ -192,27 +190,25 @@
 		// Collect all parent permissions objects all the way up to the table level.
 		for prefix != "" {
 			prefix = prefixPerms.Parent
-			if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+			if prefixPerms, err = t.permsForPrefix(ctx, sntx, prefixPerms.Parent); err != nil {
 				return nil, err
 			}
 			result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
 		}
 		return result, nil
 	}
-	var st store.StoreReader
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
+		return impl(t.d.batchReader())
 	} else {
-		sn := t.d.st.NewSnapshot()
-		st = sn
-		defer sn.Close()
+		sntx := t.d.st.NewSnapshot()
+		defer sntx.Abort()
+		return impl(sntx)
 	}
-	return impl(st)
 }
 
 func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string, perms access.Permissions) error {
-	impl := func(st store.StoreReadWriter) error {
-		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+	impl := func(tx store.Transaction) error {
+		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -220,18 +216,18 @@
 		}
 		// Concurrent transactions that touch this table should fail with
 		// ErrConcurrentTransaction when this transaction commits.
-		if err := t.lock(ctx, st); err != nil {
+		if err := t.lock(ctx, tx); err != nil {
 			return err
 		}
 		if prefix == "" {
 			data := &tableData{}
-			return util.UpdateWithAuth(ctx, call, st, t.stKey(), data, func() error {
+			return util.UpdateWithAuth(ctx, call, tx, t.stKey(), data, func() error {
 				data.Perms = perms
 				return nil
 			})
 		}
 		// Get the most specific permissions object.
-		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
 		if err != nil {
 			return err
 		}
@@ -240,7 +236,7 @@
 		// parents for all children of the prefix to the node corresponding to
 		// the prefix.
 		if parent != prefix {
-			if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+			if err := t.updateParentRefs(ctx, tx, prefix, prefix); err != nil {
 				return err
 			}
 		} else {
@@ -250,16 +246,16 @@
 		stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
 		prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
 		// Put the (prefix, perms) pair to the database.
-		if err := util.Put(ctx, st, stPrefix, prefixPerms); err != nil {
+		if err := util.Put(ctx, tx, stPrefix, prefixPerms); err != nil {
 			return err
 		}
-		return util.Put(ctx, st, stPrefixLimit, prefixPerms)
+		return util.Put(ctx, tx, stPrefixLimit, prefixPerms)
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -270,8 +266,8 @@
 	if prefix == "" {
 		return verror.New(verror.ErrBadArg, ctx, prefix)
 	}
-	impl := func(st store.StoreReadWriter) error {
-		if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+	impl := func(tx store.Transaction) error {
+		if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
 			return err
 		}
 		if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -279,11 +275,11 @@
 		}
 		// Concurrent transactions that touch this table should fail with
 		// ErrConcurrentTransaction when this transaction commits.
-		if err := t.lock(ctx, st); err != nil {
+		if err := t.lock(ctx, tx); err != nil {
 			return err
 		}
 		// Get the most specific permissions object.
-		parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+		parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
 		if err != nil {
 			return err
 		}
@@ -295,21 +291,21 @@
 		// We need to delete the node corresponding to the prefix from the prefix
 		// permissions tree. We do it by updating parents for all children of the
 		// prefix to the parent of the node corresponding to the prefix.
-		if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+		if err := t.updateParentRefs(ctx, tx, prefix, prefixPerms.Parent); err != nil {
 			return err
 		}
 		stPrefix := []byte(t.prefixPermsKey(prefix))
 		stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
-		if err := st.Delete(stPrefix); err != nil {
+		if err := tx.Delete(stPrefix); err != nil {
 			return err
 		}
-		return st.Delete(stPrefixLimit)
+		return tx.Delete(stPrefixLimit)
 	}
 	if t.d.batchId != nil {
-		if st, err := t.d.batchReadWriter(); err != nil {
+		if tx, err := t.d.batchTransaction(); err != nil {
 			return err
 		} else {
-			return impl(st)
+			return impl(tx)
 		}
 	} else {
 		return store.RunInTransaction(t.d.st, impl)
@@ -317,30 +313,23 @@
 }
 
 func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
-	impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+	impl := func(sntx store.SnapshotOrTransaction, closeSntx func() error) (<-chan string, error) {
 		// Check perms.
-		if err := t.checkAccess(ctx, call, st, ""); err != nil {
-			closeStoreReader()
+		if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
+			closeSntx()
 			return nil, err
 		}
 		// TODO(rogulenko): Check prefix permissions for children.
-		return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
+		return util.Glob(ctx, call, "*", sntx, closeSntx, util.JoinKeyParts(util.RowPrefix, t.name))
 	}
-	var st store.StoreReader
-	var closeStoreReader func() error
 	if t.d.batchId != nil {
-		st = t.d.batchReader()
-		closeStoreReader = func() error {
+		return impl(t.d.batchReader(), func() error {
 			return nil
-		}
+		})
 	} else {
 		sn := t.d.st.NewSnapshot()
-		st = sn
-		closeStoreReader = func() error {
-			return sn.Close()
-		}
+		return impl(sn, sn.Abort)
 	}
-	return impl(st, closeStoreReader)
 }
 
 ////////////////////////////////////////
@@ -356,11 +345,11 @@
 
 // updateParentRefs updates the parent for all children of the given
 // prefix to newParent.
-func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+func (t *tableReq) updateParentRefs(ctx *context.T, tx store.Transaction, prefix, newParent string) error {
 	stPrefix := []byte(t.prefixPermsKey(prefix))
 	stPrefixStart := append(stPrefix, 0)
 	stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
-	it := st.Scan(stPrefixStart, stPrefixLimit)
+	it := tx.Scan(stPrefixStart, stPrefixLimit)
 	var key, value []byte
 	for it.Advance() {
 		key, value = it.Key(key), it.Value(value)
@@ -370,7 +359,7 @@
 			return verror.New(verror.ErrInternal, ctx, err)
 		}
 		prefixPerms.Parent = newParent
-		if err := util.Put(ctx, st, string(key), prefixPerms); err != nil {
+		if err := util.Put(ctx, tx, string(key), prefixPerms); err != nil {
 			it.Cancel()
 			return err
 		}
@@ -393,12 +382,12 @@
 // TODO(rogulenko): Revisit this behavior to provide more granularity.
 // One option is to add a prefix and its parent to the write set of the current
 // transaction when the permissions object for that prefix is updated.
-func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+func (t *tableReq) lock(ctx *context.T, tx store.Transaction) error {
 	var data tableData
-	if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+	if err := util.Get(ctx, tx, t.stKey(), &data); err != nil {
 		return err
 	}
-	return util.Put(ctx, st, t.stKey(), data)
+	return util.Put(ctx, tx, t.stKey(), data)
 }
 
 // checkAccess checks that this table exists in the database, and performs
@@ -407,13 +396,13 @@
 // TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
 // access check to be a check for "Resolve", i.e. also check access to
 // service, app and database.
-func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
-	prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, key string) error {
+	prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
 	if err != nil {
 		return err
 	}
 	if prefix != "" {
-		if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, sntx, t.stKey(), &tableData{}); err != nil {
 			return err
 		}
 	}
@@ -448,10 +437,10 @@
 //   proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
 //   K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
 //   prefix of K; in this case line 3 returns correct result.
-func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
-	it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+func (t *tableReq) permsForKey(ctx *context.T, sntx store.SnapshotOrTransaction, key string) (string, stPrefixPerms, error) {
+	it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
 	if !it.Advance() {
-		prefixPerms, err := t.permsForPrefix(ctx, st, "")
+		prefixPerms, err := t.permsForPrefix(ctx, sntx, "")
 		return "", prefixPerms, err
 	}
 	defer it.Cancel()
@@ -465,22 +454,22 @@
 	if strings.HasPrefix(key, prefix) {
 		return prefix, prefixPerms, nil
 	}
-	prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+	prefixPerms, err := t.permsForPrefix(ctx, sntx, prefixPerms.Parent)
 	return prefixPerms.Parent, prefixPerms, err
 }
 
 // permsForPrefix returns the permissions object associated with the
 // provided prefix.
-func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+func (t *tableReq) permsForPrefix(ctx *context.T, sntx store.SnapshotOrTransaction, prefix string) (stPrefixPerms, error) {
 	if prefix == "" {
 		var data tableData
-		if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+		if err := util.Get(ctx, sntx, t.stKey(), &data); err != nil {
 			return stPrefixPerms{}, err
 		}
 		return stPrefixPerms{Perms: data.Perms}, nil
 	}
 	var prefixPerms stPrefixPerms
-	if err := util.Get(ctx, st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+	if err := util.Get(ctx, sntx, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
 		return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
 	}
 	return prefixPerms, nil
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 8ed1480..5b87252 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -139,9 +139,9 @@
 // RPC methods
 
 func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
-	return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		data := &serviceData{}
-		return util.UpdateWithAuth(ctx, call, st, s.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, s.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
@@ -163,14 +163,11 @@
 func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	// Check perms.
 	sn := s.st.NewSnapshot()
-	closeSnapshot := func() error {
-		return sn.Close()
-	}
 	if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
-		closeSnapshot()
+		sn.Abort()
 		return nil, err
 	}
-	return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
+	return util.Glob(ctx, call, "*", sn, sn.Abort, util.AppPrefix)
 }
 
 ////////////////////////////////////////
@@ -225,14 +222,14 @@
 		dbs:    make(map[string]interfaces.Database),
 	}
 
-	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		// Check serviceData perms.
 		sData := &serviceData{}
-		if err := util.GetWithAuth(ctx, call, st, s.stKey(), sData); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, s.stKey(), sData); err != nil {
 			return err
 		}
 		// Check for "app already exists".
-		if err := util.Get(ctx, st, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+		if err := util.Get(ctx, tx, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 			if err != nil {
 				return err
 			}
@@ -246,7 +243,7 @@
 			Name:  appName,
 			Perms: perms,
 		}
-		return util.Put(ctx, st, a.stKey(), data)
+		return util.Put(ctx, tx, a.stKey(), data)
 	}); err != nil {
 		return err
 	}
@@ -263,16 +260,16 @@
 		return nil // delete is idempotent
 	}
 
-	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		// Read-check-delete appData.
-		if err := util.GetWithAuth(ctx, call, st, a.stKey(), &appData{}); err != nil {
+		if err := util.GetWithAuth(ctx, call, tx, a.stKey(), &appData{}); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoExist.ID {
 				return nil // delete is idempotent
 			}
 			return err
 		}
 		// TODO(sadovsky): Delete all databases in this app.
-		return util.Delete(ctx, st, a.stKey())
+		return util.Delete(ctx, tx, a.stKey())
 	}); err != nil {
 		return err
 	}
@@ -288,9 +285,9 @@
 	if !ok {
 		return verror.New(verror.ErrNoExist, ctx, appName)
 	}
-	return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(s.st, func(tx store.Transaction) error {
 		data := &appData{}
-		return util.UpdateWithAuth(ctx, call, st, a.stKey(), data, func() error {
+		return util.UpdateWithAuth(ctx, call, tx, a.stKey(), data, func() error {
 			if err := util.CheckVersion(ctx, version, data.Version); err != nil {
 				return err
 			}
diff --git a/x/ref/services/syncbase/server/util/glob.go b/x/ref/services/syncbase/server/util/glob.go
index acdd74f..2cbc3f9 100644
--- a/x/ref/services/syncbase/server/util/glob.go
+++ b/x/ref/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
 	return res, nil
 }
 
-// Glob performs a glob. It calls closeStoreReader to close st.
+// Glob performs a glob. It calls closeSntx to close sntx.
 // TODO(sadovsky): Why do we make developers implement Glob differently from
 // other streaming RPCs? It's confusing that Glob must return immediately and
 // write its results to a channel, while other streaming RPC handlers must block
 // and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sntx store.SnapshotOrTransaction, closeSntx func() error, stKeyPrefix string) (<-chan string, error) {
 	// TODO(sadovsky): Support glob with non-prefix pattern.
 	if _, err := glob.Parse(pattern); err != nil {
-		closeStoreReader()
+		closeSntx()
 		return nil, verror.New(verror.ErrBadArg, ctx, err)
 	}
 	prefix, err := globPatternToPrefix(pattern)
 	if err != nil {
-		closeStoreReader()
+		closeSntx()
 		if verror.ErrorID(err) == verror.ErrBadArg.ID {
 			return nil, verror.NewErrNotImplemented(ctx)
 		}
 		return nil, verror.New(verror.ErrInternal, ctx, err)
 	}
-	it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+	it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
 	ch := make(chan string)
 	go func() {
-		defer closeStoreReader()
+		defer closeSntx()
 		defer close(ch)
 		key := []byte{}
 		for it.Advance() {
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index 38fa8ce..b8f1905 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -64,21 +64,21 @@
 	return nil
 }
 
-// Put does st.Put(k, v) and wraps the returned error.
-func Put(ctx *context.T, st store.StoreWriter, k string, v interface{}) error {
+// Put does stw.Put(k, v) and wraps the returned error.
+func Put(ctx *context.T, stw store.StoreWriter, k string, v interface{}) error {
 	bytes, err := vom.Encode(v)
 	if err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
-	if err = st.Put([]byte(k), bytes); err != nil {
+	if err = stw.Put([]byte(k), bytes); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
 }
 
-// Delete does st.Delete(k, v) and wraps the returned error.
-func Delete(ctx *context.T, st store.StoreWriter, k string) error {
-	if err := st.Delete([]byte(k)); err != nil {
+// Delete does stw.Delete(k) and wraps the returned error.
+func Delete(ctx *context.T, stw store.StoreWriter, k string) error {
+	if err := stw.Delete([]byte(k)); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
@@ -87,15 +87,14 @@
 // UpdateWithAuth performs a read-modify-write.
 // Input v is populated by the "read" step. fn should "modify" v.
 // Performs an auth check as part of the "read" step.
-func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, k string, v Permser, fn func() error) error {
-	_ = st.(store.Transaction) // panics on failure, as desired
-	if err := GetWithAuth(ctx, call, st, k, v); err != nil {
+func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, tx store.Transaction, k string, v Permser, fn func() error) error {
+	if err := GetWithAuth(ctx, call, tx, k, v); err != nil {
 		return err
 	}
 	if err := fn(); err != nil {
 		return err
 	}
-	return Put(ctx, st, k, v)
+	return Put(ctx, tx, k, v)
 }
 
 // Wraps a call to Get and returns true if Get found the object, false
diff --git a/x/ref/services/syncbase/server/watchable/snapshot.go b/x/ref/services/syncbase/server/watchable/snapshot.go
index ac3694f..37af4e1 100644
--- a/x/ref/services/syncbase/server/watchable/snapshot.go
+++ b/x/ref/services/syncbase/server/watchable/snapshot.go
@@ -9,6 +9,7 @@
 )
 
 type snapshot struct {
+	store.SnapshotSpecImpl
 	isn store.Snapshot
 	st  *wstore
 }
@@ -22,9 +23,9 @@
 	}
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
-	return s.isn.Close()
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
+	return s.isn.Abort()
 }
 
 // Get implements the store.StoreReader interface.
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 2b0037a..010643d 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -79,7 +79,7 @@
 		return st.ist.Get(key, valbuf)
 	}
 	sn := newSnapshot(st)
-	defer sn.Close()
+	defer sn.Abort()
 	return sn.Get(key, valbuf)
 }
 
@@ -95,16 +95,16 @@
 // Put implements the store.StoreWriter interface.
 func (st *wstore) Put(key, value []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
 func (st *wstore) Delete(key []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Delete(key)
 	})
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/stream.go b/x/ref/services/syncbase/server/watchable/stream.go
index 9c8c392..26502e1 100644
--- a/x/ref/services/syncbase/server/watchable/stream.go
+++ b/x/ref/services/syncbase/server/watchable/stream.go
@@ -13,7 +13,7 @@
 // stream streams keys and values for versioned records.
 type stream struct {
 	iit      store.Stream
-	st       store.StoreReader
+	sntx     store.SnapshotOrTransaction
 	mu       sync.Mutex
 	err      error
 	hasValue bool
@@ -25,11 +25,10 @@
 
 // newStreamVersioned creates a new stream. It assumes all records in range
 // [start, limit) are managed, i.e. versioned.
-func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
-	checkTransactionOrSnapshot(st)
+func newStreamVersioned(sntx store.SnapshotOrTransaction, start, limit []byte) *stream {
 	return &stream{
-		iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
-		st:  st,
+		iit:  sntx.Scan(makeVersionKey(start), makeVersionKey(limit)),
+		sntx: sntx,
 	}
 }
 
@@ -46,7 +45,7 @@
 	}
 	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
 	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
-	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	s.value, s.err = s.sntx.Get(makeAtVersionKey(s.key, version), nil)
 	if s.err != nil {
 		return false
 	}
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 93b6b33..fb82f72 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -177,7 +177,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -195,7 +195,7 @@
 // current keys and their versions to use when initializing the sync metadata
 // at the point in the timeline when these keys become syncable (at commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -227,10 +227,10 @@
 // GetVersion returns the current version of a managed key. This method is used
 // by the Sync module when the initiator is attempting to add new versions of
 // objects. Reading the version key is used for optimistic concurrency
-// control. At minimum, an object implementing the StoreReader interface is
+// control. At minimum, an object implementing the Transaction interface is
 // required since this is a Get operation.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
-	switch w := st.(type) {
+func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
+	switch w := tx.(type) {
 	case *transaction:
 		w.mu.Lock()
 		defer w.mu.Unlock()
@@ -238,8 +238,6 @@
 			return nil, convertError(w.err)
 		}
 		return getVersion(w.itx, key)
-	case *wstore:
-		return getVersion(w.ist, key)
 	}
 	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
 }
@@ -266,8 +264,8 @@
 // PutAtVersion puts a value for the managed key at the requested version. This
 // method is used by the Sync module exclusively when the initiator adds objects
 // with versions created on other Syncbases. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
@@ -285,8 +283,8 @@
 // version. This method is used by the Sync module exclusively when the
 // initiator selects which of the already stored versions (via PutAtVersion
 // calls) becomes the current version. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 43b7b94..8b95536 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -33,17 +33,17 @@
 	updateVal: "val-b2",
 }
 
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+func checkAndUpdate(tx store.Transaction, data testData) error {
 	// check and update data1
 	keyBytes := []byte(data.key)
-	val, err := st.Get(keyBytes, nil)
+	val, err := tx.Get(keyBytes, nil)
 	if err != nil {
 		return fmt.Errorf("can't get key %q: %v", data.key, err)
 	}
 	if !bytes.Equal(val, []byte(data.createVal)) {
 		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
 	}
-	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+	if err := tx.Put(keyBytes, []byte(data.updateVal)); err != nil {
 		return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
 	}
 	return nil
@@ -79,13 +79,13 @@
 	setMockSystemClock(wst1, mockClock)
 
 	// Create data in store
-	if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
 		// add data1
-		if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+		if err := tx.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
 		}
 		// add data2
-		if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+		if err := tx.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
 		}
 		return nil
@@ -110,11 +110,11 @@
 		t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
 	}
 
-	if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
-		if err := checkAndUpdate(st, data1); err != nil {
+	if err := store.RunInTransaction(wst2, func(tx store.Transaction) error {
+		if err := checkAndUpdate(tx, data1); err != nil {
 			return err
 		}
-		if err := checkAndUpdate(st, data2); err != nil {
+		if err := checkAndUpdate(tx, data2); err != nil {
 			return err
 		}
 		return nil
@@ -144,33 +144,33 @@
 		t.Fatalf("Wrap failed: %v", err)
 	}
 
-	if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst, func(tx store.Transaction) error {
 		putKey, putVal := []byte("foo"), []byte("bar")
-		if err := st.Put(putKey, putVal); err != nil {
+		if err := tx.Put(putKey, putVal); err != nil {
 			return err
 		}
 		getKey := []byte("foo")
-		if getVal, err := st.Get(getKey, nil); err != nil {
+		if getVal, err := tx.Get(getKey, nil); err != nil {
 			return err
 		} else {
 			eq(t, getVal, putVal)
 		}
 		start, limit := []byte("aaa"), []byte("bbb")
-		st.Scan(start, limit)
+		tx.Scan(start, limit)
 		delKey := []byte("foo")
-		if err := st.Delete(delKey); err != nil {
+		if err := tx.Delete(delKey); err != nil {
 			return err
 		}
 		sgPrefixes := []string{"sga", "sgb"}
-		if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+		if err := AddSyncGroupOp(nil, tx, sgPrefixes, false); err != nil {
 			return err
 		}
 		snKey, snVersion := []byte("aa"), []byte("123")
-		if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+		if err := AddSyncSnapshotOp(nil, tx, snKey, snVersion); err != nil {
 			return err
 		}
 		pvKey, pvVersion := []byte("pv"), []byte("456")
-		if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+		if err := PutVersion(nil, tx, pvKey, pvVersion); err != nil {
 			return err
 		}
 		for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 3d08129..64f8eeb 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -50,21 +50,20 @@
 	return []byte(join(string(key), string(version)))
 }
 
-func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
-	return st.Get(makeVersionKey(key), nil)
+func getVersion(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
+	return sntx.Get(makeVersionKey(key), nil)
 }
 
 func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
 	return st.Get(makeAtVersionKey(key, version), valbuf)
 }
 
-func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
-	checkTransactionOrSnapshot(st)
-	version, err := getVersion(st, key)
+func getVersioned(sntx store.SnapshotOrTransaction, key, valbuf []byte) ([]byte, error) {
+	version, err := getVersion(sntx, key)
 	if err != nil {
 		return valbuf, err
 	}
-	return getAtVersion(st, key, valbuf, version)
+	return getAtVersion(sntx, key, valbuf, version)
 }
 
 func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
@@ -82,14 +81,6 @@
 	return tx.Delete(makeVersionKey(key))
 }
 
-func checkTransactionOrSnapshot(st store.StoreReader) {
-	_, isTransaction := st.(store.Transaction)
-	_, isSnapshot := st.(store.Snapshot)
-	if !isTransaction && !isSnapshot {
-		panic("neither a Transaction nor a Snapshot")
-	}
-}
-
 func getLogEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
diff --git a/x/ref/services/syncbase/store/constants.go b/x/ref/services/syncbase/store/constants.go
index f97f2fd..26551fa 100644
--- a/x/ref/services/syncbase/store/constants.go
+++ b/x/ref/services/syncbase/store/constants.go
@@ -6,10 +6,10 @@
 
 // TODO(sadovsky): Maybe define verrors for these.
 const (
-	ErrMsgClosedStore    = "closed store"
-	ErrMsgClosedSnapshot = "closed snapshot"
-	ErrMsgCanceledStream = "canceled stream"
-	ErrMsgCommittedTxn   = "already called commit"
-	ErrMsgAbortedTxn     = "already called abort"
-	ErrMsgExpiredTxn     = "expired transaction"
+	ErrMsgClosedStore     = "closed store"
+	ErrMsgAbortedSnapshot = "aborted snapshot"
+	ErrMsgCanceledStream  = "canceled stream"
+	ErrMsgCommittedTxn    = "already called commit"
+	ErrMsgAbortedTxn      = "already called abort"
+	ErrMsgExpiredTxn      = "expired transaction"
 )
diff --git a/x/ref/services/syncbase/store/invalid_types.go b/x/ref/services/syncbase/store/invalid_types.go
index bb008e2..a230684 100644
--- a/x/ref/services/syncbase/store/invalid_types.go
+++ b/x/ref/services/syncbase/store/invalid_types.go
@@ -10,6 +10,7 @@
 
 // InvalidSnapshot is a Snapshot for which all methods return errors.
 type InvalidSnapshot struct {
+	SnapshotSpecImpl
 	Error error // returned by all methods
 }
 
@@ -32,8 +33,8 @@
 ////////////////////////////////////////////////////////////
 // InvalidSnapshot
 
-// Close implements the store.Snapshot interface.
-func (s *InvalidSnapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Abort() error {
 	return convertError(s.Error)
 }
 
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index 765103f..4f1ba6c 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -166,7 +166,7 @@
 	d.mu.RLock()
 	defer d.mu.RUnlock()
 	if d.err != nil {
-		return &store.InvalidSnapshot{d.err}
+		return &store.InvalidSnapshot{Error: d.err}
 	}
 	return newSnapshot(d, d.node)
 }
diff --git a/x/ref/services/syncbase/store/leveldb/snapshot.go b/x/ref/services/syncbase/store/leveldb/snapshot.go
index e43d67f..a07be02 100644
--- a/x/ref/services/syncbase/store/leveldb/snapshot.go
+++ b/x/ref/services/syncbase/store/leveldb/snapshot.go
@@ -16,6 +16,7 @@
 // snapshot is a wrapper around LevelDB snapshot that implements
 // the store.Snapshot interface.
 type snapshot struct {
+	store.SnapshotSpecImpl
 	// mu protects the state of the snapshot.
 	mu        sync.RWMutex
 	node      *store.ResourceNode
@@ -39,13 +40,13 @@
 		cOpts:     cOpts,
 	}
 	parent.AddChild(s.node, func() {
-		s.Close()
+		s.Abort()
 	})
 	return s
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
@@ -56,7 +57,7 @@
 	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
 	s.cSnapshot = nil
-	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/store/memstore/snapshot.go b/x/ref/services/syncbase/store/memstore/snapshot.go
index 43beffd..128d127 100644
--- a/x/ref/services/syncbase/store/memstore/snapshot.go
+++ b/x/ref/services/syncbase/store/memstore/snapshot.go
@@ -12,6 +12,7 @@
 )
 
 type snapshot struct {
+	store.SnapshotSpecImpl
 	mu   sync.Mutex
 	node *store.ResourceNode
 	data map[string][]byte
@@ -31,20 +32,20 @@
 		data: dataCopy,
 	}
 	parent.AddChild(s.node, func() {
-		s.Close()
+		s.Abort()
 	})
 	return s
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
 		return convertError(s.err)
 	}
 	s.node.Close()
-	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/store/memstore/store.go b/x/ref/services/syncbase/store/memstore/store.go
index 7e3f816..f4ef644 100644
--- a/x/ref/services/syncbase/store/memstore/store.go
+++ b/x/ref/services/syncbase/store/memstore/store.go
@@ -73,15 +73,15 @@
 
 // Put implements the store.StoreWriter interface.
 func (st *memstore) Put(key, value []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
 func (st *memstore) Delete(key []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Delete(key)
 	})
 }
 
@@ -101,7 +101,7 @@
 	st.mu.Lock()
 	defer st.mu.Unlock()
 	if st.err != nil {
-		return &store.InvalidSnapshot{st.err}
+		return &store.InvalidSnapshot{Error: st.err}
 	}
 	return newSnapshot(st, st.node)
 }
diff --git a/x/ref/services/syncbase/store/model.go b/x/ref/services/syncbase/store/model.go
index a2c48f0..be7265d 100644
--- a/x/ref/services/syncbase/store/model.go
+++ b/x/ref/services/syncbase/store/model.go
@@ -37,15 +37,15 @@
 	Delete(key []byte) error
 }
 
-// StoreReadWriter combines StoreReader and StoreWriter.
-type StoreReadWriter interface {
+// storeReadWriter combines StoreReader and StoreWriter.
+type storeReadWriter interface {
 	StoreReader
 	StoreWriter
 }
 
 // Store is a CRUD-capable storage engine that supports transactions.
 type Store interface {
-	StoreReadWriter
+	storeReadWriter
 
 	// Close closes the store.
 	Close() error
@@ -59,6 +59,29 @@
 	NewSnapshot() Snapshot
 }
 
+// SnapshotOrTransaction represents a Snapshot or a Transaction.
+type SnapshotOrTransaction interface {
+	StoreReader
+
+	// Abort closes the snapshot or transaction.
+	// Any subsequent method calls will fail.
+	// NOTE: this method is also used to distinguish between StoreReader and
+	// SnapshotOrTransaction.
+	Abort() error
+}
+
+// Snapshot is a handle to a particular state in time of a Store.
+//
+// All read operations are executed against a consistent view of Store commit
+// history. Snapshots don't acquire locks and thus don't block transactions.
+type Snapshot interface {
+	SnapshotOrTransaction
+
+	// __snapshotSpec is a utility method to distinguish between Snapshot and
+	// SnapshotOrTransaction. This is a no-op.
+	__snapshotSpec()
+}
+
 // Transaction provides a mechanism for atomic reads and writes. Instead of
 // calling this function directly, clients are encouraged to use the
 // RunInTransaction() helper function, which detects "concurrent transaction"
@@ -77,27 +100,13 @@
 // Once a transaction has been committed or aborted, subsequent method calls
 // will fail with no effect.
 type Transaction interface {
-	StoreReadWriter
+	SnapshotOrTransaction
+	StoreWriter
 
 	// Commit commits the transaction.
 	// Fails if writes from outside this transaction conflict with reads from
 	// within this transaction.
 	Commit() error
-
-	// Abort aborts the transaction.
-	Abort() error
-}
-
-// Snapshot is a handle to particular state in time of a Store.
-//
-// All read operations are executed against a consistent view of Store commit
-// history. Snapshots don't acquire locks and thus don't block transactions.
-type Snapshot interface {
-	StoreReader
-
-	// Close closes the snapshot.
-	// Any subsequent method calls will fail.
-	Close() error
 }
 
 // Stream is an interface for iterating through a collection of key-value pairs.
diff --git a/x/ref/services/syncbase/store/test/snapshot.go b/x/ref/services/syncbase/store/test/snapshot.go
index 4dbee5f..04dee18 100644
--- a/x/ref/services/syncbase/store/test/snapshot.go
+++ b/x/ref/services/syncbase/store/test/snapshot.go
@@ -26,12 +26,12 @@
 	verifyAdvance(t, s, key1, value1)
 	verifyAdvance(t, s, nil, nil)
 
-	// Test functions after Close.
-	if err := snapshot.Close(); err != nil {
-		t.Fatalf("can't close the snapshot: %v", err)
+	// Test functions after Abort.
+	if err := snapshot.Abort(); err != nil {
+		t.Fatalf("can't abort the snapshot: %v", err)
 	}
-	expectedErrMsg := store.ErrMsgClosedSnapshot
-	verifyError(t, snapshot.Close(), verror.ErrCanceled.ID, expectedErrMsg)
+	expectedErrMsg := store.ErrMsgAbortedSnapshot
+	verifyError(t, snapshot.Abort(), verror.ErrCanceled.ID, expectedErrMsg)
 
 	_, err := snapshot.Get(key1, nil)
 	verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
diff --git a/x/ref/services/syncbase/store/test/store.go b/x/ref/services/syncbase/store/test/store.go
index 2d6d9a1..48022f9 100644
--- a/x/ref/services/syncbase/store/test/store.go
+++ b/x/ref/services/syncbase/store/test/store.go
@@ -137,7 +137,7 @@
 	s.verify(t, st)
 	for i := 0; i < len(states); i++ {
 		states[i].verify(t, snapshots[i])
-		snapshots[i].Close()
+		snapshots[i].Abort()
 	}
 }
 
@@ -230,7 +230,7 @@
 	}
 	for _, snapshot := range snapshots {
 		_, err := snapshot.Get(key1, nil)
-		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgClosedSnapshot)
+		verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgAbortedSnapshot)
 	}
 	for _, tx := range transactions {
 		_, err := tx.Get(key1, nil)
diff --git a/x/ref/services/syncbase/store/test/transaction.go b/x/ref/services/syncbase/store/test/transaction.go
index 3e1039b..6cf26e8 100644
--- a/x/ref/services/syncbase/store/test/transaction.go
+++ b/x/ref/services/syncbase/store/test/transaction.go
@@ -155,7 +155,7 @@
 		go func(idx int) {
 			rnd := rand.New(rand.NewSource(239017 * int64(idx)))
 			perm := rnd.Perm(n)
-			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+			if err := store.RunInTransaction(st, func(tx store.Transaction) error {
 				for j := 0; j <= m; j++ {
 					var keystr string
 					if j < m {
@@ -164,7 +164,7 @@
 						keystr = fmt.Sprintf("%05d", n)
 					}
 					key := []byte(keystr)
-					val, err := st.Get(key, nil)
+					val, err := tx.Get(key, nil)
 					if err != nil {
 						return fmt.Errorf("can't get key %q: %v", key, err)
 					}
@@ -178,7 +178,7 @@
 					} else {
 						newValue = intValue + int64(m)
 					}
-					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+					if err := tx.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
 						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
 					}
 				}
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index aac934b..72c4beb 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -8,9 +8,13 @@
 	"v.io/v23/verror"
 )
 
+type SnapshotSpecImpl struct{}
+
+func (s *SnapshotSpecImpl) __snapshotSpec() {}
+
 // RunInTransaction runs the given fn in a transaction, managing retries and
 // commit/abort.
-func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
+func RunInTransaction(st Store, fn func(tx Transaction) error) error {
 	// TODO(rogulenko): Make the number of attempts configurable.
 	// TODO(rogulenko): Change the default number of attempts to 3. Currently,
 	// some storage engine tests fail when the number of attempts is that low.
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 3850d5b..121f594 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -222,9 +222,7 @@
 
 // endBatch marks the end of a given batch.  The batch information is persisted
 // to the store and removed from the temporary in-memory entry.
-func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) endBatch(ctx *context.T, tx store.Transaction, btid, count uint64) error {
 	s.batchesLock.Lock()
 	defer s.batchesLock.Unlock()
 
@@ -271,9 +269,7 @@
 //
 // The grafting structure is not needed when nodes are being added locally by
 // the Watcher, passing a nil grafting structure.
-func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) addNode(ctx *context.T, tx store.Transaction, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
 	if parents != nil {
 		if len(parents) > 2 {
 			return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
@@ -380,9 +376,7 @@
 // to track DAG attachements during a sync operation.  It is not needed if the
 // parent linkage is due to a local change (from conflict resolution selecting
 // an existing version).
-func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) addParent(ctx *context.T, tx store.Transaction, oid, version, parent string, graft graftMap) error {
 	if version == parent {
 		return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
 	}
@@ -447,9 +441,7 @@
 }
 
 // moveHead moves the object head node in the DAG.
-func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error {
-	_ = tx.(store.Transaction)
-
+func moveHead(ctx *context.T, tx store.Transaction, oid, head string) error {
 	// Verify that the node exists.
 	if ok, err := hasNode(ctx, tx, oid, head); err != nil {
 		return err
@@ -565,7 +557,7 @@
 
 // getObjectGraft returns the graftInfo for an object ID.  If the graftMap is
 // nil, a nil graftInfo is returned because grafting is not being tracked.
-func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo {
+func getObjectGraftInfo(ctx *context.T, sntx store.SnapshotOrTransaction, graft graftMap, oid string) *graftInfo {
 	if graft == nil {
 		return nil
 	}
@@ -580,7 +572,7 @@
 	}
 
 	// If the object has a head node, include it in the set of new heads.
-	if head, err := getHead(ctx, st, oid); err == nil {
+	if head, err := getHead(ctx, sntx, oid); err == nil {
 		info.newHeads[head] = true
 		info.oldHeadSnap = head
 	}
@@ -636,9 +628,7 @@
 // The batch set passed is used to track batches affected by the deletion of DAG
 // objects across multiple calls to prune().  It is later given to pruneDone()
 // to do GC on these batches.
-func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error {
-	_ = tx.(store.Transaction)
-
+func prune(ctx *context.T, tx store.Transaction, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.Transaction, logrec string) error) error {
 	if batches == nil {
 		return verror.New(verror.ErrInternal, ctx, "missing batch set")
 	}
@@ -690,9 +680,7 @@
 // pruneDone is called when object pruning is finished within a single pass of
 // the sync garbage collector.  It updates the batch sets affected by objects
 // deleted by prune().
-func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error {
-	_ = tx.(store.Transaction)
-
+func pruneDone(ctx *context.T, tx store.Transaction, batches batchSet) error {
 	// Update batch sets by removing the pruned objects from them.
 	for btid, pruneInfo := range batches {
 		info, err := getBatch(ctx, tx, btid)
@@ -734,9 +722,7 @@
 }
 
 // setNode stores the DAG node entry.
-func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error {
-	_ = tx.(store.Transaction)
-
+func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
 	}
@@ -759,9 +745,7 @@
 }
 
 // delNode deletes the DAG node entry.
-func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
-	_ = tx.(store.Transaction)
-
+func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
 	}
@@ -787,9 +771,7 @@
 }
 
 // setHead stores version as the DAG object head.
-func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
-	_ = tx.(store.Transaction)
-
+func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
 		return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
 	}
@@ -808,8 +790,7 @@
 }
 
 // delHead deletes the DAG object head.
-func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
-	_ = tx.(store.Transaction)
+func delHead(ctx *context.T, tx store.Transaction, oid string) error {
 	return util.Delete(ctx, tx, headKey(oid))
 }
 
@@ -819,9 +800,7 @@
 }
 
 // setBatch stores the DAG batch entry.
-func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error {
-	_ = tx.(store.Transaction)
-
+func setBatch(ctx *context.T, tx store.Transaction, btid uint64, info *batchInfo) error {
 	if btid == NoBatchId {
 		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
 	}
@@ -844,9 +823,7 @@
 }
 
 // delBatch deletes the DAG batch entry.
-func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error {
-	_ = tx.(store.Transaction)
-
+func delBatch(ctx *context.T, tx store.Transaction, btid uint64) error {
 	if btid == NoBatchId {
 		return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
 	}
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 3728635..9ef43f7 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -790,7 +790,7 @@
 		tx := st.NewTransaction()
 		del := 0
 		err := prune(nil, tx, oid, version, batches,
-			func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+			func(ctx *context.T, tx store.Transaction, lr string) error {
 				del++
 				return nil
 			})
@@ -872,7 +872,7 @@
 	batches := newBatchPruning()
 	tx := st.NewTransaction()
 	err := prune(nil, tx, oid, version, batches,
-		func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+		func(ctx *context.T, tx store.Transaction, lr string) error {
 			del++
 			if lr == "logrec-03" {
 				return fmt.Errorf("refuse to delete %s", lr)
@@ -1574,7 +1574,7 @@
 			t.Errorf("cannot getHead() on object %s: %v", oid, err)
 		}
 		err = prune(nil, tx, oid, head, batches,
-			func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+			func(ctx *context.T, itx store.Transaction, lr string) error {
 				return nil
 			})
 		if err != nil {
@@ -1614,7 +1614,7 @@
 
 	batches = newBatchPruning()
 	err = prune(nil, tx, oid_c, "3", batches,
-		func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+		func(ctx *context.T, itx store.Transaction, lr string) error {
 			return nil
 		})
 	if err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 6b4fad1..8ba9cd9 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -427,9 +427,9 @@
 
 // TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
 // loop.
-func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
 	for i := r.cur; i <= r.max; i++ {
-		rec, err := getLogRec(ctx, sn, dev, i)
+		rec, err := getLogRec(ctx, st, dev, i)
 		if err == nil {
 			r.cur = i + 1
 			return rec, nil
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index 6b017ee..ebb55f6 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -19,7 +19,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
-
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
@@ -119,16 +119,21 @@
 	}
 
 	data := &syncData{}
-	if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil {
-		if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			return nil, err
+	if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
+		// Read through tx rather than sv.St() so that the existence check and
+		// the Put below are part of the same transaction.
+		if err := util.Get(ctx, tx, s.stKey(), data); err != nil {
+			if verror.ErrorID(err) != verror.ErrNoExist.ID {
+				return err
+			}
+			// First invocation of vsync.New().
+			// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
+			data.Id = rand64()
+			return util.Put(ctx, tx, s.stKey(), data)
 		}
-		// First invocation of vsync.New().
-		// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
-		data.Id = rand64()
-		if err := util.Put(ctx, sv.St(), s.stKey(), data); err != nil {
-			return nil, err
-		}
+		return nil
+	}); err != nil {
+		return nil, err
 	}
 
 	// data.Id is now guaranteed to be initialized.
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 0c1adce..8195559 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -269,8 +269,7 @@
 }
 
 // putDbSyncState persists the sync state object for a given Database.
-func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
-	_ = tx.(store.Transaction)
+func putDbSyncState(ctx *context.T, tx store.Transaction, ds *dbSyncState) error {
 	return util.Put(ctx, tx, dbSyncStateKey(), ds)
 }
 
@@ -331,8 +330,7 @@
 }
 
 // putLogRec stores the log record.
-func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
-	_ = tx.(store.Transaction)
+func putLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
 	return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
 }
 
@@ -346,7 +344,6 @@
 }
 
 // delLogRec deletes the log record for a given (devid, gen).
-func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
-	_ = tx.(store.Transaction)
+func delLogRec(ctx *context.T, tx store.Transaction, id, gen uint64) error {
 	return util.Delete(ctx, tx, logRecKey(id, gen))
 }
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 961121c..bf4e3aa 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -110,9 +110,7 @@
 }
 
 // addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *interfaces.SyncGroup) error {
-	_ = tx.(store.Transaction)
-
+func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
 	// Verify SyncGroup before storing it since it may have been received
 	// from a remote peer.
 	if err := verifySyncGroup(ctx, sg); err != nil {
@@ -170,9 +168,7 @@
 }
 
 // delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
-
+func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
 	sg, err := getSyncGroupById(ctx, tx, gid)
 	if err != nil {
 		return err
@@ -184,9 +180,7 @@
 }
 
 // delSyncGroupByName deletes the SyncGroup given its name.
-func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
-	_ = tx.(store.Transaction)
-
+func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
 	gid, err := getSyncGroupId(ctx, tx, name)
 	if err != nil {
 		return err
@@ -218,7 +212,7 @@
 		// For each database, fetch its SyncGroup data entries by scanning their
 		// prefix range.  Use a database snapshot for the scan.
 		sn := st.NewSnapshot()
-		defer sn.Close()
+		defer sn.Abort()
 		name := appDbName(appName, dbName)
 
 		forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
@@ -317,10 +311,10 @@
 }
 
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var sg interfaces.SyncGroup
-	if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
+	if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
 		if verror.ErrorID(err) == verror.ErrNoExist.ID {
 			err = nil
 		}
@@ -330,10 +324,10 @@
 }
 
 // hasSGNameEntry returns true if the SyncGroup name entry exists.
-func hasSGNameEntry(st store.StoreReader, name string) (bool, error) {
+func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var gid interfaces.GroupId
-	if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
+	if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
 		if verror.ErrorID(err) == verror.ErrNoExist.ID {
 			err = nil
 		}
@@ -343,14 +337,12 @@
 }
 
 // setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
-	_ = tx.(store.Transaction)
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
 	return util.Put(ctx, tx, sgDataKey(gid), sg)
 }
 
 // setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
+func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error {
 	return util.Put(ctx, tx, sgNameKey(name), gid)
 }
 
@@ -373,14 +365,12 @@
 }
 
 // delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
-	_ = tx.(store.Transaction)
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
 	return util.Delete(ctx, tx, sgDataKey(gid))
 }
 
 // delSGNameEntry deletes the SyncGroup name to ID mapping.
-func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
-	_ = tx.(store.Transaction)
+func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
 }
 
@@ -392,7 +382,7 @@
 	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
 	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -452,7 +442,7 @@
 	var sg *interfaces.SyncGroup
 	nullSpec := wire.SyncGroupSpec{}
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -509,7 +499,7 @@
 		return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
 	}
 
-	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 
 		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
 
@@ -540,7 +530,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -573,7 +563,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -598,7 +588,7 @@
 	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
 
 	sn := sd.db.St().NewSnapshot()
-	defer sn.Close()
+	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -622,7 +612,7 @@
 	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
 	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
 
-	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
@@ -673,7 +663,7 @@
 
 	// Publish rejected. Persist that to avoid retrying in the
 	// future and to remember the split universe scenario.
-	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Ensure SG still exists.
 		sg, err := getSyncGroupByName(ctx, tx, sgName)
 		if err != nil {
@@ -694,7 +684,7 @@
 // be time consuming.  Consider doing it asynchronously and letting the server
 // reply to the client earlier.  However it must happen within the scope of this
 // transaction (and its snapshot view).
-func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.StoreReadWriter, prefixes []string) error {
+func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error {
 	if len(prefixes) == 0 {
 		return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
 	}
@@ -789,7 +779,7 @@
 		return err
 	}
 
-	err = store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(st, func(tx store.Transaction) error {
 		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
 
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
@@ -856,7 +846,7 @@
 	}
 
 	var sg *interfaces.SyncGroup
-	err = store.RunInTransaction(dbSt, func(tx store.StoreReadWriter) error {
+	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
 		sg, err = getSyncGroupById(ctx, tx, gid)
 		if err != nil {
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 8f5f60c..6d80317 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -169,7 +169,7 @@
 	// Transactional processing of the batch: convert these syncable log
 	// records to a batch of sync log records, filling their parent versions
 	// from the DAG head nodes.
-	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+	err := store.RunInTransaction(st, func(tx store.Transaction) error {
 		batch := make([]*localLogRec, 0, len(logs))
 		for _, entry := range logs {
 			if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
@@ -193,9 +193,7 @@
 
 // processBatch applies a single batch of changes (object mutations) received
 // from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.StoreReadWriter) error {
-	_ = tx.(store.Transaction)
-
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.Transaction) error {
 	count := uint64(len(batch))
 	if count == 0 {
 		return nil
@@ -247,7 +245,7 @@
 
 // processLocalLogRec processes a local log record by adding to the Database and
 // suitably updating the DAG metadata.
-func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
 	// Insert the new log record into the log.
 	if err := putLogRec(ctx, tx, rec); err != nil {
 		return err
@@ -361,8 +359,7 @@
 // simplify the store-to-sync interaction.  A deleted key would still have a
 // version and its value entry would encode the "deleted" flag, either in the
 // key or probably in a value wrapper that would contain other metadata.
-func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) (*localLogRec, error) {
-	_ = tx.(store.Transaction)
+func convertLogRecord(ctx *context.T, tx store.Transaction, logEnt *watchable.LogEntry) (*localLogRec, error) {
 	var rec *localLogRec
 	timestamp := logEnt.CommitTimestamp
 
@@ -404,9 +401,7 @@
 // newLocalLogRec creates a local sync log record given its information: key,
 // version, deletion flag, and timestamp.  It retrieves the current DAG head
 // for the key (if one exists) to use as its parent (previous) version.
-func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec {
-	_ = tx.(store.Transaction)
-
+func newLocalLogRec(ctx *context.T, tx store.Transaction, key, version []byte, deleted bool, timestamp int64) *localLogRec {
 	rec := localLogRec{}
 	oid := string(key)
 
@@ -484,8 +479,7 @@
 }
 
 // setResMark stores the watcher resume marker for a database.
-func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
-	_ = tx.(store.Transaction)
+func setResMark(ctx *context.T, tx store.Transaction, resMark string) error {
 	return util.Put(ctx, tx, resMarkKey(), resMark)
 }