store: refactor transactions
This change extracts transaction-specific logic from the leveldb
store and reuses it for memstore.
The change introduces another store API: store without transactions.
Now leveldb and memstore implement that new interface and a new
struct, transactions.manager, is used to wrap leveldb/memstore
with transcation functionality.
The main change - move some code from the leveldb/db.go to
transactions/manager.go. Other changes are mostly moving code around.
Change-Id: Ief56bb262ac660fbf0ea75f2651da90758906b17
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index 028b1ad..0430561 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,16 +11,17 @@
// #include "syncbase_leveldb.h"
import "C"
import (
- "container/list"
"fmt"
"sync"
"unsafe"
"v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/syncbase/x/ref/services/syncbase/store/transactions"
"v.io/v23/verror"
)
-// db is a wrapper around LevelDB that implements the store.Store interface.
+// db is a wrapper around LevelDB that implements the transactions.BatchStore
+// interface.
type db struct {
// mu protects the state of the db.
mu sync.RWMutex
@@ -30,21 +31,8 @@
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
err error
-
- // txmu protects the transaction-related variables below, and is also held
- // during transaction commits. It must always be acquired before mu.
- txmu sync.Mutex
- // txEvents is a queue of create/commit transaction events.
- txEvents *list.List
- txSequenceNumber uint64
- // txTable is a set of keys written by recent transactions. This set
- // includes all write sets of transactions committed after the oldest living
- // (in-flight) transaction.
- txTable *trie
}
-var _ store.Store = (*db)(nil)
-
type OpenOptions struct {
CreateIfMissing bool
ErrorIfExists bool
@@ -76,14 +64,12 @@
}
readOptions := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
- return &db{
+ return transactions.Wrap(&db{
node: store.NewResourceNode(),
cDb: cDb,
readOptions: readOptions,
writeOptions: C.leveldb_writeoptions_create(),
- txEvents: list.New(),
- txTable: newTrie(),
- }, nil
+ }), nil
}
// Close implements the store.Store interface.
@@ -91,7 +77,7 @@
d.mu.Lock()
defer d.mu.Unlock()
if d.err != nil {
- return convertError(d.err)
+ return store.ConvertError(d.err)
}
d.node.Close()
C.leveldb_close(d.cDb)
@@ -130,37 +116,6 @@
return newStream(d, d.node, start, limit, d.readOptions)
}
-// Put implements the store.StoreWriter interface.
-func (d *db) Put(key, value []byte) error {
- write := store.WriteOp{
- T: store.PutOp,
- Key: key,
- Value: value,
- }
- return d.write([]store.WriteOp{write}, d.writeOptions)
-}
-
-// Delete implements the store.StoreWriter interface.
-func (d *db) Delete(key []byte) error {
- write := store.WriteOp{
- T: store.DeleteOp,
- Key: key,
- }
- return d.write([]store.WriteOp{write}, d.writeOptions)
-}
-
-// NewTransaction implements the store.Store interface.
-func (d *db) NewTransaction() store.Transaction {
- d.txmu.Lock()
- defer d.txmu.Unlock()
- d.mu.RLock()
- defer d.mu.RUnlock()
- if d.err != nil {
- return &store.InvalidTransaction{Error: d.err}
- }
- return newTransaction(d, d.node)
-}
-
// NewSnapshot implements the store.Store interface.
func (d *db) NewSnapshot() store.Snapshot {
d.mu.RLock()
@@ -171,16 +126,8 @@
return newSnapshot(d, d.node)
}
-// write writes a batch and adds all written keys to txTable.
-// TODO(rogulenko): remove this method.
-func (d *db) write(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
- d.txmu.Lock()
- defer d.txmu.Unlock()
- return d.writeLocked(batch, cOpts)
-}
-
-// writeLocked is like write(), but it assumes txmu is held.
-func (d *db) writeLocked(batch []store.WriteOp, cOpts *C.leveldb_writeoptions_t) error {
+// WriteBatch implements the transactions.BatchStore interface.
+func (d *db) WriteBatch(batch ...transactions.WriteOp) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.err != nil {
@@ -190,11 +137,11 @@
defer C.leveldb_writebatch_destroy(cBatch)
for _, write := range batch {
switch write.T {
- case store.PutOp:
+ case transactions.PutOp:
cKey, cKeyLen := cSlice(write.Key)
cVal, cValLen := cSlice(write.Value)
C.leveldb_writebatch_put(cBatch, cKey, cKeyLen, cVal, cValLen)
- case store.DeleteOp:
+ case transactions.DeleteOp:
cKey, cKeyLen := cSlice(write.Key)
C.leveldb_writebatch_delete(cBatch, cKey, cKeyLen)
default:
@@ -202,32 +149,8 @@
}
}
var cError *C.char
- C.leveldb_write(d.cDb, cOpts, cBatch, &cError)
- if err := goError(cError); err != nil {
- return err
- }
- if d.txEvents.Len() == 0 {
- return nil
- }
- d.trackBatch(batch)
- return nil
-}
-
-// trackBatch writes the batch to txTable and adds a commit event to txEvents.
-func (d *db) trackBatch(batch []store.WriteOp) {
- // TODO(rogulenko): do GC.
- d.txSequenceNumber++
- seq := d.txSequenceNumber
- var keys [][]byte
- for _, write := range batch {
- d.txTable.add(write.Key, seq)
- keys = append(keys, write.Key)
- }
- tx := &commitedTransaction{
- seq: seq,
- batch: keys,
- }
- d.txEvents.PushBack(tx)
+ C.leveldb_write(d.cDb, d.writeOptions, cBatch, &cError)
+ return goError(cError)
}
// getWithOpts returns the value for the given key.
@@ -236,7 +159,7 @@
d.mu.RLock()
defer d.mu.RUnlock()
if d.err != nil {
- return valbuf, convertError(d.err)
+ return valbuf, store.ConvertError(d.err)
}
var cError *C.char
var valLen C.size_t
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index f845fa6..a403127 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -50,7 +50,7 @@
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
- return convertError(s.err)
+ return store.ConvertError(s.err)
}
s.node.Close()
C.leveldb_readoptions_destroy(s.cOpts)
@@ -66,7 +66,7 @@
s.mu.RLock()
defer s.mu.RUnlock()
if s.err != nil {
- return valbuf, convertError(s.err)
+ return valbuf, store.ConvertError(s.err)
}
return s.d.getWithOpts(key, valbuf, s.cOpts)
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index 3102d74..2d592b4 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -113,7 +113,7 @@
func (s *stream) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
- return convertError(s.err)
+ return store.ConvertError(s.err)
}
// Cancel implements the store.Stream interface.
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
index 7d92527..dce69bb 100644
--- a/services/syncbase/store/leveldb/util.go
+++ b/services/syncbase/store/leveldb/util.go
@@ -44,7 +44,3 @@
})
return *(*[]byte)(ptr)
}
-
-func convertError(err error) error {
- return verror.Convert(verror.IDAction{}, nil, err)
-}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index 7d0fff1..310f6e2 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -42,7 +42,7 @@
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
- return convertError(s.err)
+ return store.ConvertError(s.err)
}
s.node.Close()
s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
@@ -54,7 +54,7 @@
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
- return valbuf, convertError(s.err)
+ return valbuf, store.ConvertError(s.err)
}
value, ok := s.data[string(key)]
if !ok {
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index e246fc4..15a2988 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -7,9 +7,11 @@
package memstore
import (
+ "fmt"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/syncbase/x/ref/services/syncbase/store/transactions"
"v.io/v23/verror"
)
@@ -18,20 +20,14 @@
node *store.ResourceNode
data map[string][]byte
err error
- // Most recent sequence number handed out.
- lastSeq uint64
- // Value of lastSeq at the time of the most recent commit.
- lastCommitSeq uint64
}
-var _ store.Store = (*memstore)(nil)
-
// New creates a new memstore.
func New() store.Store {
- return &memstore{
+ return transactions.Wrap(&memstore{
data: map[string][]byte{},
node: store.NewResourceNode(),
- }
+ })
}
// Close implements the store.Store interface.
@@ -39,7 +35,7 @@
st.mu.Lock()
defer st.mu.Unlock()
if st.err != nil {
- return convertError(st.err)
+ return store.ConvertError(st.err)
}
st.node.Close()
st.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
@@ -51,7 +47,7 @@
st.mu.Lock()
defer st.mu.Unlock()
if st.err != nil {
- return valbuf, convertError(st.err)
+ return valbuf, store.ConvertError(st.err)
}
value, ok := st.data[string(key)]
if !ok {
@@ -71,31 +67,6 @@
return newSnapshot(st, st.node).Scan(start, limit)
}
-// Put implements the store.StoreWriter interface.
-func (st *memstore) Put(key, value []byte) error {
- return store.RunInTransaction(st, func(tx store.Transaction) error {
- return tx.Put(key, value)
- })
-}
-
-// Delete implements the store.StoreWriter interface.
-func (st *memstore) Delete(key []byte) error {
- return store.RunInTransaction(st, func(tx store.Transaction) error {
- return tx.Delete(key)
- })
-}
-
-// NewTransaction implements the store.Store interface.
-func (st *memstore) NewTransaction() store.Transaction {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return &store.InvalidTransaction{Error: st.err}
- }
- st.lastSeq++
- return newTransaction(st, st.node, st.lastSeq)
-}
-
// NewSnapshot implements the store.Store interface.
func (st *memstore) NewSnapshot() store.Snapshot {
st.mu.Lock()
@@ -105,3 +76,23 @@
}
return newSnapshot(st, st.node)
}
+
+// WriteBatch implements the transactions.BatchStore interface.
+func (st *memstore) WriteBatch(batch ...transactions.WriteOp) error {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ if st.err != nil {
+ return store.ConvertError(st.err)
+ }
+ for _, write := range batch {
+ switch write.T {
+ case transactions.PutOp:
+ st.data[string(write.Key)] = write.Value
+ case transactions.DeleteOp:
+ delete(st.data, string(write.Key))
+ default:
+ panic(fmt.Sprintf("unknown write operation type: %v", write.T))
+ }
+ }
+ return nil
+}
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index fef07b1..0b04032 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -40,6 +40,10 @@
runTest(t, test.RunReadWriteRandomTest)
}
+func TestConcurrentTransactions(t *testing.T) {
+ runTest(t, test.RunConcurrentTransactionsTest)
+}
+
func TestTransactionState(t *testing.T) {
runTest(t, test.RunTransactionStateTest)
}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index a8780be..345ea93 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -87,7 +87,7 @@
func (s *stream) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
- return convertError(s.err)
+ return store.ConvertError(s.err)
}
// Cancel implements the store.Stream interface.
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
deleted file mode 100644
index 5095a5f..0000000
--- a/services/syncbase/store/memstore/transaction.go
+++ /dev/null
@@ -1,168 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package memstore
-
-import (
- "sync"
- "time"
-
- "v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
-)
-
-const (
- txnTimeout = time.Duration(5) * time.Second
-)
-
-type transaction struct {
- mu sync.Mutex
- node *store.ResourceNode
- st *memstore
- sn *snapshot
- // The following fields are used to determine whether method calls should
- // error out.
- err error
- seq uint64
- createdTime time.Time
- // The following fields track writes performed against this transaction.
- puts map[string][]byte
- deletes map[string]struct{}
-}
-
-var _ store.Transaction = (*transaction)(nil)
-
-func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
- node := store.NewResourceNode()
- tx := &transaction{
- node: node,
- st: st,
- sn: newSnapshot(st, node),
- seq: seq,
- createdTime: time.Now(),
- puts: map[string][]byte{},
- deletes: map[string]struct{}{},
- }
- parent.AddChild(tx.node, func() {
- tx.Abort()
- })
- return tx
-}
-
-func (tx *transaction) expired() bool {
- return time.Now().After(tx.createdTime.Add(txnTimeout))
-}
-
-func (tx *transaction) error() error {
- if tx.err != nil {
- return convertError(tx.err)
- }
- if tx.expired() {
- return verror.New(verror.ErrBadState, nil, store.ErrMsgExpiredTxn)
- }
- return nil
-}
-
-// Get implements the store.StoreReader interface.
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- if err := tx.error(); err != nil {
- return valbuf, err
- }
-
- // Reflect the state of the transaction: the "puts" and "deletes"
- // override the values in the transaction snapshot.
- keyStr := string(key)
- if val, ok := tx.puts[keyStr]; ok {
- return val, nil
- }
- if _, ok := tx.deletes[keyStr]; ok {
- return valbuf, verror.New(store.ErrUnknownKey, nil, keyStr)
- }
-
- return tx.sn.Get(key, valbuf)
-}
-
-// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, limit []byte) store.Stream {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- if err := tx.error(); err != nil {
- return &store.InvalidStream{Error: err}
- }
-
- // Create an array of store.WriteOps as it is needed to merge
- // the snaphot stream with the uncommitted changes.
- var writes []store.WriteOp
- for k, v := range tx.puts {
- writes = append(writes, store.WriteOp{T: store.PutOp, Key: []byte(k), Value: v})
- }
- for k, _ := range tx.deletes {
- writes = append(writes, store.WriteOp{T: store.DeleteOp, Key: []byte(k), Value: []byte{}})
- }
-
- // Return a stream which merges the snaphot stream with the uncommitted changes.
- return store.MergeWritesWithStream(tx.sn, writes, start, limit)
-}
-
-// Put implements the store.StoreWriter interface.
-func (tx *transaction) Put(key, value []byte) error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.error(); err != nil {
- return err
- }
- delete(tx.deletes, string(key))
- tx.puts[string(key)] = value
- return nil
-}
-
-// Delete implements the store.StoreWriter interface.
-func (tx *transaction) Delete(key []byte) error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.error(); err != nil {
- return err
- }
- delete(tx.puts, string(key))
- tx.deletes[string(key)] = struct{}{}
- return nil
-}
-
-// Commit implements the store.Transaction interface.
-func (tx *transaction) Commit() error {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- if err := tx.error(); err != nil {
- return err
- }
- tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
- tx.node.Close()
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if tx.seq <= tx.st.lastCommitSeq {
- return store.NewErrConcurrentTransaction(nil)
- }
- for k, v := range tx.puts {
- tx.st.data[k] = v
- }
- for k := range tx.deletes {
- delete(tx.st.data, k)
- }
- tx.st.lastCommitSeq = tx.st.lastSeq
- return nil
-}
-
-// Abort implements the store.Transaction interface.
-func (tx *transaction) Abort() error {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- if err := tx.error(); err != nil {
- return err
- }
- tx.node.Close()
- tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
- return nil
-}
diff --git a/services/syncbase/store/memstore/util.go b/services/syncbase/store/memstore/util.go
deleted file mode 100644
index 8dc3a7a..0000000
--- a/services/syncbase/store/memstore/util.go
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package memstore
-
-import (
- "v.io/v23/verror"
-)
-
-func convertError(err error) error {
- return verror.Convert(verror.IDAction{}, nil, err)
-}
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
new file mode 100644
index 0000000..254812f
--- /dev/null
+++ b/services/syncbase/store/transactions/manager.go
@@ -0,0 +1,194 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions
+
+import (
+ "container/list"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+// BatchStore is a CRUD-capable storage engine that supports atomic batch
+// writes. BatchStore doesn't support transactions.
+// This interface is a Go version of the C++ LevelDB interface. It serves as
+// an intermediate interface between store.Store and the LevelDB interface.
+type BatchStore interface {
+ store.StoreReader
+
+ // WriteBatch atomically writes a list of write operations to the database.
+ WriteBatch(batch ...WriteOp) error
+
+ // Close closes the store.
+ Close() error
+
+ // NewSnapshot creates a snapshot.
+ NewSnapshot() store.Snapshot
+}
+
+// manager handles transaction-related operations of the store.
+type manager struct {
+ BatchStore
+ // mu protects the variables below, and is also held during transaction
+ // commits. It must always be acquired before the store-level lock.
+ mu sync.Mutex
+ // events is a queue of create/commit transaction events.
+ events *list.List
+ seq uint64
+ // txTable is a set of keys written by recent transactions. This set
+ // includes all write sets of transactions committed after the oldest living
+ // (in-flight) transaction.
+ txTable *trie
+}
+
+// commitedTransaction is only used as an element of manager.events.
+type commitedTransaction struct {
+ seq uint64
+ batch [][]byte
+}
+
+// Wrap wraps the BatchStore with transaction functionality.
+func Wrap(bs BatchStore) store.Store {
+ return &manager{
+ BatchStore: bs,
+ events: list.New(),
+ txTable: newTrie(),
+ }
+}
+
+// Close implements the store.Store interface.
+func (mg *manager) Close() error {
+ mg.mu.Lock()
+ defer mg.mu.Unlock()
+ if mg.txTable == nil {
+ return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+ }
+ mg.BatchStore.Close()
+ for event := mg.events.Front(); event != nil; event = event.Next() {
+ if tx, ok := event.Value.(*transaction); ok {
+ // tx.Abort() internally removes tx from the mg.events list under
+ // the mg.mu lock. To brake the cyclic dependency, we set tx.event
+ // to nil.
+ tx.mu.Lock()
+ tx.event = nil
+ tx.mu.Unlock()
+ tx.Abort()
+ }
+ }
+ mg.events = nil
+ mg.txTable = nil
+ return nil
+}
+
+// NewTransaction implements the store.Store interface.
+func (mg *manager) NewTransaction() store.Transaction {
+ mg.mu.Lock()
+ defer mg.mu.Unlock()
+ if mg.txTable == nil {
+ return &store.InvalidTransaction{
+ Error: verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore),
+ }
+ }
+ return newTransaction(mg)
+}
+
+// Put implements the store.StoreWriter interface.
+func (mg *manager) Put(key, value []byte) error {
+ mg.mu.Lock()
+ defer mg.mu.Unlock()
+ if mg.txTable == nil {
+ return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+ }
+ write := WriteOp{
+ T: PutOp,
+ Key: key,
+ Value: value,
+ }
+ if err := mg.BatchStore.WriteBatch(write); err != nil {
+ return err
+ }
+ mg.trackBatch(write)
+ return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (mg *manager) Delete(key []byte) error {
+ mg.mu.Lock()
+ defer mg.mu.Unlock()
+ if mg.txTable == nil {
+ return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+ }
+ write := WriteOp{
+ T: DeleteOp,
+ Key: key,
+ }
+ if err := mg.BatchStore.WriteBatch(write); err != nil {
+ return err
+ }
+ mg.trackBatch(write)
+ return nil
+}
+
+// trackBatch writes the batch to txTable and adds a commit event to
+// the events queue.
+// Assumes mu is held.
+func (mg *manager) trackBatch(batch ...WriteOp) {
+ if mg.events.Len() == 0 {
+ return
+ }
+ // TODO(rogulenko): do GC.
+ mg.seq++
+ var keys [][]byte
+ for _, write := range batch {
+ mg.txTable.add(write.Key, mg.seq)
+ keys = append(keys, write.Key)
+ }
+ tx := &commitedTransaction{
+ seq: mg.seq,
+ batch: keys,
+ }
+ mg.events.PushBack(tx)
+}
+
+//////////////////////////////////////////////////////////////
+// Read and Write types used for storing transcation reads
+// and uncommitted writes.
+
+type WriteType int
+
+const (
+ PutOp WriteType = iota
+ DeleteOp
+)
+
+type WriteOp struct {
+ T WriteType
+ Key []byte
+ Value []byte
+}
+
+type scanRange struct {
+ Start, Limit []byte
+}
+
+type readSet struct {
+ Keys [][]byte
+ Ranges []scanRange
+}
+
+type writeOpArray []WriteOp
+
+func (a writeOpArray) Len() int {
+ return len(a)
+}
+
+func (a writeOpArray) Less(i, j int) bool {
+ return string(a[i].Key) < string(a[j].Key)
+}
+
+func (a writeOpArray) Swap(i, j int) {
+ a[i], a[j] = a[j], a[i]
+}
diff --git a/services/syncbase/store/merged_stream.go b/services/syncbase/store/transactions/merged_stream.go
similarity index 89%
rename from services/syncbase/store/merged_stream.go
rename to services/syncbase/store/transactions/merged_stream.go
index e8c1058..4ab10be 100644
--- a/services/syncbase/store/merged_stream.go
+++ b/services/syncbase/store/transactions/merged_stream.go
@@ -2,10 +2,12 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package store
+package transactions
import (
"sort"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
)
//////////////////////////////////////////////////////////////
@@ -19,7 +21,7 @@
// changes to be passed in as an array of WriteOp.
// Create a new stream which merges a snapshot stream with an array of write operations.
-func MergeWritesWithStream(sn Snapshot, w []WriteOp, start, limit []byte) Stream {
+func mergeWritesWithStream(sn store.Snapshot, w []WriteOp, start, limit []byte) store.Stream {
// Collect writes with the range specified, then sort them.
// Note: Writes could contain more than one write for a given key.
// The last write is the current state.
@@ -29,7 +31,7 @@
writesMap[string(write.Key)] = write
}
}
- var writesArray WriteOpArray
+ var writesArray writeOpArray
for _, writeOp := range writesMap {
writesArray = append(writesArray, writeOp)
}
@@ -45,7 +47,7 @@
}
type mergedStream struct {
- snapshotStream Stream
+ snapshotStream store.Stream
writesArray []WriteOp
writesCursor int
unusedSnapshotValue bool
@@ -94,8 +96,8 @@
s.writesCursor++
return false
}
- s.key = CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
- s.value = CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
+ s.key = store.CopyBytes(s.key, s.writesArray[s.writesCursor].Key)
+ s.value = store.CopyBytes(s.value, s.writesArray[s.writesCursor].Value)
s.writesCursor++
return true
}
@@ -122,7 +124,7 @@
if !s.hasValue {
panic("nothing staged")
}
- return CopyBytes(keybuf, s.key)
+ return store.CopyBytes(keybuf, s.key)
}
// Value implements the Stream interface.
@@ -130,7 +132,7 @@
if !s.hasValue {
panic("nothing staged")
}
- return CopyBytes(valbuf, s.value)
+ return store.CopyBytes(valbuf, s.value)
}
// Err implements the Stream interface.
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/transactions/transaction.go
similarity index 67%
rename from services/syncbase/store/leveldb/transaction.go
rename to services/syncbase/store/transactions/transaction.go
index b8093f6..dcf7569 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -2,10 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package leveldb
+package transactions
-// #include "leveldb/c.h"
-import "C"
import (
"bytes"
"container/list"
@@ -16,59 +14,41 @@
"v.io/x/lib/vlog"
)
-// commitedTransaction is only used as an element of db.txEvents.
-type commitedTransaction struct {
- seq uint64
- batch [][]byte
-}
-
-// transaction is a wrapper around LevelDB WriteBatch that implements
-// the store.Transaction interface.
+// transaction is a wrapper on top of a BatchWriter and a store.Snapshot that
+// implements the store.Transaction interface.
type transaction struct {
// mu protects the state of the transaction.
mu sync.Mutex
- node *store.ResourceNode
- d *db
+ mg *manager
seq uint64
event *list.Element // pointer to element of db.txEvents
snapshot store.Snapshot
- reads store.ReadSet
- writes []store.WriteOp
- cOpts *C.leveldb_writeoptions_t
+ reads readSet
+ writes []WriteOp
err error
}
var _ store.Transaction = (*transaction)(nil)
-func newTransaction(d *db, parent *store.ResourceNode) *transaction {
- node := store.NewResourceNode()
- snapshot := newSnapshot(d, node)
+func newTransaction(mg *manager) *transaction {
tx := &transaction{
- node: node,
- d: d,
- snapshot: snapshot,
- seq: d.txSequenceNumber,
- cOpts: d.writeOptions,
+ mg: mg,
+ snapshot: mg.BatchStore.NewSnapshot(),
+ seq: mg.seq,
}
- tx.event = d.txEvents.PushFront(tx)
- parent.AddChild(tx.node, func() {
- tx.Abort()
- })
+ tx.event = mg.events.PushFront(tx)
return tx
}
-// close frees allocated C objects and releases acquired locks.
+// close removes this transaction from the mg.events queue and aborts
+// the underlying snapshot.
// Assumes mu is held.
func (tx *transaction) close() {
tx.removeEvent()
- tx.node.Close()
- if tx.cOpts != tx.d.writeOptions {
- C.leveldb_writeoptions_destroy(tx.cOpts)
- }
- tx.cOpts = nil
+ tx.snapshot.Abort()
}
-// removeEvent removes this transaction from the db.txEvents queue.
+// removeEvent removes this transaction from the mg.events queue.
// Assumes mu is held.
func (tx *transaction) removeEvent() {
// This can happen if the transaction was committed, since Commit()
@@ -76,9 +56,9 @@
if tx.event == nil {
return
}
- tx.d.txmu.Lock()
- tx.d.txEvents.Remove(tx.event)
- tx.d.txmu.Unlock()
+ tx.mg.mu.Lock()
+ tx.mg.events.Remove(tx.event)
+ tx.mg.mu.Unlock()
tx.event = nil
}
@@ -87,7 +67,7 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
- return valbuf, convertError(tx.err)
+ return valbuf, store.ConvertError(tx.err)
}
tx.reads.Keys = append(tx.reads.Keys, key)
@@ -99,7 +79,7 @@
for i := len(tx.writes) - 1; i >= 0; i-- {
op := &tx.writes[i]
if bytes.Equal(op.Key, key) {
- if op.T == store.PutOp {
+ if op.T == PutOp {
return op.Value, nil
}
return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
@@ -117,13 +97,13 @@
return &store.InvalidStream{Error: tx.err}
}
- tx.reads.Ranges = append(tx.reads.Ranges, store.ScanRange{
+ tx.reads.Ranges = append(tx.reads.Ranges, scanRange{
Start: start,
Limit: limit,
})
// Return a stream which merges the snaphot stream with the uncommitted changes.
- return store.MergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
+ return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
}
// Put implements the store.StoreWriter interface.
@@ -131,10 +111,10 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
- return convertError(tx.err)
+ return store.ConvertError(tx.err)
}
- tx.writes = append(tx.writes, store.WriteOp{
- T: store.PutOp,
+ tx.writes = append(tx.writes, WriteOp{
+ T: PutOp,
Key: key,
Value: value,
})
@@ -146,10 +126,10 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
- return convertError(tx.err)
+ return store.ConvertError(tx.err)
}
- tx.writes = append(tx.writes, store.WriteOp{
- T: store.DeleteOp,
+ tx.writes = append(tx.writes, WriteOp{
+ T: DeleteOp,
Key: key,
})
return nil
@@ -157,16 +137,16 @@
// validateReadSet returns true iff the read set of this transaction has not
// been invalidated by other transactions.
-// Assumes tx.d.txmu is held.
+// Assumes tx.mg.mu is held.
func (tx *transaction) validateReadSet() bool {
for _, key := range tx.reads.Keys {
- if tx.d.txTable.get(key) > tx.seq {
+ if tx.mg.txTable.get(key) > tx.seq {
vlog.VI(3).Infof("key conflict: %q", key)
return false
}
}
for _, r := range tx.reads.Ranges {
- if tx.d.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
+ if tx.mg.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
return false
}
@@ -180,7 +160,7 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
- return convertError(tx.err)
+ return store.ConvertError(tx.err)
}
tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
// Explicitly remove this transaction from the event queue. If this was the
@@ -188,12 +168,16 @@
// not add this transaction's write set to txTable.
tx.removeEvent()
defer tx.close()
- tx.d.txmu.Lock()
- defer tx.d.txmu.Unlock()
+ tx.mg.mu.Lock()
+ defer tx.mg.mu.Unlock()
if !tx.validateReadSet() {
return store.NewErrConcurrentTransaction(nil)
}
- return tx.d.writeLocked(tx.writes, tx.cOpts)
+ if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil {
+ return err
+ }
+ tx.mg.trackBatch(tx.writes...)
+ return nil
}
// Abort implements the store.Transaction interface.
@@ -201,7 +185,7 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.err != nil {
- return convertError(tx.err)
+ return store.ConvertError(tx.err)
}
tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
tx.close()
diff --git a/services/syncbase/store/leveldb/trie.go b/services/syncbase/store/transactions/trie.go
similarity index 98%
rename from services/syncbase/store/leveldb/trie.go
rename to services/syncbase/store/transactions/trie.go
index 78a6982..51a99a3 100644
--- a/services/syncbase/store/leveldb/trie.go
+++ b/services/syncbase/store/transactions/trie.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package leveldb
+package transactions
import (
"fmt"
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index 72c4beb..19695ed 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -55,42 +55,7 @@
return dst
}
-//////////////////////////////////////////////////////////////
-// Read and Write types used for storing transcation reads
-// and uncommitted writes.
-
-type ScanRange struct {
- Start, Limit []byte
-}
-
-type ReadSet struct {
- Keys [][]byte
- Ranges []ScanRange
-}
-
-type WriteType int
-
-const (
- PutOp WriteType = iota
- DeleteOp
-)
-
-type WriteOp struct {
- T WriteType
- Key []byte
- Value []byte
-}
-
-type WriteOpArray []WriteOp
-
-func (a WriteOpArray) Len() int {
- return len(a)
-}
-
-func (a WriteOpArray) Less(i, j int) bool {
- return string(a[i].Key) < string(a[j].Key)
-}
-
-func (a WriteOpArray) Swap(i, j int) {
- a[i], a[j] = a[j], a[i]
+// ConvertError returns a copy of the verror, appending the current stack to it.
+func ConvertError(err error) error {
+ return verror.Convert(verror.IDAction{}, nil, err)
}