store/leveldb: adding mutexes
Wrapping db, snapshot and transaction with mutexes,
adding a bunch of tests.
This change also removes verrors from internal storage engine
for consistency: we should either use verrors everywhere or not
use them at all. As discussed with Adam offline, we should
probably start from the latter and switch to verrors later.
Change-Id: I38bf9760b35c40943225629c37e7d41ec8c30085
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index b15374d..672eb83 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
// #include "syncbase_leveldb.h"
import "C"
import (
+ "errors"
"sync"
"unsafe"
"v.io/syncbase/x/ref/services/syncbase/store"
)
+var (
+ errClosedStore = errors.New("closed store")
+)
+
// db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
type db struct {
+ // mu protects cDb.
+ mu sync.RWMutex
cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
+ err error
// Used to prevent concurrent transactions.
// TODO(rogulenko): improve concurrency.
- mu sync.Mutex
+ txmu sync.Mutex
}
var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
// Close implements the store.Store interface.
func (d *db) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if d.err != nil {
+ return d.err
+ }
C.leveldb_close(d.cDb)
+ d.cDb = nil
C.leveldb_readoptions_destroy(d.readOptions)
+ d.readOptions = nil
C.leveldb_writeoptions_destroy(d.writeOptions)
+ d.writeOptions = nil
+ d.err = errors.New("closed store")
return nil
}
@@ -81,8 +97,13 @@
}
// Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
- return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidStream{d.err}
+ }
+ return newStream(d, start, end, d.readOptions)
}
// Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
// NewTransaction implements the store.Store interface.
func (d *db) NewTransaction() store.Transaction {
+ // txmu is held until the transaction is successfully committed or aborted.
+ d.txmu.Lock()
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ d.txmu.Unlock()
+ return &store.InvalidTransaction{d.err}
+ }
return newTransaction(d)
}
// NewSnapshot implements the store.Store interface.
func (d *db) NewSnapshot() store.Snapshot {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidSnapshot{d.err}
+ }
return newSnapshot(d)
}
// getWithOpts returns the value for the given key.
// cOpts may contain a pointer to a snapshot.
func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return valbuf, d.err
+ }
var cError *C.char
var valLen C.size_t
cStr, cLen := cSlice(key)
val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
if err := goError(cError); err != nil {
- return nil, err
+ return valbuf, err
}
if val == nil {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
defer C.leveldb_free(unsafe.Pointer(val))
return store.CopyBytes(valbuf, goBytes(val, valLen)), nil