store/leveldb: adding mutexes

Wrapping db, snapshot and transaction with mutexes,
adding a bunch of tests.

This change also removes verrors from the internal storage engine
for consistency: we should either use verrors everywhere or not
use them at all. As discussed with Adam offline, we should
probably start with the latter (no verrors) and switch to verrors later.

Change-Id: I38bf9760b35c40943225629c37e7d41ec8c30085
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index b15374d..672eb83 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
 // #include "syncbase_leveldb.h"
 import "C"
 import (
+	"errors"
 	"sync"
 	"unsafe"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
+var (
+	errClosedStore = errors.New("closed store") // error value describing a store that has been closed
+)
+
 // db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
 type db struct {
+	// mu protects cDb, readOptions, writeOptions and err.
+	mu  sync.RWMutex
 	cDb *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
+	err          error // set by Close; methods check it under mu and fail once it is non-nil
 	// Used to prevent concurrent transactions.
 	// TODO(rogulenko): improve concurrency.
-	mu sync.Mutex
+	txmu sync.Mutex
 }
 
 var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
 
 // Close implements the store.Store interface.
 func (d *db) Close() error {
+	d.mu.Lock() // exclusive lock: the handles below are destroyed and cleared
+	defer d.mu.Unlock()
+	if d.err != nil {
+		return d.err // an error is already recorded (e.g. by an earlier Close); do not free the handles twice
+	}
 	C.leveldb_close(d.cDb)
+	d.cDb = nil
 	C.leveldb_readoptions_destroy(d.readOptions)
+	d.readOptions = nil
 	C.leveldb_writeoptions_destroy(d.writeOptions)
+	d.writeOptions = nil
+	d.err = errClosedStore // use the package sentinel so every closed-store error is the same value
 	return nil
 }
 
@@ -81,8 +97,13 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+	d.mu.RLock() // read lock: d.err and d.readOptions are only read here
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidStream{d.err} // no error result anymore: d.err is carried by the returned stream
+	}
+	return newStream(d, start, end, d.readOptions)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
 
 // NewTransaction implements the store.Store interface.
 func (d *db) NewTransaction() store.Transaction {
+	// txmu is held until the transaction is successfully committed or aborted.
+	d.txmu.Lock()
+	d.mu.RLock() // taken after txmu; guards the read of d.err
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		d.txmu.Unlock() // no transaction is started on this path, so txmu is released here
+		return &store.InvalidTransaction{d.err}
+	}
 	return newTransaction(d)
 }
 
 // NewSnapshot implements the store.Store interface.
 func (d *db) NewSnapshot() store.Snapshot {
+	d.mu.RLock() // guards the read of d.err and stays held while newSnapshot runs
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidSnapshot{d.err} // d.err is wrapped in the returned snapshot value
+	}
 	return newSnapshot(d)
 }
 
 // getWithOpts returns the value for the given key.
 // cOpts may contain a pointer to a snapshot.
 func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return valbuf, d.err
+	}
 	var cError *C.char
 	var valLen C.size_t
 	cStr, cLen := cSlice(key)
 	val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
 	if err := goError(cError); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	if val == nil {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	defer C.leveldb_free(unsafe.Pointer(val))
 	return store.CopyBytes(valbuf, goBytes(val, valLen)), nil