syncbase.store: memstore stream and snapshot

Change-Id: I10bd79f30838e6a0ab04dec2dc455f419f083b6f
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index 57556c1..b15374d 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
+// Package leveldb provides a LevelDB-based implementation of store.Store.
 package leveldb
 
 // #cgo LDFLAGS: -lleveldb
@@ -16,9 +17,9 @@
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
-// DB is a wrapper around LevelDB that implements the store.Store interface.
+// db is a wrapper around LevelDB that implements the store.Store interface.
 // TODO(rogulenko): ensure thread safety.
-type DB struct {
+type db struct {
 	cDb *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
@@ -28,11 +29,11 @@
 	mu sync.Mutex
 }
 
-var _ store.Store = (*DB)(nil)
+var _ store.Store = (*db)(nil)
 
 // Open opens the database located at the given path, creating it if it doesn't
 // exist.
-func Open(path string) (*DB, error) {
+func Open(path string) (store.Store, error) {
 	var cError *C.char
 	cPath := C.CString(path)
 	defer C.free(unsafe.Pointer(cPath))
@@ -48,7 +49,7 @@
 	}
 	readOptions := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
-	return &DB{
+	return &db{
 		cDb:          cDb,
 		readOptions:  readOptions,
 		writeOptions: C.leveldb_writeoptions_create(),
@@ -56,10 +57,10 @@
 }
 
 // Close implements the store.Store interface.
-func (db *DB) Close() error {
-	C.leveldb_close(db.cDb)
-	C.leveldb_readoptions_destroy(db.readOptions)
-	C.leveldb_writeoptions_destroy(db.writeOptions)
+func (d *db) Close() error {
+	C.leveldb_close(d.cDb)
+	C.leveldb_readoptions_destroy(d.readOptions)
+	C.leveldb_writeoptions_destroy(d.writeOptions)
 	return nil
 }
 
@@ -74,49 +75,49 @@
 	return goError(cError)
 }
 
-// NewTransaction implements the store.Store interface.
-func (db *DB) NewTransaction() store.Transaction {
-	return newTransaction(db)
-}
-
-// NewSnapshot implements the store.Store interface.
-func (db *DB) NewSnapshot() store.Snapshot {
-	return newSnapshot(db)
+// Get implements the store.StoreReader interface.
+func (d *db) Get(key, valbuf []byte) ([]byte, error) {
+	return d.getWithOpts(key, valbuf, d.readOptions)
 }
 
 // Scan implements the store.StoreReader interface.
-func (db *DB) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(db, start, end, db.readOptions), nil
-}
-
-// Get implements the store.StoreReader interface.
-func (db *DB) Get(key, valbuf []byte) ([]byte, error) {
-	return db.getWithOpts(key, valbuf, db.readOptions)
+func (d *db) Scan(start, end []byte) (store.Stream, error) {
+	return newStream(d, start, end, d.readOptions), nil
 }
 
 // Put implements the store.StoreWriter interface.
-func (db *DB) Put(key, value []byte) error {
+func (d *db) Put(key, value []byte) error {
 	// TODO(rogulenko): improve performance.
-	return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d, func(st store.StoreReadWriter) error {
 		return st.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
-func (db *DB) Delete(key []byte) error {
+func (d *db) Delete(key []byte) error {
 	// TODO(rogulenko): improve performance.
-	return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+	return store.RunInTransaction(d, func(st store.StoreReadWriter) error {
 		return st.Delete(key)
 	})
 }
 
+// NewTransaction implements the store.Store interface.
+func (d *db) NewTransaction() store.Transaction {
+	return newTransaction(d)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (d *db) NewSnapshot() store.Snapshot {
+	return newSnapshot(d)
+}
+
 // getWithOpts returns the value for the given key.
 // cOpts may contain a pointer to a snapshot.
-func (db *DB) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
 	var cError *C.char
 	var valLen C.size_t
 	cStr, cLen := cSlice(key)
-	val := C.leveldb_get(db.cDb, cOpts, cStr, cLen, &valLen, &cError)
+	val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
 	if err := goError(cError); err != nil {
 		return nil, err
 	}
@@ -124,5 +125,5 @@
 		return nil, &store.ErrUnknownKey{Key: string(key)}
 	}
 	defer C.leveldb_free(unsafe.Pointer(val))
-	return copyAll(valbuf, goBytes(val, valLen)), nil
+	return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
 }
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index d9801cf..c076a4e 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -10,6 +10,7 @@
 	"runtime"
 	"testing"
 
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/test"
 )
 
@@ -18,38 +19,38 @@
 }
 
 func TestReadWriteBasic(t *testing.T) {
-	db, dbPath := newDB()
-	defer destroyDB(db, dbPath)
-	test.RunReadWriteBasicTest(t, db)
+	st, path := newDB()
+	defer destroyDB(st, path)
+	test.RunReadWriteBasicTest(t, st)
 }
 
 func TestReadWriteRandom(t *testing.T) {
-	db, dbPath := newDB()
-	defer destroyDB(db, dbPath)
-	test.RunReadWriteRandomTest(t, db)
+	st, path := newDB()
+	defer destroyDB(st, path)
+	test.RunReadWriteRandomTest(t, st)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
-	db, dbPath := newDB()
-	defer destroyDB(db, dbPath)
-	test.RunTransactionsWithGetTest(t, db)
+	st, path := newDB()
+	defer destroyDB(st, path)
+	test.RunTransactionsWithGetTest(t, st)
 }
 
-func newDB() (*DB, string) {
-	dbPath, err := ioutil.TempDir("", "syncbase_leveldb")
+func newDB() (store.Store, string) {
+	path, err := ioutil.TempDir("", "syncbase_leveldb")
 	if err != nil {
 		panic(fmt.Sprintf("can't create temp dir: %v", err))
 	}
-	db, err := Open(dbPath)
+	st, err := Open(path)
 	if err != nil {
-		panic(fmt.Sprintf("can't open db in %v: %v", dbPath, err))
+		panic(fmt.Sprintf("can't open db at %v: %v", path, err))
 	}
-	return db, dbPath
+	return st, path
 }
 
-func destroyDB(db *DB, dbPath string) {
-	db.Close()
-	if err := Destroy(dbPath); err != nil {
-		panic(fmt.Sprintf("can't destroy db located in %v: %v", dbPath, err))
+func destroyDB(st store.Store, path string) {
+	st.Close()
+	if err := Destroy(path); err != nil {
+		panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
 	}
 }
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index eda5802..afa9f25 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -13,20 +13,20 @@
 // snapshot is a wrapper around LevelDB snapshot that implements
 // the store.Snapshot interface.
 type snapshot struct {
-	db        *DB
+	d         *db
 	cSnapshot *C.leveldb_snapshot_t
 	cOpts     *C.leveldb_readoptions_t
 }
 
 var _ store.Snapshot = (*snapshot)(nil)
 
-func newSnapshot(db *DB) *snapshot {
-	cSnapshot := C.leveldb_create_snapshot(db.cDb)
+func newSnapshot(d *db) *snapshot {
+	cSnapshot := C.leveldb_create_snapshot(d.cDb)
 	cOpts := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
 	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
 	return &snapshot{
-		db,
+		d,
 		cSnapshot,
 		cOpts,
 	}
@@ -35,16 +35,16 @@
 // Close implements the store.Snapshot interface.
 func (s *snapshot) Close() error {
 	C.leveldb_readoptions_destroy(s.cOpts)
-	C.leveldb_release_snapshot(s.db.cDb, s.cSnapshot)
+	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
 	return nil
 }
 
-// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(s.db, start, end, s.cOpts), nil
-}
-
 // Get implements the store.StoreReader interface.
 func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
-	return s.db.getWithOpts(key, valbuf, s.cOpts)
+	return s.d.getWithOpts(key, valbuf, s.cOpts)
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+	return newStream(s.d, start, end, s.cOpts), nil
 }
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index f1bddf7..c19bfa5 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -12,7 +12,6 @@
 	"errors"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/x/lib/vlog"
 )
 
 // stream is a wrapper around LevelDB iterator that implements
@@ -22,15 +21,15 @@
 	cIter *C.syncbase_leveldb_iterator_t
 	end   []byte
 
-	advancedOnce bool
-	err          error
+	hasAdvanced bool
+	err         error
 }
 
 var _ store.Stream = (*stream)(nil)
 
-func newStream(db *DB, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
-	cIter := C.syncbase_leveldb_create_iterator(db.cDb, cOpts, cStr, size)
+	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
 	return &stream{
 		cIter: cIter,
 		end:   end,
@@ -47,10 +46,10 @@
 	if it.cIter == nil {
 		return false
 	}
-	// C iterator is already initialized after creation; we shouldn't move
-	// it during first Advance() call.
-	if !it.advancedOnce {
-		it.advancedOnce = true
+	// The C iterator starts out initialized, pointing at the first value; we
+	// shouldn't move it during the first Advance() call.
+	if !it.hasAdvanced {
+		it.hasAdvanced = true
 	} else {
 		C.syncbase_leveldb_iter_next(it.cIter)
 	}
@@ -67,24 +66,24 @@
 
 // Key implements the store.Stream interface.
 func (it *stream) Key(keybuf []byte) []byte {
-	if !it.advancedOnce {
-		vlog.Fatal("stream has never been advanced")
+	if !it.hasAdvanced {
+		panic("stream has never been advanced")
 	}
 	if it.cIter == nil {
-		vlog.Fatal("illegal state")
+		panic("illegal state")
 	}
-	return copyAll(keybuf, it.cKey())
+	return store.CopyBytes(keybuf, it.cKey())
 }
 
 // Value implements the store.Stream interface.
 func (it *stream) Value(valbuf []byte) []byte {
-	if !it.advancedOnce {
-		vlog.Fatal("stream has never been advanced")
+	if !it.hasAdvanced {
+		panic("stream has never been advanced")
 	}
 	if it.cIter == nil {
-		vlog.Fatal("illegal state")
+		panic("illegal state")
 	}
-	return copyAll(valbuf, it.cVal())
+	return store.CopyBytes(valbuf, it.cVal())
 }
 
 // Err implements the store.Stream interface.
@@ -101,14 +100,16 @@
 	it.destroyLeveldbIter()
 }
 
-// cKey returns the key. The key points to buffer allocated on C heap.
-// The data is valid until the next call to Advance or Cancel.
+// cKey returns the current key.
+// The returned []byte points to a buffer allocated on the C heap. This buffer
+// is valid until the next call to Advance or Cancel.
 func (it *stream) cKey() []byte {
 	return goBytes(it.cIter.key, it.cIter.key_len)
 }
 
-// cVal returns the value. The value points to buffer allocated on C heap.
-// The data is valid until the next call to Advance or Cancel.
+// cVal returns the current value.
+// The returned []byte points to a buffer allocated on the C heap. This buffer
+// is valid until the next call to Advance or Cancel.
 func (it *stream) cVal() []byte {
 	return goBytes(it.cIter.val, it.cIter.val_len)
 }
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 6aa079d..d358f51 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -14,7 +14,7 @@
 // the store.Transaction interface.
 // TODO(rogulenko): handle incorrect usage.
 type transaction struct {
-	db       *DB
+	d        *db
 	snapshot store.Snapshot
 	batch    *C.leveldb_writebatch_t
 	cOpts    *C.leveldb_writeoptions_t
@@ -22,62 +22,45 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
-func newTransaction(db *DB) *transaction {
+func newTransaction(d *db) *transaction {
 	// The lock is held until the transaction is successfully
 	// committed or aborted.
-	db.mu.Lock()
+	d.mu.Lock()
 	return &transaction{
-		db,
-		db.NewSnapshot(),
+		d,
+		d.NewSnapshot(),
 		C.leveldb_writebatch_create(),
-		db.writeOptions,
+		d.writeOptions,
 	}
 }
 
 // close frees allocated C objects and releases acquired locks.
 func (tx *transaction) close() {
-	tx.db.mu.Unlock()
+	tx.d.mu.Unlock()
 	tx.snapshot.Close()
 	C.leveldb_writebatch_destroy(tx.batch)
-	if tx.cOpts != tx.db.writeOptions {
+	if tx.cOpts != tx.d.writeOptions {
 		C.leveldb_writeoptions_destroy(tx.cOpts)
 	}
 }
 
-// Abort implements the store.Transaction interface.
-func (tx *transaction) Abort() error {
-	tx.close()
-	return nil
-}
-
-// Commit implements the store.Transaction interface.
-func (tx *transaction) Commit() error {
-	var cError *C.char
-	C.leveldb_write(tx.db.cDb, tx.cOpts, tx.batch, &cError)
-	if err := goError(cError); err != nil {
-		return err
-	}
-	tx.close()
-	return nil
-}
-
 // ResetForRetry implements the store.Transaction interface.
 func (tx *transaction) ResetForRetry() {
 	tx.snapshot.Close()
-	tx.snapshot = tx.db.NewSnapshot()
+	tx.snapshot = tx.d.NewSnapshot()
 	C.leveldb_writebatch_clear(tx.batch)
 }
 
-// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
-	return tx.snapshot.Scan(start, end)
-}
-
 // Get implements the store.StoreReader interface.
 func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
 	return tx.snapshot.Get(key, valbuf)
 }
 
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+	return tx.snapshot.Scan(start, end)
+}
+
 // Put implements the store.StoreWriter interface.
 func (tx *transaction) Put(key, value []byte) error {
 	cKey, cKeyLen := cSlice(key)
@@ -92,3 +75,20 @@
 	C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
 	return nil
 }
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+	var cError *C.char
+	C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
+	if err := goError(cError); err != nil {
+		return err
+	}
+	tx.close()
+	return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+	tx.close()
+	return nil
+}
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
index 333743f..af08336 100644
--- a/services/syncbase/store/leveldb/util.go
+++ b/services/syncbase/store/leveldb/util.go
@@ -23,9 +23,8 @@
 }
 
 // cSlice converts Go []byte to C string without copying the data.
-//
-// This function behaves similarly to standard Go slice copying or sub-slicing;
-// the caller need not worry about ownership or garbage collection.
+// This function behaves similarly to standard Go slice copying or sub-slicing,
+// in that the caller need not worry about ownership or garbage collection.
 func cSlice(str []byte) (*C.char, C.size_t) {
 	if len(str) == 0 {
 		return nil, 0
@@ -44,18 +43,3 @@
 	})
 	return *(*[]byte)(ptr)
 }
-
-// copyAll copies elements from a source slice into a destination slice.
-// The returned slice may be a sub-slice of dst if dst was large enough to hold
-// src. Otherwise, a newly allocated slice will be returned.
-func copyAll(dst, src []byte) []byte {
-	if cap(dst) < len(src) {
-		newlen := cap(dst)*2 + 2
-		if newlen < len(src) {
-			newlen = len(src)
-		}
-		dst = make([]byte, newlen)
-	}
-	copy(dst, src)
-	return dst[:len(src)]
-}
diff --git a/services/syncbase/store/memstore/memstore.go b/services/syncbase/store/memstore/memstore.go
deleted file mode 100644
index 99c551e..0000000
--- a/services/syncbase/store/memstore/memstore.go
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package memstore provides a simple, in-memory implementation of
-// store.TransactableStore. Since it's a prototype implementation, it makes no
-// attempt to be performant.
-package memstore
-
-import (
-	"errors"
-	"sync"
-	"time"
-
-	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/x/lib/vlog"
-)
-
-var (
-	txnTimeout       = time.Duration(5) * time.Second
-	errExpiredTxn    = errors.New("expired transaction")
-	errCommittedTxn  = errors.New("committed transaction")
-	errAbortedTxn    = errors.New("aborted transaction")
-	errConcurrentTxn = errors.New("concurrent transaction")
-)
-
-type transaction struct {
-	st *memstore
-	// The following fields are used to determine whether method calls should
-	// error out.
-	err         error
-	seq         uint64
-	createdTime time.Time
-	// The following fields track writes performed against this transaction.
-	puts    map[string][]byte
-	deletes map[string]struct{}
-}
-
-var _ store.Transaction = (*transaction)(nil)
-
-type memstore struct {
-	mu   sync.Mutex
-	data map[string][]byte
-	// Most recent sequence number handed out.
-	lastSeq uint64
-	// Value of lastSeq at the time of the most recent commit.
-	lastCommitSeq uint64
-}
-
-var _ store.Store = (*memstore)(nil)
-
-func New() store.Store {
-	return &memstore{data: map[string][]byte{}}
-}
-
-////////////////////////////////////////
-// transaction methods
-
-func newTxn(st *memstore, seq uint64) *transaction {
-	return &transaction{
-		st:          st,
-		seq:         seq,
-		createdTime: time.Now(),
-		puts:        map[string][]byte{},
-		deletes:     map[string]struct{}{},
-	}
-}
-
-func (tx *transaction) expired() bool {
-	return time.Now().After(tx.createdTime.Add(txnTimeout))
-}
-
-func (tx *transaction) checkError() error {
-	if tx.err != nil {
-		return tx.err
-	}
-	if tx.expired() {
-		return errExpiredTxn
-	}
-	if tx.seq <= tx.st.lastCommitSeq {
-		return errConcurrentTxn
-	}
-	return nil
-}
-
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
-	vlog.Fatal("not implemented")
-	return nil, nil
-}
-
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.checkError(); err != nil {
-		return nil, err
-	}
-	v, ok := tx.st.data[string(key)]
-	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
-	}
-	return v, nil
-}
-
-func (tx *transaction) Put(key, value []byte) error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.checkError(); err != nil {
-		return err
-	}
-	delete(tx.deletes, string(key))
-	tx.puts[string(key)] = value
-	return nil
-}
-
-func (tx *transaction) Delete(key []byte) error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.checkError(); err != nil {
-		return err
-	}
-	delete(tx.puts, string(key))
-	tx.deletes[string(key)] = struct{}{}
-	return nil
-}
-
-func (tx *transaction) Commit() error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.checkError(); err != nil {
-		return err
-	}
-	tx.err = errCommittedTxn
-	for k, v := range tx.puts {
-		tx.st.data[k] = v
-	}
-	for k := range tx.deletes {
-		delete(tx.st.data, k)
-	}
-	tx.st.lastCommitSeq = tx.st.lastSeq
-	return nil
-}
-
-func (tx *transaction) Abort() error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.checkError(); err != nil {
-		return err
-	}
-	tx.err = errAbortedTxn
-	return nil
-}
-
-func (tx *transaction) ResetForRetry() {
-	tx.puts = make(map[string][]byte)
-	tx.deletes = make(map[string]struct{})
-	tx.err = nil
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	tx.st.lastSeq++
-	tx.seq = tx.st.lastSeq
-}
-
-////////////////////////////////////////
-// memstore methods
-
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
-	vlog.Fatal("not implemented")
-	return nil, nil
-}
-
-func (st *memstore) Get(key, valbuf []byte) ([]byte, error) {
-	st.mu.Lock()
-	defer st.mu.Unlock()
-	v, ok := st.data[string(key)]
-	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
-	}
-	return v, nil
-}
-
-func (st *memstore) Put(key, value []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
-	})
-}
-
-func (st *memstore) Delete(key []byte) error {
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
-	})
-}
-
-func (st *memstore) NewTransaction() store.Transaction {
-	st.mu.Lock()
-	defer st.mu.Unlock()
-	st.lastSeq++
-	return newTxn(st, st.lastSeq)
-}
-
-func (st *memstore) NewSnapshot() store.Snapshot {
-	vlog.Fatal("not implemented")
-	return nil
-}
-
-func (st *memstore) Close() error {
-	return nil
-}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
new file mode 100644
index 0000000..180cd14
--- /dev/null
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+	"errors"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var (
+	errClosedSnapshot = errors.New("closed snapshot")
+)
+
+type snapshot struct {
+	mu   sync.Mutex
+	data map[string][]byte
+	err  error
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+// Assumes st lock is held.
+func newSnapshot(st *memstore) *snapshot {
+	dataCopy := map[string][]byte{}
+	for k, v := range st.data {
+		dataCopy[k] = v
+	}
+	return &snapshot{data: dataCopy}
+}
+
+func (s *snapshot) error() error {
+	return s.err
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.error(); err != nil {
+		return err
+	}
+	s.err = errClosedSnapshot
+	return nil
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.error(); err != nil {
+		return nil, err
+	}
+	value, ok := s.data[string(key)]
+	if !ok {
+		return nil, &store.ErrUnknownKey{Key: string(key)}
+	}
+	return store.CopyBytes(valbuf, value), nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.error(); err != nil {
+		return nil, err
+	}
+	return newStream(s, start, end), nil
+}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
new file mode 100644
index 0000000..36a0758
--- /dev/null
+++ b/services/syncbase/store/memstore/store.go
@@ -0,0 +1,82 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package memstore provides a simple, in-memory implementation of store.Store.
+// Since it's a prototype implementation, it makes no attempt to be performant.
+package memstore
+
+import (
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type memstore struct {
+	mu   sync.Mutex
+	data map[string][]byte
+	// Most recent sequence number handed out.
+	lastSeq uint64
+	// Value of lastSeq at the time of the most recent commit.
+	lastCommitSeq uint64
+}
+
+var _ store.Store = (*memstore)(nil)
+
+// New creates a new memstore.
+func New() store.Store {
+	return &memstore{data: map[string][]byte{}}
+}
+
+// Close implements the store.Store interface.
+func (st *memstore) Close() error {
+	// TODO(sadovsky): Make all subsequent method calls and stream advances fail.
+	return nil
+}
+
+// Get implements the store.StoreReader interface.
+func (st *memstore) Get(key, valbuf []byte) ([]byte, error) {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	value, ok := st.data[string(key)]
+	if !ok {
+		return nil, &store.ErrUnknownKey{Key: string(key)}
+	}
+	return store.CopyBytes(valbuf, value), nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	return newStream(newSnapshot(st), start, end), nil
+}
+
+// Put implements the store.StoreWriter interface.
+func (st *memstore) Put(key, value []byte) error {
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Put(key, value)
+	})
+}
+
+// Delete implements the store.StoreWriter interface.
+func (st *memstore) Delete(key []byte) error {
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Delete(key)
+	})
+}
+
+// NewTransaction implements the store.Store interface.
+func (st *memstore) NewTransaction() store.Transaction {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	st.lastSeq++
+	return newTransaction(st, st.lastSeq)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (st *memstore) NewSnapshot() store.Snapshot {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	return newSnapshot(st)
+}
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
new file mode 100644
index 0000000..d6b0371
--- /dev/null
+++ b/services/syncbase/store/memstore/store_test.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+	"runtime"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+	runtime.GOMAXPROCS(10)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+	st := New()
+	defer st.Close()
+	test.RunReadWriteBasicTest(t, st)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+	st := New()
+	defer st.Close()
+	test.RunReadWriteRandomTest(t, st)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+	st := New()
+	defer st.Close()
+	// TODO(sadovsky): Enable this test once we've added a retry loop to
+	// RunInTransaction. Without that, concurrency makes the test fail.
+	//test.RunTransactionsWithGetTest(t, st)
+}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
new file mode 100644
index 0000000..1de276a
--- /dev/null
+++ b/services/syncbase/store/memstore/stream.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+	"sort"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+type stream struct {
+	mu        sync.Mutex
+	sn        *snapshot
+	keys      []string
+	currIndex int
+	currKey   *string
+	err       error
+}
+
+var _ store.Stream = (*stream)(nil)
+
+func newStream(sn *snapshot, start, end []byte) *stream {
+	keys := []string{}
+	for k := range sn.data {
+		if k >= string(start) && k < string(end) {
+			keys = append(keys, k)
+		}
+	}
+	sort.Strings(keys)
+	return &stream{
+		sn:        sn,
+		keys:      keys,
+		currIndex: -1,
+	}
+}
+
+// Advance implements the store.Stream interface.
+func (s *stream) Advance() bool {
+	// TODO(sadovsky): Advance should return false and Err should return a non-nil
+	// error if the Store was closed, or if the Snapshot was closed, or if the
+	// Transaction was committed or aborted (or timed out).
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil {
+		s.currKey = nil
+	} else {
+		s.currIndex++
+		if s.currIndex < len(s.keys) {
+			s.currKey = &s.keys[s.currIndex]
+		} else {
+			s.currKey = nil
+		}
+	}
+	return s.currKey != nil
+}
+
+// Key implements the store.Stream interface.
+func (s *stream) Key(keybuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.currKey == nil {
+		panic("nothing staged")
+	}
+	return store.CopyBytes(keybuf, []byte(*s.currKey))
+}
+
+// Value implements the store.Stream interface.
+func (s *stream) Value(valbuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.currKey == nil {
+		panic("nothing staged")
+	}
+	return store.CopyBytes(valbuf, s.sn.data[*s.currKey])
+}
+
+// Err implements the store.Stream interface.
+func (s *stream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.err
+}
+
+// Cancel implements the store.Stream interface.
+func (s *stream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.err = verror.New(verror.ErrAborted, nil)
+}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
new file mode 100644
index 0000000..3ef45aa
--- /dev/null
+++ b/services/syncbase/store/memstore/transaction.go
@@ -0,0 +1,159 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+	"errors"
+	"sync"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var (
+	txnTimeout         = time.Duration(5) * time.Second
+	errExpiredTxn      = errors.New("expired transaction")
+	errCommittedTxn    = errors.New("committed transaction")
+	errAbortedTxn      = errors.New("aborted transaction")
+	errAttemptedCommit = errors.New("already attempted to commit transaction")
+)
+
+type transaction struct {
+	mu sync.Mutex
+	st *memstore
+	sn *snapshot
+	// The following fields are used to determine whether method calls should
+	// error out.
+	err         error
+	seq         uint64
+	createdTime time.Time
+	// The following fields track writes performed against this transaction.
+	puts    map[string][]byte
+	deletes map[string]struct{}
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *memstore, seq uint64) *transaction {
+	return &transaction{
+		st:          st,
+		sn:          newSnapshot(st),
+		seq:         seq,
+		createdTime: time.Now(),
+		puts:        map[string][]byte{},
+		deletes:     map[string]struct{}{},
+	}
+}
+
+func (tx *transaction) expired() bool {
+	return time.Now().After(tx.createdTime.Add(txnTimeout))
+}
+
+func (tx *transaction) error() error {
+	if tx.err != nil {
+		return tx.err
+	}
+	if tx.expired() {
+		return errExpiredTxn
+	}
+	return nil
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *transaction) ResetForRetry() {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	tx.puts = make(map[string][]byte)
+	tx.deletes = make(map[string]struct{})
+	tx.err = nil
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+	tx.st.lastSeq++
+	tx.seq = tx.st.lastSeq
+	tx.sn = newSnapshot(tx.st)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return nil, err
+	}
+	return tx.sn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return nil, err
+	}
+	return newStream(tx.sn, start, end), nil
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return err
+	}
+	delete(tx.deletes, string(key))
+	tx.puts[string(key)] = value
+	return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return err
+	}
+	delete(tx.puts, string(key))
+	tx.deletes[string(key)] = struct{}{}
+	return nil
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return err
+	}
+	tx.sn.Close()
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+	if tx.seq <= tx.st.lastCommitSeq {
+		// Once Commit() has failed with store.ErrConcurrentTransaction, subsequent
+		// ops on the transaction will fail with errAttemptedCommit.
+		tx.err = errAttemptedCommit
+		return &store.ErrConcurrentTransaction{}
+	}
+	tx.err = errCommittedTxn
+	for k, v := range tx.puts {
+		tx.st.data[k] = v
+	}
+	for k := range tx.deletes {
+		delete(tx.st.data, k)
+	}
+	tx.st.lastCommitSeq = tx.st.lastSeq
+	return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if err := tx.error(); err != nil {
+		return err
+	}
+	tx.sn.Close()
+	tx.err = errAbortedTxn
+	return nil
+}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index ee02ac3..926b66d 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -8,15 +8,15 @@
 
 // StoreReader reads data from a CRUD-capable storage engine.
 type StoreReader interface {
-	// Scan returns all rows with keys in range [start, end).
-	Scan(start, end []byte) (Stream, error)
-
 	// Get returns the value for the given key. The returned slice may be a
 	// sub-slice of valbuf if valbuf was large enough to hold the entire value.
-	// Otherwise, a newly allocated slice will be returned. It is valid to pass
-	// a nil valbuf.
+	// Otherwise, a newly allocated slice will be returned. It is valid to pass a
+	// nil valbuf.
 	// Fails if the given key is unknown (ErrUnknownKey).
 	Get(key, valbuf []byte) ([]byte, error)
+
+	// Scan returns all rows with keys in range [start, end).
+	Scan(start, end []byte) (Stream, error)
 }
 
 // StoreWriter writes data to a CRUD-capable storage engine.
@@ -39,6 +39,9 @@
 type Store interface {
 	StoreReadWriter
 
+	// Close closes the store.
+	Close() error
+
 	// NewTransaction creates a transaction.
 	// TODO(rogulenko): add transaction options.
 	NewTransaction() Transaction
@@ -46,24 +49,19 @@
 	// NewSnapshot creates a snapshot.
 	// TODO(rogulenko): add snapshot options.
 	NewSnapshot() Snapshot
-
-	// Close closes the store.
-	Close() error
 }
 
 // Transaction provides a mechanism for atomic reads and writes.
-// Reads don't reflect writes performed inside this transaction. (This
-// limitation is imposed for API parity with Spanner.)
+//
+// Reads don't reflect writes performed within this transaction.
 // Once a transaction has been committed or aborted, subsequent method calls
 // will fail with no effect.
-//
-// Operations on a transaction may start failing with ErrConcurrentTransaction
-// if writes from a newly-committed transaction conflict with reads or writes
-// from this transaction.
 type Transaction interface {
 	StoreReadWriter
 
 	// Commit commits the transaction.
+	// Fails if writes from outside this transaction conflict with reads from
+	// within this transaction.
 	Commit() error
 
 	// Abort aborts the transaction.
@@ -75,9 +73,9 @@
 }
 
 // Snapshot is a handle to particular state in time of a Store.
-// All read operations are executed against a consistent snapshot of Store
-// commit history. Snapshots don't acquire locks and thus don't block
-// transactions.
+//
+// All read operations are executed against a consistent view of Store commit
+// history. Snapshots don't acquire locks and thus don't block transactions.
 type Snapshot interface {
 	StoreReader
 
@@ -124,24 +122,6 @@
 	Cancel()
 }
 
-// TODO(sadovsky): Add retry loop.
-func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
-	tx := st.NewTransaction()
-	if err := fn(tx); err != nil {
-		tx.Abort()
-		return err
-	}
-	if err := tx.Commit(); err != nil {
-		// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
-		// failure or ErrConcurrentTransaction. Depending on the cause of failure,
-		// it may be desirable to retry the Commit() and/or to call Abort(). For
-		// now, we always abort on a failed commit.
-		tx.Abort()
-		return err
-	}
-	return nil
-}
-
 type ErrConcurrentTransaction struct{}
 
 func (e *ErrConcurrentTransaction) Error() string {
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
index 6f46d50..0643bae 100644
--- a/services/syncbase/store/test/test.go
+++ b/services/syncbase/store/test/test.go
@@ -38,26 +38,25 @@
 	return res
 }
 
-// dbState is the in-memory representation of the database state.
-type dbState struct {
-	// We assume that the database has keys from range [0..dbSize).
-	dbSize   int
+// storeState is the in-memory representation of the store state.
+type storeState struct {
+	// We assume that the database has keys [0..size).
+	size     int
 	rnd      *rand.Rand
 	memtable map[string][]byte
 }
 
-func newDBState(dbSize int) *dbState {
-	s := &dbState{
-		dbSize,
+func newStoreState(size int) *storeState {
+	return &storeState{
+		size,
 		rand.New(rand.NewSource(239017)),
 		make(map[string][]byte),
 	}
-	return s
 }
 
-func (s *dbState) clone() *dbState {
-	other := &dbState{
-		s.dbSize,
+func (s *storeState) clone() *storeState {
+	other := &storeState{
+		s.size,
 		s.rnd,
 		make(map[string][]byte),
 	}
@@ -67,10 +66,10 @@
 	return other
 }
 
-// nextKey returns minimal key in the database that is not less than the
-// provided key. In case of no such key, returns dbSize.
-func (s *dbState) lowerBound(key int) int {
-	for key < s.dbSize {
+// nextKey returns the smallest key in the store that is not less than the
+// provided key. If there is no such key, returns size.
+func (s *storeState) lowerBound(key int) int {
+	for key < s.size {
 		if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
 			return key
 		}
@@ -79,13 +78,13 @@
 	return key
 }
 
-// verify ensures that various read operation on store.Store and memtable
-// return the same result.
-func (s *dbState) verify(t *testing.T, st store.StoreReader) {
+// verify checks that various read operations on store.Store and memtable return
+// the same results.
+func (s *storeState) verify(t *testing.T, st store.StoreReader) {
 	var key, value []byte
 	var err error
 	// Verify Get().
-	for i := 0; i < s.dbSize; i++ {
+	for i := 0; i < s.size; i++ {
 		keystr := fmt.Sprintf("%05d", i)
 		answer, ok := s.memtable[keystr]
 		key = []byte(keystr)
@@ -102,7 +101,7 @@
 	}
 	// Verify 10 random Scan() calls.
 	for i := 0; i < 10; i++ {
-		start, end := s.rnd.Intn(s.dbSize), s.rnd.Intn(s.dbSize)
+		start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
 		if start > end {
 			start, end = end, start
 		}
@@ -130,15 +129,15 @@
 }
 
 // runReadWriteTest verifies read/write/snapshot operations.
-func runReadWriteTest(t *testing.T, st store.Store, dbSize int, steps []testStep) {
-	s := newDBState(dbSize)
-	// We verify database state not more than ~100 times to prevent the test
-	// from being slow.
+func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
+	s := newStoreState(size)
+	// We verify database state no more than ~100 times to prevent the test from
+	// being slow.
 	frequency := (len(steps) + 99) / 100
-	var states []*dbState
+	var states []*storeState
 	var snapshots []store.Snapshot
 	for i, step := range steps {
-		if step.key < 0 || step.key >= s.dbSize {
+		if step.key < 0 || step.key >= s.size {
 			t.Fatalf("invalid test step %v", step)
 		}
 		switch step.op {
@@ -186,21 +185,22 @@
 func RunReadWriteRandomTest(t *testing.T, st store.Store) {
 	rnd := rand.New(rand.NewSource(239017))
 	var testcase []testStep
-	dbSize := 50
+	size := 50
 	for i := 0; i < 10000; i++ {
-		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(dbSize)})
+		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
 	}
-	runReadWriteTest(t, st, dbSize, testcase)
+	runReadWriteTest(t, st, size, testcase)
 }
 
 // RunTransactionsWithGetTest tests transactions that use Put and Get
 // operations.
 // NOTE: consider setting GOMAXPROCS to something greater than 1.
 func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
-	// Invariant: value mapped to n stores sum of values of 0..n-1.
-	// Each of k transactions takes m distinct random values from 0..n-1,
-	// adds 1 to each and m to value mapped to n.
-	// The correctness of sums is checked after all transactions are committed.
+	// Invariant: value mapped to n is sum of values of 0..n-1.
+	// Each of k transactions takes m distinct random values from 0..n-1, adds 1
+	// to each and m to value mapped to n.
+	// The correctness of sums is checked after all transactions have been
+	// committed.
 	n, m, k := 10, 3, 100
 	for i := 0; i <= n; i++ {
 		if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
@@ -209,6 +209,8 @@
 	}
 	var wg sync.WaitGroup
 	wg.Add(k)
+	// TODO(sadovsky): This configuration creates huge resource contention.
+	// Perhaps we should add some random sleep's to reduce the contention.
 	for i := 0; i < k; i++ {
 		go func() {
 			rnd := rand.New(rand.NewSource(239017 * int64(i)))
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
new file mode 100644
index 0000000..d36b260
--- /dev/null
+++ b/services/syncbase/store/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// TODO(sadovsky): Add retry loop.
+func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
+	tx := st.NewTransaction()
+	if err := fn(tx); err != nil {
+		tx.Abort()
+		return err
+	}
+	if err := tx.Commit(); err != nil {
+		// TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+		// failure or ErrConcurrentTransaction. Depending on the cause of failure,
+		// it may be desirable to retry the Commit() and/or to call Abort(). For
+		// now, we always abort on a failed commit.
+		tx.Abort()
+		return err
+	}
+	return nil
+}
+
+// CopyBytes copies elements from a source slice into a destination slice.
+// The returned slice may be a sub-slice of dst if dst was large enough to hold
+// src. Otherwise, a newly allocated slice will be returned.
+func CopyBytes(dst, src []byte) []byte {
+	if cap(dst) < len(src) {
+		newlen := cap(dst)*2 + 2
+		if newlen < len(src) {
+			newlen = len(src)
+		}
+		dst = make([]byte, newlen)
+	}
+	copy(dst, src)
+	return dst[:len(src)]
+}