store/leveldb: adding mutexes

Wrapping db, snapshot and transaction with mutexes,
adding a bunch of tests.

This change also removes verrors from internal storage engine
for consistency: we should either use verrors everywhere or not
use them at all. As discussed with Adam offline, we should
probably start from the latter and switch to verrors later.

Change-Id: I38bf9760b35c40943225629c37e7d41ec8c30085
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 67c79a7..07198d2 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -75,18 +75,15 @@
 func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, end string) error {
 	sn := t.d.st.NewSnapshot()
 	defer sn.Close()
-	it, err := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
-	if err == nil {
-		sender := call.SendStream()
-		key, value := []byte{}, []byte{}
-		for it.Advance() {
-			key = it.Key(key)
-			parts := util.SplitKeyParts(string(key))
-			sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
-		}
-		err = it.Err()
+	it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
+	sender := call.SendStream()
+	key, value := []byte{}, []byte{}
+	for it.Advance() {
+		key = it.Key(key)
+		parts := util.SplitKeyParts(string(key))
+		sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
 	}
-	if err != nil {
+	if err := it.Err(); err != nil {
 		return verror.New(verror.ErrInternal, ctx, err)
 	}
 	return nil
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 750f4db..98ffd48 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -55,12 +55,7 @@
 			return nil, verror.New(verror.ErrInternal, ctx, err)
 		}
 	}
-	// TODO(sadovsky): Does Scan really need to return an error?
-	it, err := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
-	if err != nil {
-		sn.Close()
-		return nil, verror.New(verror.ErrInternal, ctx, err)
-	}
+	it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
 	ch := make(chan naming.GlobReply)
 	go func() {
 		defer sn.Close()
diff --git a/services/syncbase/store/benchmark/benchmark.go b/services/syncbase/store/benchmark/benchmark.go
index a4700e6..c794fa9 100644
--- a/services/syncbase/store/benchmark/benchmark.go
+++ b/services/syncbase/store/benchmark/benchmark.go
@@ -110,7 +110,7 @@
 func ReadSequential(b *testing.B, config *Config) {
 	WriteSequential(b, config)
 	b.ResetTimer()
-	s, _ := config.St.Scan([]byte("0"), []byte("z"))
+	s := config.St.Scan([]byte("0"), []byte("z"))
 	var key, value []byte
 	for i := 0; i < b.N; i++ {
 		if !s.Advance() {
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
new file mode 100644
index 0000000..05bfc13
--- /dev/null
+++ b/services/syncbase/store/invalid_types.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// InvalidSnapshot is a store.Snapshot for which all methods return errors.
+type InvalidSnapshot struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// Close implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Close() error {
+	return s.Error
+}
+
+// Get implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+	return valbuf, s.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+	return &InvalidStream{s.Error}
+}
+
+// InvalidStream is a store.Stream for which all methods return errors.
+type InvalidStream struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// Advance implements the store.Stream interface.
+func (s *InvalidStream) Advance() bool {
+	return false
+}
+
+// Key implements the store.Stream interface.
+func (s *InvalidStream) Key(keybuf []byte) []byte {
+	panic(s.Error)
+}
+
+// Value implements the store.Stream interface.
+func (s *InvalidStream) Value(valbuf []byte) []byte {
+	panic(s.Error)
+}
+
+// Err implements the store.Stream interface.
+func (s *InvalidStream) Err() error {
+	return s.Error
+}
+
+// Cancel implements the store.Stream interface.
+func (s *InvalidStream) Cancel() {
+}
+
+// InvalidTransaction is a store.Transaction for which all methods return errors.
+type InvalidTransaction struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *InvalidTransaction) ResetForRetry() {
+	panic(tx.Error)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Get(key, valbuf []byte) ([]byte, error) {
+	return valbuf, tx.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+	return &InvalidStream{tx.Error}
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Put(key, value []byte) error {
+	return tx.Error
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Delete(key []byte) error {
+	return tx.Error
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *InvalidTransaction) Commit() error {
+	return tx.Error
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *InvalidTransaction) Abort() error {
+	return tx.Error
+}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index b15374d..672eb83 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
 // #include "syncbase_leveldb.h"
 import "C"
 import (
+	"errors"
 	"sync"
 	"unsafe"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
+var (
+	errClosedStore = errors.New("closed store")
+)
+
 // db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
 type db struct {
+	// mu protects cDb.
+	mu  sync.RWMutex
 	cDb *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
+	err          error
 	// Used to prevent concurrent transactions.
 	// TODO(rogulenko): improve concurrency.
-	mu sync.Mutex
+	txmu sync.Mutex
 }
 
 var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
 
 // Close implements the store.Store interface.
 func (d *db) Close() error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.err != nil {
+		return d.err
+	}
 	C.leveldb_close(d.cDb)
+	d.cDb = nil
 	C.leveldb_readoptions_destroy(d.readOptions)
+	d.readOptions = nil
 	C.leveldb_writeoptions_destroy(d.writeOptions)
+	d.writeOptions = nil
+	d.err = errors.New("closed store")
 	return nil
 }
 
@@ -81,8 +97,13 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidStream{d.err}
+	}
+	return newStream(d, start, end, d.readOptions)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
 
 // NewTransaction implements the store.Store interface.
 func (d *db) NewTransaction() store.Transaction {
+	// txmu is held until the transaction is successfully committed or aborted.
+	d.txmu.Lock()
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		d.txmu.Unlock()
+		return &store.InvalidTransaction{d.err}
+	}
 	return newTransaction(d)
 }
 
 // NewSnapshot implements the store.Store interface.
 func (d *db) NewSnapshot() store.Snapshot {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidSnapshot{d.err}
+	}
 	return newSnapshot(d)
 }
 
 // getWithOpts returns the value for the given key.
 // cOpts may contain a pointer to a snapshot.
 func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return valbuf, d.err
+	}
 	var cError *C.char
 	var valLen C.size_t
 	cStr, cLen := cSlice(key)
 	val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
 	if err := goError(cError); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	if val == nil {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	defer C.leveldb_free(unsafe.Pointer(val))
 	return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index fccad5f..666cc40 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -19,27 +19,37 @@
 }
 
 func TestStream(t *testing.T) {
-	db, dbPath := newDB()
-	defer destroyDB(db, dbPath)
-	test.RunStreamTest(t, db)
+	runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+	runTest(t, test.RunStoreStateTest)
 }
 
 func TestReadWriteBasic(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunReadWriteBasicTest(t, st)
+	runTest(t, test.RunReadWriteBasicTest)
 }
 
 func TestReadWriteRandom(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunReadWriteRandomTest(t, st)
+	runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, test.RunTransactionStateTest)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunTransactionsWithGetTest(t, st)
+	runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+	st, dbPath := newDB()
+	defer destroyDB(st, dbPath)
+	f(t, st)
 }
 
 func newDB() (store.Store, string) {
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index afa9f25..7b82c7c 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -7,15 +7,21 @@
 // #include "leveldb/c.h"
 import "C"
 import (
+	"errors"
+	"sync"
+
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
 // snapshot is a wrapper around LevelDB snapshot that implements
 // the store.Snapshot interface.
 type snapshot struct {
+	// mu protects the state of the snapshot.
+	mu        sync.RWMutex
 	d         *db
 	cSnapshot *C.leveldb_snapshot_t
 	cOpts     *C.leveldb_readoptions_t
+	err       error
 }
 
 var _ store.Snapshot = (*snapshot)(nil)
@@ -26,25 +32,43 @@
 	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
 	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
 	return &snapshot{
-		d,
-		cSnapshot,
-		cOpts,
+		d:         d,
+		cSnapshot: cSnapshot,
+		cOpts:     cOpts,
 	}
 }
 
 // Close implements the store.Snapshot interface.
 func (s *snapshot) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil {
+		return s.err
+	}
 	C.leveldb_readoptions_destroy(s.cOpts)
+	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
+	s.cSnapshot = nil
+	s.err = errors.New("closed snapshot")
 	return nil
 }
 
 // Get implements the store.StoreReader interface.
 func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return valbuf, s.err
+	}
 	return s.d.getWithOpts(key, valbuf, s.cOpts)
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(s.d, start, end, s.cOpts), nil
+func (s *snapshot) Scan(start, end []byte) store.Stream {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return &store.InvalidStream{s.err}
+	}
+	return newStream(s.d, start, end, s.cOpts)
 }
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index e4b1631..4705e59 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -9,10 +9,10 @@
 import "C"
 import (
 	"bytes"
+	"errors"
 	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 // stream is a wrapper around LevelDB iterator that implements
@@ -39,7 +39,6 @@
 
 func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
-	// TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
 	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
 	return &stream{
 		cIter: cIter,
@@ -57,7 +56,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.hasValue = false
-	if s.cIter == nil {
+	if s.err != nil {
 		return false
 	}
 	// The C iterator starts out initialized, pointing at the first value; we
@@ -113,10 +112,9 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.cIter == nil {
+	if s.err != nil {
 		return
 	}
-	s.err = verror.New(verror.ErrCanceled, nil)
 	// s.hasValue might be false if Advance was never called.
 	if s.hasValue {
 		// We copy the key and the value from the C heap to the Go heap before
@@ -124,6 +122,7 @@
 		s.key = store.CopyBytes(nil, s.cKey())
 		s.value = store.CopyBytes(nil, s.cVal())
 	}
+	s.err = errors.New("canceled stream")
 	s.destroyLeveldbIter()
 }
 
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index d358f51..12c3a65 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,62 +7,85 @@
 // #include "leveldb/c.h"
 import "C"
 import (
+	"errors"
+	"sync"
+
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
 // transaction is a wrapper around LevelDB WriteBatch that implements
 // the store.Transaction interface.
-// TODO(rogulenko): handle incorrect usage.
 type transaction struct {
+	mu       sync.Mutex
 	d        *db
 	snapshot store.Snapshot
 	batch    *C.leveldb_writebatch_t
 	cOpts    *C.leveldb_writeoptions_t
+	err      error
 }
 
 var _ store.Transaction = (*transaction)(nil)
 
 func newTransaction(d *db) *transaction {
-	// The lock is held until the transaction is successfully
-	// committed or aborted.
-	d.mu.Lock()
 	return &transaction{
-		d,
-		d.NewSnapshot(),
-		C.leveldb_writebatch_create(),
-		d.writeOptions,
+		d:        d,
+		snapshot: d.NewSnapshot(),
+		batch:    C.leveldb_writebatch_create(),
+		cOpts:    d.writeOptions,
 	}
 }
 
 // close frees allocated C objects and releases acquired locks.
 func (tx *transaction) close() {
-	tx.d.mu.Unlock()
-	tx.snapshot.Close()
+	tx.d.txmu.Unlock()
 	C.leveldb_writebatch_destroy(tx.batch)
+	tx.batch = nil
 	if tx.cOpts != tx.d.writeOptions {
 		C.leveldb_writeoptions_destroy(tx.cOpts)
 	}
+	tx.cOpts = nil
 }
 
 // ResetForRetry implements the store.Transaction interface.
 func (tx *transaction) ResetForRetry() {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.batch == nil {
+		panic(tx.err)
+	}
 	tx.snapshot.Close()
 	tx.snapshot = tx.d.NewSnapshot()
+	tx.err = nil
 	C.leveldb_writebatch_clear(tx.batch)
 }
 
 // Get implements the store.StoreReader interface.
 func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return valbuf, tx.err
+	}
 	return tx.snapshot.Get(key, valbuf)
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return &store.InvalidStream{tx.err}
+	}
 	return tx.snapshot.Scan(start, end)
 }
 
 // Put implements the store.StoreWriter interface.
 func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
 	cKey, cKeyLen := cSlice(key)
 	cVal, cValLen := cSlice(value)
 	C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
@@ -71,6 +94,11 @@
 
 // Delete implements the store.StoreWriter interface.
 func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
 	cKey, cKeyLen := cSlice(key)
 	C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
 	return nil
@@ -78,17 +106,32 @@
 
 // Commit implements the store.Transaction interface.
 func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
+	tx.d.mu.Lock()
+	defer tx.d.mu.Unlock()
 	var cError *C.char
 	C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
 	if err := goError(cError); err != nil {
+		tx.err = errors.New("already attempted to commit transaction")
 		return err
 	}
+	tx.err = errors.New("committed transaction")
 	tx.close()
 	return nil
 }
 
 // Abort implements the store.Transaction interface.
 func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.batch == nil {
+		return tx.err
+	}
+	tx.err = errors.New("aborted transaction")
 	tx.close()
 	return nil
 }
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index 180cd14..fc7cf0a 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -52,21 +52,21 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if err := s.error(); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	value, ok := s.data[string(key)]
 	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	return store.CopyBytes(valbuf, value), nil
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+func (s *snapshot) Scan(start, end []byte) store.Stream {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if err := s.error(); err != nil {
-		return nil, err
+		return &store.InvalidStream{err}
 	}
-	return newStream(s, start, end), nil
+	return newStream(s, start, end)
 }
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index 36a0758..e844b20 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -40,16 +40,16 @@
 	defer st.mu.Unlock()
 	value, ok := st.data[string(key)]
 	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	return store.CopyBytes(valbuf, value), nil
 }
 
 // Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+func (st *memstore) Scan(start, end []byte) store.Stream {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	return newStream(newSnapshot(st), start, end), nil
+	return newStream(newSnapshot(st), start, end)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 1ba6e85..4006ce7 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -8,6 +8,7 @@
 	"runtime"
 	"testing"
 
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/test"
 )
 
@@ -16,27 +17,39 @@
 }
 
 func TestStream(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunStreamTest(t, st)
+	runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+	// TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+	// disallow subsequent operations.
+	// runTest(t, test.RunStoreStateTest)
 }
 
 func TestReadWriteBasic(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunReadWriteBasicTest(t, st)
+	runTest(t, test.RunReadWriteBasicTest)
 }
 
 func TestReadWriteRandom(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunReadWriteRandomTest(t, st)
+	runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, test.RunTransactionStateTest)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
-	st := New()
-	defer st.Close()
 	// TODO(sadovsky): Enable this test once we've added a retry loop to
 	// RunInTransaction. Without that, concurrency makes the test fail.
-	//test.RunTransactionsWithGetTest(t, st)
+	// runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+	st := New()
+	defer st.Close()
+	f(t, st)
 }
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index 1e96c01..221d33c 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -5,11 +5,11 @@
 package memstore
 
 import (
+	"errors"
 	"sort"
 	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 type stream struct {
@@ -89,5 +89,5 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.err = verror.New(verror.ErrCanceled, nil)
+	s.err = errors.New("canceled stream")
 }
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 3ef45aa..56dcbdd 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -80,19 +80,19 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if err := tx.error(); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	return tx.sn.Get(key, valbuf)
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if err := tx.error(); err != nil {
-		return nil, err
+		return &store.InvalidStream{err}
 	}
-	return newStream(tx.sn, start, end), nil
+	return newStream(tx.sn, start, end)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 926b66d..22a5c99 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -12,11 +12,12 @@
 	// sub-slice of valbuf if valbuf was large enough to hold the entire value.
 	// Otherwise, a newly allocated slice will be returned. It is valid to pass a
 	// nil valbuf.
-	// Fails if the given key is unknown (ErrUnknownKey).
+	// If the given key is unknown, valbuf is returned unchanged and the function
+	// fails with ErrUnknownKey.
 	Get(key, valbuf []byte) ([]byte, error)
 
 	// Scan returns all rows with keys in range [start, end).
-	Scan(start, end []byte) (Stream, error)
+	Scan(start, end []byte) Stream
 }
 
 // StoreWriter writes data to a CRUD-capable storage engine.
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
new file mode 100644
index 0000000..0276fd1
--- /dev/null
+++ b/services/syncbase/store/test/snapshot.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"strings"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunSnapshotTest verifies store.Snapshot operations.
+func RunSnapshotTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+	snapshot := st.NewSnapshot()
+	key2, value2 := []byte("key2"), []byte("value2")
+	st.Put(key2, value2)
+
+	// Test Get and Scan.
+	verifyGet(t, snapshot, key1, value1)
+	verifyGet(t, snapshot, key2, nil)
+	s := snapshot.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
+	verifyAdvance(t, s, nil, nil)
+
+	// Test functions after Close.
+	if err := snapshot.Close(); err != nil {
+		t.Fatalf("can't close the snapshot: %v", err)
+	}
+	expectedErr := "closed snapshot"
+	if err := snapshot.Close(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	s = snapshot.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, nil, nil)
+	if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 295d683..4caeb7c 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -6,10 +6,10 @@
 
 import (
 	"bytes"
+	"strings"
 	"testing"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 // RunStreamTest verifies store.Stream operations.
@@ -21,25 +21,14 @@
 	key3, value3 := []byte("key3"), []byte("value3")
 	st.Put(key3, value3)
 
-	s, _ := st.Scan([]byte("a"), []byte("z"))
-	if !s.Advance() {
-		t.Fatalf("can't advance the stream")
-	}
-	var key, value []byte
-	for i := 0; i < 2; i++ {
-		if key = s.Key(key); !bytes.Equal(key, key1) {
-			t.Fatalf("unexpected key: got %q, want %q", key, key1)
-		}
-		if value = s.Value(value); !bytes.Equal(value, value1) {
-			t.Fatalf("unexpected value: got %q, want %q", value, value1)
-		}
-	}
-
+	s := st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
 	if !s.Advance() {
 		t.Fatalf("can't advance the stream")
 	}
 	s.Cancel()
 	for i := 0; i < 2; i++ {
+		var key, value []byte
 		if key = s.Key(key); !bytes.Equal(key, key2) {
 			t.Fatalf("unexpected key: got %q, want %q", key, key2)
 		}
@@ -47,11 +36,8 @@
 			t.Fatalf("unexpected value: got %q, want %q", value, value2)
 		}
 	}
-
-	if s.Advance() {
-		t.Fatalf("advance returned true unexpectedly")
-	}
-	if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+	verifyAdvance(t, s, nil, nil)
+	if !strings.Contains(s.Err().Error(), "canceled stream") {
 		t.Fatalf("unexpected steam error: %v", s.Err())
 	}
 }
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
index 0643bae..330f69a 100644
--- a/services/syncbase/store/test/test.go
+++ b/services/syncbase/store/test/test.go
@@ -2,17 +2,13 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// TODO(rogulenko): add more tests.
-
+// TODO(rogulenko): rename this file test.go -> store.go.
 package test
 
 import (
-	"bytes"
 	"fmt"
 	"math/rand"
-	"reflect"
-	"strconv"
-	"sync"
+	"strings"
 	"testing"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -81,22 +77,14 @@
 // verify checks that various read operations on store.Store and memtable return
 // the same results.
 func (s *storeState) verify(t *testing.T, st store.StoreReader) {
-	var key, value []byte
-	var err error
 	// Verify Get().
 	for i := 0; i < s.size; i++ {
 		keystr := fmt.Sprintf("%05d", i)
 		answer, ok := s.memtable[keystr]
-		key = []byte(keystr)
-		value, err = st.Get(key, value)
 		if ok {
-			if err != nil || !bytes.Equal(value, answer) {
-				t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
-			}
+			verifyGet(t, st, []byte(keystr), answer)
 		} else {
-			if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
-				t.Fatalf("unexpected get error for key %q: %v", keystr, err)
-			}
+			verifyGet(t, st, []byte(keystr), nil)
 		}
 	}
 	// Verify 10 random Scan() calls.
@@ -106,25 +94,12 @@
 			start, end = end, start
 		}
 		end++
-		stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
-		if err != nil {
-			t.Fatalf("can't create stream: %v", err)
-		}
-		for stream.Advance() {
-			start = s.lowerBound(start)
+		stream := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
+		for start = s.lowerBound(start); start < end; start = s.lowerBound(start + 1) {
 			keystr := fmt.Sprintf("%05d", start)
-			key, value = stream.Key(key), stream.Value(value)
-			if string(key) != keystr {
-				t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
-			}
-			if !bytes.Equal(value, s.memtable[keystr]) {
-				t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
-			}
-			start++
+			verifyAdvance(t, stream, []byte(keystr), s.memtable[keystr])
 		}
-		if start = s.lowerBound(start); start < end {
-			t.Fatalf("stream ended unexpectedly")
-		}
+		verifyAdvance(t, stream, nil, nil)
 	}
 }
 
@@ -192,85 +167,47 @@
 	runReadWriteTest(t, st, size, testcase)
 }
 
-// RunTransactionsWithGetTest tests transactions that use Put and Get
-// operations.
-// NOTE: consider setting GOMAXPROCS to something greater than 1.
-func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
-	// Invariant: value mapped to n is sum of values of 0..n-1.
-	// Each of k transactions takes m distinct random values from 0..n-1, adds 1
-	// to each and m to value mapped to n.
-	// The correctness of sums is checked after all transactions have been
-	// committed.
-	n, m, k := 10, 3, 100
-	for i := 0; i <= n; i++ {
-		if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
-			t.Fatalf("can't write to database")
-		}
+// RunStoreStateTest verifies operations that modify the state of a store.Store.
+func RunStoreStateTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+	key2 := []byte("key2")
+
+	// Test Get and Scan.
+	verifyGet(t, st, key1, value1)
+	verifyGet(t, st, key2, nil)
+	s := st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
+	verifyAdvance(t, s, nil, nil)
+
+	// Test functions after Close.
+	if err := st.Close(); err != nil {
+		t.Fatalf("can't close the store: %v", err)
 	}
-	var wg sync.WaitGroup
-	wg.Add(k)
-	// TODO(sadovsky): This configuration creates huge resource contention.
-	// Perhaps we should add some random sleep's to reduce the contention.
-	for i := 0; i < k; i++ {
-		go func() {
-			rnd := rand.New(rand.NewSource(239017 * int64(i)))
-			perm := rnd.Perm(n)
-			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-				for j := 0; j <= m; j++ {
-					var keystr string
-					if j < m {
-						keystr = fmt.Sprintf("%05d", perm[j])
-					} else {
-						keystr = fmt.Sprintf("%05d", n)
-					}
-					key := []byte(keystr)
-					val, err := st.Get(key, nil)
-					if err != nil {
-						return fmt.Errorf("can't get key %q: %v", key, err)
-					}
-					intValue, err := strconv.ParseInt(string(val), 10, 64)
-					if err != nil {
-						return fmt.Errorf("can't parse int from %q: %v", val, err)
-					}
-					var newValue int64
-					if j < m {
-						newValue = intValue + 1
-					} else {
-						newValue = intValue + int64(m)
-					}
-					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
-						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
-					}
-				}
-				return nil
-			}); err != nil {
-				panic(fmt.Errorf("can't commit transaction: %v", err))
-			}
-			wg.Done()
-		}()
+	expectedErr := "closed store"
+	if err := st.Close(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
 	}
-	wg.Wait()
-	var sum int64
-	for j := 0; j <= n; j++ {
-		keystr := fmt.Sprintf("%05d", j)
-		key := []byte(keystr)
-		val, err := st.Get(key, nil)
-		if err != nil {
-			t.Fatalf("can't get key %q: %v", key, err)
-		}
-		intValue, err := strconv.ParseInt(string(val), 10, 64)
-		if err != nil {
-			t.Fatalf("can't parse int from %q: %v", val, err)
-		}
-		if j < n {
-			sum += intValue
-		} else {
-			if intValue != int64(m*k) {
-				t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
-			}
-		}
+	s = st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, nil, nil)
+	if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
 	}
-	if sum != int64(m*k) {
-		t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+	snapshot := st.NewSnapshot()
+	if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	tx := st.NewTransaction()
+	if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if _, err := st.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if err := st.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if err := st.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
 	}
 }
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
new file mode 100644
index 0000000..e078e49
--- /dev/null
+++ b/services/syncbase/store/test/transaction.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"sync"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunTransactionTest verifies operations that modify the state of a
+// store.Transaction.
+func RunTransactionStateTest(t *testing.T, st store.Store) {
+	abortFunctions := []func(t *testing.T, tx store.Transaction) string{
+		func(t *testing.T, tx store.Transaction) string {
+			if err := tx.Abort(); err != nil {
+				t.Fatalf("can't abort the transaction: %v", err)
+			}
+			return "aborted transaction"
+		},
+		func(t *testing.T, tx store.Transaction) string {
+			if err := tx.Commit(); err != nil {
+				t.Fatalf("can't commit the transaction: %v", err)
+			}
+			return "committed transaction"
+		},
+	}
+	for _, fn := range abortFunctions {
+		key1, value1 := []byte("key1"), []byte("value1")
+		st.Put(key1, value1)
+		key2 := []byte("key2")
+		tx := st.NewTransaction()
+
+		// Test Get and Scan.
+		verifyGet(t, tx, key1, value1)
+		verifyGet(t, tx, key2, nil)
+		s := tx.Scan([]byte("a"), []byte("z"))
+		verifyAdvance(t, s, key1, value1)
+		verifyAdvance(t, s, nil, nil)
+
+		// Test functions after Abort.
+		expectedErr := fn(t, tx)
+		if err := tx.Abort(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Commit(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		s = tx.Scan([]byte("a"), []byte("z"))
+		verifyAdvance(t, s, nil, nil)
+		if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+	}
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+	// Invariant: value mapped to n is sum of values of 0..n-1.
+	// Each of k transactions takes m distinct random values from 0..n-1, adds 1
+	// to each and m to value mapped to n.
+	// The correctness of sums is checked after all transactions have been
+	// committed.
+	n, m, k := 10, 3, 100
+	for i := 0; i <= n; i++ {
+		if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
+			t.Fatalf("can't write to database")
+		}
+	}
+	var wg sync.WaitGroup
+	wg.Add(k)
+	// TODO(sadovsky): This configuration creates huge resource contention.
+	// Perhaps we should add some random sleep's to reduce the contention.
+	for i := 0; i < k; i++ {
+		go func() {
+			rnd := rand.New(rand.NewSource(239017 * int64(i)))
+			perm := rnd.Perm(n)
+			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+				for j := 0; j <= m; j++ {
+					var keystr string
+					if j < m {
+						keystr = fmt.Sprintf("%05d", perm[j])
+					} else {
+						keystr = fmt.Sprintf("%05d", n)
+					}
+					key := []byte(keystr)
+					val, err := st.Get(key, nil)
+					if err != nil {
+						return fmt.Errorf("can't get key %q: %v", key, err)
+					}
+					intValue, err := strconv.ParseInt(string(val), 10, 64)
+					if err != nil {
+						return fmt.Errorf("can't parse int from %q: %v", val, err)
+					}
+					var newValue int64
+					if j < m {
+						newValue = intValue + 1
+					} else {
+						newValue = intValue + int64(m)
+					}
+					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
+					}
+				}
+				return nil
+			}); err != nil {
+				panic(fmt.Errorf("can't commit transaction: %v", err))
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	var sum int64
+	for j := 0; j <= n; j++ {
+		keystr := fmt.Sprintf("%05d", j)
+		key := []byte(keystr)
+		val, err := st.Get(key, nil)
+		if err != nil {
+			t.Fatalf("can't get key %q: %v", key, err)
+		}
+		intValue, err := strconv.ParseInt(string(val), 10, 64)
+		if err != nil {
+			t.Fatalf("can't parse int from %q: %v", val, err)
+		}
+		if j < n {
+			sum += intValue
+		} else {
+			if intValue != int64(m*k) {
+				t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+			}
+		}
+	}
+	if sum != int64(m*k) {
+		t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+	}
+}
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
new file mode 100644
index 0000000..3f8ca38
--- /dev/null
+++ b/services/syncbase/store/test/util.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"bytes"
+	"reflect"
+	"runtime/debug"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// verifyGet verifies that st.Get(key) == value. If value is nil, verifies that
+// the key is not found.
+func verifyGet(t *testing.T, st store.StoreReader, key, value []byte) {
+	valbuf := []byte("tmp")
+	var err error
+	if value != nil {
+		if valbuf, err = st.Get(key, valbuf); err != nil {
+			Fatalf(t, "can't get value of %q: %v", key, err)
+		}
+		if !bytes.Equal(valbuf, value) {
+			Fatalf(t, "unexpected value: got %q, want %q", valbuf, value)
+		}
+	} else {
+		valbuf, err = st.Get(key, valbuf)
+		if !reflect.DeepEqual(&store.ErrUnknownKey{Key: string(key)}, err) {
+			Fatalf(t, "unexpected get error for key %q: %v", key, err)
+		}
+		valcopy := []byte("tmp")
+		// Verify that valbuf is not modified if the key is not found.
+		if !bytes.Equal(valbuf, valcopy) {
+			Fatalf(t, "unexpected value: got %q, want %q", valbuf, valcopy)
+		}
+	}
+}
+
+// verifyGet verifies the next key/value pair of the provided stream.
+// If key is nil, verifies that next Advance call on the stream returns false.
+func verifyAdvance(t *testing.T, s store.Stream, key, value []byte) {
+	ok := s.Advance()
+	if key == nil {
+		if ok {
+			Fatalf(t, "advance returned true unexpectedly")
+		}
+		return
+	}
+	if !ok {
+		Fatalf(t, "can't advance the stream")
+	}
+	var k, v []byte
+	for i := 0; i < 2; i++ {
+		if k = s.Key(k); !bytes.Equal(k, key) {
+			Fatalf(t, "unexpected key: got %q, want %q", k, key)
+		}
+		if v = s.Value(v); !bytes.Equal(v, value) {
+			Fatalf(t, "unexpected value: got %q, want %q", v, value)
+		}
+	}
+}
+
+func Fatalf(t *testing.T, format string, args ...interface{}) {
+	debug.PrintStack()
+	t.Fatalf(format, args...)
+}
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index d36b260..82a8350 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -25,6 +25,7 @@
 // CopyBytes copies elements from a source slice into a destination slice.
 // The returned slice may be a sub-slice of dst if dst was large enough to hold
 // src. Otherwise, a newly allocated slice will be returned.
+// TODO(rogulenko): add some tests.
 func CopyBytes(dst, src []byte) []byte {
 	if cap(dst) < len(src) {
 		newlen := cap(dst)*2 + 2
@@ -33,6 +34,7 @@
 		}
 		dst = make([]byte, newlen)
 	}
+	dst = dst[:len(src)]
 	copy(dst, src)
-	return dst[:len(src)]
+	return dst
 }