store: refactor transactions

This change extracts transaction-specific logic from the leveldb
store and reuses it for memstore.

The change introduces another store API: store without transactions.
Now leveldb and memstore implement that new interface and a new
struct, transactions.manager, is used to wrap leveldb/memstore
with transcation functionality.

The main change - move some code from the leveldb/db.go to
transactions/manager.go. Other changes are mostly moving code around.

Change-Id: Ief56bb262ac660fbf0ea75f2651da90758906b17
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index 028b1ad..0430561 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -11,16 +11,17 @@
 // #include "syncbase_leveldb.h"
 import "C"
 import (
-	"container/list"
 	"fmt"
 	"sync"
 	"unsafe"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/syncbase/x/ref/services/syncbase/store/transactions"
 	"v.io/v23/verror"
 )
 
-// db is a wrapper around LevelDB that implements the store.Store interface.
+// db is a wrapper around LevelDB that implements the transactions.BatchStore
+// interface.
 type db struct {
 	// mu protects the state of the db.
 	mu   sync.RWMutex
@@ -30,21 +31,8 @@
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
 	err          error
-
-	// txmu protects the transaction-related variables below, and is also held
-	// during transaction commits. It must always be acquired before mu.
-	txmu sync.Mutex
-	// txEvents is a queue of create/commit transaction events.
-	txEvents         *list.List
-	txSequenceNumber uint64
-	// txTable is a set of keys written by recent transactions. This set
-	// includes all write sets of transactions committed after the oldest living
-	// (in-flight) transaction.
-	txTable *trie
 }
 
-var _ store.Store = (*db)(nil)
-
 type OpenOptions struct {
 	CreateIfMissing bool
 	ErrorIfExists   bool
@@ -76,14 +64,12 @@
 	}
 	readOptions := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
-	return &db{
+	return transactions.Wrap(&db{
 		node:         store.NewResourceNode(),
 		cDb:          cDb,
 		readOptions:  readOptions,
 		writeOptions: C.leveldb_writeoptions_create(),
-		txEvents:     list.New(),
-		txTable:      newTrie(),
-	}, nil
+	}), nil
 }
 
 // Close implements the store.Store interface.
@@ -91,7 +77,7 @@
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	if d.err != nil {
-		return convertError(d.err)
+		return store.ConvertError(d.err)
 	}
 	d.node.Close()
 	C.leveldb_close(d.cDb)
@@ -130,37 +116,6 @@
 	return newStream(d, d.node, start, limit, d.readOptions)
 }
 
-// Put implements the store.StoreWriter interface.
-func (d *db) Put(key, value []byte) error {
-	write := store.WriteOp{
-		T:     store.PutOp,
-		Key:   key,
-		Value: value,
-	}
-	return d.write([]store.WriteOp{write}, d.writeOptions)
-}
-
-// Delete implements the store.StoreWriter interface.
-func (d *db) Delete(key []byte) error {
-	write := store.WriteOp{
-		T:   store.DeleteOp,
-		Key: key,
-	}
-	return d.write([]store.WriteOp{write}, d.writeOptions)
-}
-
-// NewTransaction implements the store.Store interface.
-func (d *db) NewTransaction() store.Transaction {
-	d.txmu.Lock()
-	defer d.txmu.Unlock()
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-	if d.err != nil {
-		return &store.InvalidTransaction{Error: d.err}
-	}
-	return newTransaction(d, d.node)
-}
-
 // NewSnapshot implements the store.Store interface.
 func (d *db) NewSnapshot() store.Snapshot {
 	d.mu.RLock()
@@ -171,16 +126,8 @@
 	return newSnapshot(d, d.node)
 }
 
-// write writes a batch and adds all written keys to txTable.
-// TODO(rogulenko): remove this method.
-func (d *db) write(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
-	d.txmu.Lock()
-	defer d.txmu.Unlock()
-	return d.writeLocked(batch, cOpts)
-}
-
-// writeLocked is like write(), but it assumes txmu is held.
-func (d *db) writeLocked(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
+// WriteBatch implements the transactions.BatchStore interface.
+func (d *db) WriteBatch(batch ...transactions.WriteOp) error {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	if d.err != nil {
@@ -190,11 +137,11 @@
 	defer C.leveldb_writebatch_destroy(cBatch)
 	for _, write := range batch {
 		switch write.T {
-		case store.PutOp:
+		case transactions.PutOp:
 			cKey, cKeyLen := cSlice(write.Key)
 			cVal, cValLen := cSlice(write.Value)
 			C.leveldb_writebatch_put(cBatch, cKey, cKeyLen, cVal, cValLen)
-		case store.DeleteOp:
+		case transactions.DeleteOp:
 			cKey, cKeyLen := cSlice(write.Key)
 			C.leveldb_writebatch_delete(cBatch, cKey, cKeyLen)
 		default:
@@ -202,32 +149,8 @@
 		}
 	}
 	var cError *C.char
-	C.leveldb_write(d.cDb, cOpts, cBatch, &cError)
-	if err := goError(cError); err != nil {
-		return err
-	}
-	if d.txEvents.Len() == 0 {
-		return nil
-	}
-	d.trackBatch(batch)
-	return nil
-}
-
-// trackBatch writes the batch to txTable and adds a commit event to txEvents.
-func (d *db) trackBatch(batch []store.WriteOp) {
-	// TODO(rogulenko): do GC.
-	d.txSequenceNumber++
-	seq := d.txSequenceNumber
-	var keys [][]byte
-	for _, write := range batch {
-		d.txTable.add(write.Key, seq)
-		keys = append(keys, write.Key)
-	}
-	tx := &commitedTransaction{
-		seq:   seq,
-		batch: keys,
-	}
-	d.txEvents.PushBack(tx)
+	C.leveldb_write(d.cDb, d.writeOptions, cBatch, &cError)
+	return goError(cError)
 }
 
 // getWithOpts returns the value for the given key.
@@ -236,7 +159,7 @@
 	d.mu.RLock()
 	defer d.mu.RUnlock()
 	if d.err != nil {
-		return valbuf, convertError(d.err)
+		return valbuf, store.ConvertError(d.err)
 	}
 	var cError *C.char
 	var valLen C.size_t
diff --git a/x/ref/services/syncbase/store/leveldb/snapshot.go b/x/ref/services/syncbase/store/leveldb/snapshot.go
index f845fa6..a403127 100644
--- a/x/ref/services/syncbase/store/leveldb/snapshot.go
+++ b/x/ref/services/syncbase/store/leveldb/snapshot.go
@@ -50,7 +50,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
-		return convertError(s.err)
+		return store.ConvertError(s.err)
 	}
 	s.node.Close()
 	C.leveldb_readoptions_destroy(s.cOpts)
@@ -66,7 +66,7 @@
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 	if s.err != nil {
-		return valbuf, convertError(s.err)
+		return valbuf, store.ConvertError(s.err)
 	}
 	return s.d.getWithOpts(key, valbuf, s.cOpts)
 }
diff --git a/x/ref/services/syncbase/store/leveldb/stream.go b/x/ref/services/syncbase/store/leveldb/stream.go
index 3102d74..2d592b4 100644
--- a/x/ref/services/syncbase/store/leveldb/stream.go
+++ b/x/ref/services/syncbase/store/leveldb/stream.go
@@ -113,7 +113,7 @@
 func (s *stream) Err() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	return convertError(s.err)
+	return store.ConvertError(s.err)
 }
 
 // Cancel implements the store.Stream interface.
diff --git a/x/ref/services/syncbase/store/leveldb/util.go b/x/ref/services/syncbase/store/leveldb/util.go
index 7d92527..dce69bb 100644
--- a/x/ref/services/syncbase/store/leveldb/util.go
+++ b/x/ref/services/syncbase/store/leveldb/util.go
@@ -44,7 +44,3 @@
 	})
 	return *(*[]byte)(ptr)
 }
-
-func convertError(err error) error {
-	return verror.Convert(verror.IDAction{}, nil, err)
-}
diff --git a/x/ref/services/syncbase/store/memstore/snapshot.go b/x/ref/services/syncbase/store/memstore/snapshot.go
index 7d0fff1..310f6e2 100644
--- a/x/ref/services/syncbase/store/memstore/snapshot.go
+++ b/x/ref/services/syncbase/store/memstore/snapshot.go
@@ -42,7 +42,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
-		return convertError(s.err)
+		return store.ConvertError(s.err)
 	}
 	s.node.Close()
 	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
@@ -54,7 +54,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil {
-		return valbuf, convertError(s.err)
+		return valbuf, store.ConvertError(s.err)
 	}
 	value, ok := s.data[string(key)]
 	if !ok {
diff --git a/x/ref/services/syncbase/store/memstore/store.go b/x/ref/services/syncbase/store/memstore/store.go
index e246fc4..15a2988 100644
--- a/x/ref/services/syncbase/store/memstore/store.go
+++ b/x/ref/services/syncbase/store/memstore/store.go
@@ -7,9 +7,11 @@
 package memstore
 
 import (
+	"fmt"
 	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/syncbase/x/ref/services/syncbase/store/transactions"
 	"v.io/v23/verror"
 )
 
@@ -18,20 +20,14 @@
 	node *store.ResourceNode
 	data map[string][]byte
 	err  error
-	// Most recent sequence number handed out.
-	lastSeq uint64
-	// Value of lastSeq at the time of the most recent commit.
-	lastCommitSeq uint64
 }
 
-var _ store.Store = (*memstore)(nil)
-
 // New creates a new memstore.
 func New() store.Store {
-	return &memstore{
+	return transactions.Wrap(&memstore{
 		data: map[string][]byte{},
 		node: store.NewResourceNode(),
-	}
+	})
 }
 
 // Close implements the store.Store interface.
@@ -39,7 +35,7 @@
 	st.mu.Lock()
 	defer st.mu.Unlock()
 	if st.err != nil {
-		return convertError(st.err)
+		return store.ConvertError(st.err)
 	}
 	st.node.Close()
 	st.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
@@ -51,7 +47,7 @@
 	st.mu.Lock()
 	defer st.mu.Unlock()
 	if st.err != nil {
-		return valbuf, convertError(st.err)
+		return valbuf, store.ConvertError(st.err)
 	}
 	value, ok := st.data[string(key)]
 	if !ok {
@@ -71,31 +67,6 @@
 	return newSnapshot(st, st.node).Scan(start, limit)
 }
 
-// Put implements the store.StoreWriter interface.
-func (st *memstore) Put(key, value []byte) error {
-	return store.RunInTransaction(st, func(tx store.Transaction) error {
-		return tx.Put(key, value)
-	})
-}
-
-// Delete implements the store.StoreWriter interface.
-func (st *memstore) Delete(key []byte) error {
-	return store.RunInTransaction(st, func(tx store.Transaction) error {
-		return tx.Delete(key)
-	})
-}
-
-// NewTransaction implements the store.Store interface.
-func (st *memstore) NewTransaction() store.Transaction {
-	st.mu.Lock()
-	defer st.mu.Unlock()
-	if st.err != nil {
-		return &store.InvalidTransaction{Error: st.err}
-	}
-	st.lastSeq++
-	return newTransaction(st, st.node, st.lastSeq)
-}
-
 // NewSnapshot implements the store.Store interface.
 func (st *memstore) NewSnapshot() store.Snapshot {
 	st.mu.Lock()
@@ -105,3 +76,23 @@
 	}
 	return newSnapshot(st, st.node)
 }
+
+// WriteBatch implements the transactions.BatchStore interface.
+func (st *memstore) WriteBatch(batch ...transactions.WriteOp) error {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	if st.err != nil {
+		return store.ConvertError(st.err)
+	}
+	for _, write := range batch {
+		switch write.T {
+		case transactions.PutOp:
+			st.data[string(write.Key)] = write.Value
+		case transactions.DeleteOp:
+			delete(st.data, string(write.Key))
+		default:
+			panic(fmt.Sprintf("unknown write operation type: %v", write.T))
+		}
+	}
+	return nil
+}
diff --git a/x/ref/services/syncbase/store/memstore/store_test.go b/x/ref/services/syncbase/store/memstore/store_test.go
index fef07b1..0b04032 100644
--- a/x/ref/services/syncbase/store/memstore/store_test.go
+++ b/x/ref/services/syncbase/store/memstore/store_test.go
@@ -40,6 +40,10 @@
 	runTest(t, test.RunReadWriteRandomTest)
 }
 
+func TestConcurrentTransactions(t *testing.T) {
+	runTest(t, test.RunConcurrentTransactionsTest)
+}
+
 func TestTransactionState(t *testing.T) {
 	runTest(t, test.RunTransactionStateTest)
 }
diff --git a/x/ref/services/syncbase/store/memstore/stream.go b/x/ref/services/syncbase/store/memstore/stream.go
index a8780be..345ea93 100644
--- a/x/ref/services/syncbase/store/memstore/stream.go
+++ b/x/ref/services/syncbase/store/memstore/stream.go
@@ -87,7 +87,7 @@
 func (s *stream) Err() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	return convertError(s.err)
+	return store.ConvertError(s.err)
 }
 
 // Cancel implements the store.Stream interface.
diff --git a/x/ref/services/syncbase/store/memstore/transaction.go b/x/ref/services/syncbase/store/memstore/transaction.go
deleted file mode 100644
index 5095a5f..0000000
--- a/x/ref/services/syncbase/store/memstore/transaction.go
+++ /dev/null
@@ -1,168 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package memstore
-
-import (
-	"sync"
-	"time"
-
-	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
-)
-
-const (
-	txnTimeout = time.Duration(5) * time.Second
-)
-
-type transaction struct {
-	mu   sync.Mutex
-	node *store.ResourceNode
-	st   *memstore
-	sn   *snapshot
-	// The following fields are used to determine whether method calls should
-	// error out.
-	err         error
-	seq         uint64
-	createdTime time.Time
-	// The following fields track writes performed against this transaction.
-	puts    map[string][]byte
-	deletes map[string]struct{}
-}
-
-var _ store.Transaction = (*transaction)(nil)
-
-func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
-	node := store.NewResourceNode()
-	tx := &transaction{
-		node:        node,
-		st:          st,
-		sn:          newSnapshot(st, node),
-		seq:         seq,
-		createdTime: time.Now(),
-		puts:        map[string][]byte{},
-		deletes:     map[string]struct{}{},
-	}
-	parent.AddChild(tx.node, func() {
-		tx.Abort()
-	})
-	return tx
-}
-
-func (tx *transaction) expired() bool {
-	return time.Now().After(tx.createdTime.Add(txnTimeout))
-}
-
-func (tx *transaction) error() error {
-	if tx.err != nil {
-		return convertError(tx.err)
-	}
-	if tx.expired() {
-		return verror.New(verror.ErrBadState, nil, store.ErrMsgExpiredTxn)
-	}
-	return nil
-}
-
-// Get implements the store.StoreReader interface.
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return valbuf, err
-	}
-
-	// Reflect the state of the transaction: the "puts" and "deletes"
-	// override the values in the transaction snapshot.
-	keyStr := string(key)
-	if val, ok := tx.puts[keyStr]; ok {
-		return val, nil
-	}
-	if _, ok := tx.deletes[keyStr]; ok {
-		return valbuf, verror.New(store.ErrUnknownKey, nil, keyStr)
-	}
-
-	return tx.sn.Get(key, valbuf)
-}
-
-// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, limit []byte) store.Stream {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return &store.InvalidStream{Error: err}
-	}
-
-	// Create an array of store.WriteOps as it is needed to merge
-	// the snaphot stream with the uncommitted changes.
-	var writes []store.WriteOp
-	for k, v := range tx.puts {
-		writes = append(writes, store.WriteOp{T: store.PutOp, Key: []byte(k), Value: v})
-	}
-	for k, _ := range tx.deletes {
-		writes = append(writes, store.WriteOp{T: store.DeleteOp, Key: []byte(k), Value: []byte{}})
-	}
-
-	// Return a stream which merges the snaphot stream with the uncommitted changes.
-	return store.MergeWritesWithStream(tx.sn, writes, start, limit)
-}
-
-// Put implements the store.StoreWriter interface.
-func (tx *transaction) Put(key, value []byte) error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return err
-	}
-	delete(tx.deletes, string(key))
-	tx.puts[string(key)] = value
-	return nil
-}
-
-// Delete implements the store.StoreWriter interface.
-func (tx *transaction) Delete(key []byte) error {
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return err
-	}
-	delete(tx.puts, string(key))
-	tx.deletes[string(key)] = struct{}{}
-	return nil
-}
-
-// Commit implements the store.Transaction interface.
-func (tx *transaction) Commit() error {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return err
-	}
-	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
-	tx.node.Close()
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock()
-	if tx.seq <= tx.st.lastCommitSeq {
-		return store.NewErrConcurrentTransaction(nil)
-	}
-	for k, v := range tx.puts {
-		tx.st.data[k] = v
-	}
-	for k := range tx.deletes {
-		delete(tx.st.data, k)
-	}
-	tx.st.lastCommitSeq = tx.st.lastSeq
-	return nil
-}
-
-// Abort implements the store.Transaction interface.
-func (tx *transaction) Abort() error {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	if err := tx.error(); err != nil {
-		return err
-	}
-	tx.node.Close()
-	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
-	return nil
-}
diff --git a/x/ref/services/syncbase/store/memstore/util.go b/x/ref/services/syncbase/store/memstore/util.go
deleted file mode 100644
index 8dc3a7a..0000000
--- a/x/ref/services/syncbase/store/memstore/util.go
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package memstore
-
-import (
-	"v.io/v23/verror"
-)
-
-func convertError(err error) error {
-	return verror.Convert(verror.IDAction{}, nil, err)
-}
diff --git a/x/ref/services/syncbase/store/transactions/manager.go b/x/ref/services/syncbase/store/transactions/manager.go
new file mode 100644
index 0000000..254812f
--- /dev/null
+++ b/x/ref/services/syncbase/store/transactions/manager.go
@@ -0,0 +1,194 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions
+
+import (
+	"container/list"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+// BatchStore is a CRUD-capable storage engine that supports atomic batch
+// writes. BatchStore doesn't support transactions.
+// This interface is a Go version of the C++ LevelDB interface. It serves as
+// an intermediate interface between store.Store and the LevelDB interface.
+type BatchStore interface {
+	store.StoreReader
+
+	// WriteBatch atomically writes a list of write operations to the database.
+	WriteBatch(batch ...WriteOp) error
+
+	// Close closes the store.
+	Close() error
+
+	// NewSnapshot creates a snapshot.
+	NewSnapshot() store.Snapshot
+}
+
+// manager handles transaction-related operations of the store.
+type manager struct {
+	BatchStore
+	// mu protects the variables below, and is also held during transaction
+	// commits. It must always be acquired before the store-level lock.
+	mu sync.Mutex
+	// events is a queue of create/commit transaction events.
+	events *list.List
+	seq    uint64
+	// txTable is a set of keys written by recent transactions. This set
+	// includes all write sets of transactions committed after the oldest living
+	// (in-flight) transaction.
+	txTable *trie
+}
+
+// commitedTransaction is only used as an element of manager.events.
+type commitedTransaction struct {
+	seq   uint64
+	batch [][]byte
+}
+
+// Wrap wraps the BatchStore with transaction functionality.
+func Wrap(bs BatchStore) store.Store {
+	return &manager{
+		BatchStore: bs,
+		events:     list.New(),
+		txTable:    newTrie(),
+	}
+}
+
+// Close implements the store.Store interface.
+func (mg *manager) Close() error {
+	mg.mu.Lock()
+	defer mg.mu.Unlock()
+	if mg.txTable == nil {
+		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+	}
+	mg.BatchStore.Close()
+	for event := mg.events.Front(); event != nil; event = event.Next() {
+		if tx, ok := event.Value.(*transaction); ok {
+			// tx.Abort() internally removes tx from the mg.events list under
+			// the mg.mu lock. To brake the cyclic dependency, we set tx.event
+			// to nil.
+			tx.mu.Lock()
+			tx.event = nil
+			tx.mu.Unlock()
+			tx.Abort()
+		}
+	}
+	mg.events = nil
+	mg.txTable = nil
+	return nil
+}
+
+// NewTransaction implements the store.Store interface.
+func (mg *manager) NewTransaction() store.Transaction {
+	mg.mu.Lock()
+	defer mg.mu.Unlock()
+	if mg.txTable == nil {
+		return &store.InvalidTransaction{
+			Error: verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore),
+		}
+	}
+	return newTransaction(mg)
+}
+
+// Put implements the store.StoreWriter interface.
+func (mg *manager) Put(key, value []byte) error {
+	mg.mu.Lock()
+	defer mg.mu.Unlock()
+	if mg.txTable == nil {
+		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+	}
+	write := WriteOp{
+		T:     PutOp,
+		Key:   key,
+		Value: value,
+	}
+	if err := mg.BatchStore.WriteBatch(write); err != nil {
+		return err
+	}
+	mg.trackBatch(write)
+	return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (mg *manager) Delete(key []byte) error {
+	mg.mu.Lock()
+	defer mg.mu.Unlock()
+	if mg.txTable == nil {
+		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+	}
+	write := WriteOp{
+		T:   DeleteOp,
+		Key: key,
+	}
+	if err := mg.BatchStore.WriteBatch(write); err != nil {
+		return err
+	}
+	mg.trackBatch(write)
+	return nil
+}
+
+// trackBatch writes the batch to txTable and adds a commit event to
+// the events queue.
+// Assumes mu is held.
+func (mg *manager) trackBatch(batch ...WriteOp) {
+	if mg.events.Len() == 0 {
+		return
+	}
+	// TODO(rogulenko): do GC.
+	mg.seq++
+	var keys [][]byte
+	for _, write := range batch {
+		mg.txTable.add(write.Key, mg.seq)
+		keys = append(keys, write.Key)
+	}
+	tx := &commitedTransaction{
+		seq:   mg.seq,
+		batch: keys,
+	}
+	mg.events.PushBack(tx)
+}
+
+//////////////////////////////////////////////////////////////
+// Read and Write types used for storing transcation reads
+// and uncommitted writes.
+
+type WriteType int
+
+const (
+	PutOp WriteType = iota
+	DeleteOp
+)
+
+type WriteOp struct {
+	T     WriteType
+	Key   []byte
+	Value []byte
+}
+
+type scanRange struct {
+	Start, Limit []byte
+}
+
+type readSet struct {
+	Keys   [][]byte
+	Ranges []scanRange
+}
+
+type writeOpArray []WriteOp
+
+func (a writeOpArray) Len() int {
+	return len(a)
+}
+
+func (a writeOpArray) Less(i, j int) bool {
+	return string(a[i].Key) < string(a[j].Key)
+}
+
+func (a writeOpArray) Swap(i, j int) {
+	a[i], a[j] = a[j], a[i]
+}
diff --git a/x/ref/services/syncbase/store/merged_stream.go b/x/ref/services/syncbase/store/transactions/merged_stream.go
similarity index 89%
rename from x/ref/services/syncbase/store/merged_stream.go
rename to x/ref/services/syncbase/store/transactions/merged_stream.go
index e8c1058..4ab10be 100644
--- a/x/ref/services/syncbase/store/merged_stream.go
+++ b/x/ref/services/syncbase/store/transactions/merged_stream.go
@@ -2,10 +2,12 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package store
+package transactions
 
 import (
 	"sort"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
 //////////////////////////////////////////////////////////////
@@ -19,7 +21,7 @@
 // changes to be passed in as an array of WriteOp.
 
 // Create a new stream which merges a snapshot stream with an array of write operations.
-func MergeWritesWithStream(sn Snapshot, w []WriteOp, start, limit []byte) Stream {
+func mergeWritesWithStream(sn store.Snapshot, w []WriteOp, start, limit []byte) store.Stream {
 	// Collect writes with the range specified, then sort them.
 	// Note: Writes could contain more than one write for a given key.
 	//       The last write is the current state.
@@ -29,7 +31,7 @@
 			writesMap[string(write.Key)] = write
 		}
 	}
-	var writesArray WriteOpArray
+	var writesArray writeOpArray
 	for _, writeOp := range writesMap {
 		writesArray = append(writesArray, writeOp)
 	}
@@ -45,7 +47,7 @@
 }
 
 type mergedStream struct {
-	snapshotStream      Stream
+	snapshotStream      store.Stream
 	writesArray         []WriteOp
 	writesCursor        int
 	unusedSnapshotValue bool
@@ -94,8 +96,8 @@
 		s.writesCursor++
 		return false
 	}
-	s.key = CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
-	s.value = CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
+	s.key = store.CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
+	s.value = store.CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
 	s.writesCursor++
 	return true
 }
@@ -122,7 +124,7 @@
 	if !s.hasValue {
 		panic("nothing staged")
 	}
-	return CopyBytes(keybuf, s.key)
+	return store.CopyBytes(keybuf, s.key)
 }
 
 // Value implements the Stream interface.
@@ -130,7 +132,7 @@
 	if !s.hasValue {
 		panic("nothing staged")
 	}
-	return CopyBytes(valbuf, s.value)
+	return store.CopyBytes(valbuf, s.value)
 }
 
 // Err implements the Stream interface.
diff --git a/x/ref/services/syncbase/store/leveldb/transaction.go b/x/ref/services/syncbase/store/transactions/transaction.go
similarity index 67%
rename from x/ref/services/syncbase/store/leveldb/transaction.go
rename to x/ref/services/syncbase/store/transactions/transaction.go
index b8093f6..dcf7569 100644
--- a/x/ref/services/syncbase/store/leveldb/transaction.go
+++ b/x/ref/services/syncbase/store/transactions/transaction.go
@@ -2,10 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package leveldb
+package transactions
 
-// #include "leveldb/c.h"
-import "C"
 import (
 	"bytes"
 	"container/list"
@@ -16,59 +14,41 @@
 	"v.io/x/lib/vlog"
 )
 
-// commitedTransaction is only used as an element of db.txEvents.
-type commitedTransaction struct {
-	seq   uint64
-	batch [][]byte
-}
-
-// transaction is a wrapper around LevelDB WriteBatch that implements
-// the store.Transaction interface.
+// transaction is a wrapper on top of a BatchWriter and a store.Snapshot that
+// implements the store.Transaction interface.
 type transaction struct {
 	// mu protects the state of the transaction.
 	mu       sync.Mutex
-	node     *store.ResourceNode
-	d        *db
+	mg       *manager
 	seq      uint64
 	event    *list.Element // pointer to element of db.txEvents
 	snapshot store.Snapshot
-	reads    store.ReadSet
-	writes   []store.WriteOp
-	cOpts    *C.leveldb_writeoptions_t
+	reads    readSet
+	writes   []WriteOp
 	err      error
 }
 
 var _ store.Transaction = (*transaction)(nil)
 
-func newTransaction(d *db, parent *store.ResourceNode) *transaction {
-	node := store.NewResourceNode()
-	snapshot := newSnapshot(d, node)
+func newTransaction(mg *manager) *transaction {
 	tx := &transaction{
-		node:     node,
-		d:        d,
-		snapshot: snapshot,
-		seq:      d.txSequenceNumber,
-		cOpts:    d.writeOptions,
+		mg:       mg,
+		snapshot: mg.BatchStore.NewSnapshot(),
+		seq:      mg.seq,
 	}
-	tx.event = d.txEvents.PushFront(tx)
-	parent.AddChild(tx.node, func() {
-		tx.Abort()
-	})
+	tx.event = mg.events.PushFront(tx)
 	return tx
 }
 
-// close frees allocated C objects and releases acquired locks.
+// close removes this transaction from the mg.events queue and aborts
+// the underlying snapshot.
 // Assumes mu is held.
 func (tx *transaction) close() {
 	tx.removeEvent()
-	tx.node.Close()
-	if tx.cOpts != tx.d.writeOptions {
-		C.leveldb_writeoptions_destroy(tx.cOpts)
-	}
-	tx.cOpts = nil
+	tx.snapshot.Abort()
 }
 
-// removeEvent removes this transaction from the db.txEvents queue.
+// removeEvent removes this transaction from the mg.events queue.
 // Assumes mu is held.
 func (tx *transaction) removeEvent() {
 	// This can happen if the transaction was committed, since Commit()
@@ -76,9 +56,9 @@
 	if tx.event == nil {
 		return
 	}
-	tx.d.txmu.Lock()
-	tx.d.txEvents.Remove(tx.event)
-	tx.d.txmu.Unlock()
+	tx.mg.mu.Lock()
+	tx.mg.events.Remove(tx.event)
+	tx.mg.mu.Unlock()
 	tx.event = nil
 }
 
@@ -87,7 +67,7 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if tx.err != nil {
-		return valbuf, convertError(tx.err)
+		return valbuf, store.ConvertError(tx.err)
 	}
 	tx.reads.Keys = append(tx.reads.Keys, key)
 
@@ -99,7 +79,7 @@
 	for i := len(tx.writes) - 1; i >= 0; i-- {
 		op := &tx.writes[i]
 		if bytes.Equal(op.Key, key) {
-			if op.T == store.PutOp {
+			if op.T == PutOp {
 				return op.Value, nil
 			}
 			return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
@@ -117,13 +97,13 @@
 		return &store.InvalidStream{Error: tx.err}
 	}
 
-	tx.reads.Ranges = append(tx.reads.Ranges, store.ScanRange{
+	tx.reads.Ranges = append(tx.reads.Ranges, scanRange{
 		Start: start,
 		Limit: limit,
 	})
 
 	// Return a stream which merges the snaphot stream with the uncommitted changes.
-	return store.MergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
+	return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -131,10 +111,10 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if tx.err != nil {
-		return convertError(tx.err)
+		return store.ConvertError(tx.err)
 	}
-	tx.writes = append(tx.writes, store.WriteOp{
-		T:     store.PutOp,
+	tx.writes = append(tx.writes, WriteOp{
+		T:     PutOp,
 		Key:   key,
 		Value: value,
 	})
@@ -146,10 +126,10 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if tx.err != nil {
-		return convertError(tx.err)
+		return store.ConvertError(tx.err)
 	}
-	tx.writes = append(tx.writes, store.WriteOp{
-		T:   store.DeleteOp,
+	tx.writes = append(tx.writes, WriteOp{
+		T:   DeleteOp,
 		Key: key,
 	})
 	return nil
@@ -157,16 +137,16 @@
 
 // validateReadSet returns true iff the read set of this transaction has not
 // been invalidated by other transactions.
-// Assumes tx.d.txmu is held.
+// Assumes tx.mg.mu is held.
 func (tx *transaction) validateReadSet() bool {
 	for _, key := range tx.reads.Keys {
-		if tx.d.txTable.get(key) > tx.seq {
+		if tx.mg.txTable.get(key) > tx.seq {
 			vlog.VI(3).Infof("key conflict: %q", key)
 			return false
 		}
 	}
 	for _, r := range tx.reads.Ranges {
-		if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+		if tx.mg.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
 			vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
 			return false
 		}
@@ -180,7 +160,7 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if tx.err != nil {
-		return convertError(tx.err)
+		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
 	// Explicitly remove this transaction from the event queue. If this was the
@@ -188,12 +168,16 @@
 	// not add this transaction's write set to txTable.
 	tx.removeEvent()
 	defer tx.close()
-	tx.d.txmu.Lock()
-	defer tx.d.txmu.Unlock()
+	tx.mg.mu.Lock()
+	defer tx.mg.mu.Unlock()
 	if !tx.validateReadSet() {
 		return store.NewErrConcurrentTransaction(nil)
 	}
-	return tx.d.writeLocked(tx.writes, tx.cOpts)
+	if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil {
+		return err
+	}
+	tx.mg.trackBatch(tx.writes...)
+	return nil
 }
 
 // Abort implements the store.Transaction interface.
@@ -201,7 +185,7 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if tx.err != nil {
-		return convertError(tx.err)
+		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
 	tx.close()
diff --git a/x/ref/services/syncbase/store/leveldb/trie.go b/x/ref/services/syncbase/store/transactions/trie.go
similarity index 98%
rename from x/ref/services/syncbase/store/leveldb/trie.go
rename to x/ref/services/syncbase/store/transactions/trie.go
index 78a6982..51a99a3 100644
--- a/x/ref/services/syncbase/store/leveldb/trie.go
+++ b/x/ref/services/syncbase/store/transactions/trie.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package leveldb
+package transactions
 
 import (
 	"fmt"
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index 72c4beb..19695ed 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -55,42 +55,7 @@
 	return dst
 }
 
-//////////////////////////////////////////////////////////////
-// Read and Write types used for storing transcation reads
-// and uncommitted writes.
-
-type ScanRange struct {
-	Start, Limit []byte
-}
-
-type ReadSet struct {
-	Keys   [][]byte
-	Ranges []ScanRange
-}
-
-type WriteType int
-
-const (
-	PutOp WriteType = iota
-	DeleteOp
-)
-
-type WriteOp struct {
-	T     WriteType
-	Key   []byte
-	Value []byte
-}
-
-type WriteOpArray []WriteOp
-
-func (a WriteOpArray) Len() int {
-	return len(a)
-}
-
-func (a WriteOpArray) Less(i, j int) bool {
-	return string(a[i].Key) < string(a[j].Key)
-}
-
-func (a WriteOpArray) Swap(i, j int) {
-	a[i], a[j] = a[j], a[i]
+// ConvertError returns a copy of the verror, appending the current stack to it.
+func ConvertError(err error) error {
+	return verror.Convert(verror.IDAction{}, nil, err)
 }