store/leveldb: adding mutexes
Wrapping db, snapshot and transaction with mutexes,
adding a bunch of tests.
This change also removes verrors from internal storage engine
for consistency: we should either use verrors everywhere or not
use them at all. As discussed with Adam offline, we should
probably start from the latter and switch to verrors later.
Change-Id: I38bf9760b35c40943225629c37e7d41ec8c30085
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 67c79a7..07198d2 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -75,18 +75,15 @@
func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, end string) error {
sn := t.d.st.NewSnapshot()
defer sn.Close()
- it, err := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
- if err == nil {
- sender := call.SendStream()
- key, value := []byte{}, []byte{}
- for it.Advance() {
- key = it.Key(key)
- parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
- }
- err = it.Err()
+ it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
+ sender := call.SendStream()
+ key, value := []byte{}, []byte{}
+ for it.Advance() {
+ key = it.Key(key)
+ parts := util.SplitKeyParts(string(key))
+ sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
}
- if err != nil {
+ if err := it.Err(); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 750f4db..98ffd48 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -55,12 +55,7 @@
return nil, verror.New(verror.ErrInternal, ctx, err)
}
}
- // TODO(sadovsky): Does Scan really need to return an error?
- it, err := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
- if err != nil {
- sn.Close()
- return nil, verror.New(verror.ErrInternal, ctx, err)
- }
+ it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
ch := make(chan naming.GlobReply)
go func() {
defer sn.Close()
diff --git a/services/syncbase/store/benchmark/benchmark.go b/services/syncbase/store/benchmark/benchmark.go
index a4700e6..c794fa9 100644
--- a/services/syncbase/store/benchmark/benchmark.go
+++ b/services/syncbase/store/benchmark/benchmark.go
@@ -110,7 +110,7 @@
func ReadSequential(b *testing.B, config *Config) {
WriteSequential(b, config)
b.ResetTimer()
- s, _ := config.St.Scan([]byte("0"), []byte("z"))
+ s := config.St.Scan([]byte("0"), []byte("z"))
var key, value []byte
for i := 0; i < b.N; i++ {
if !s.Advance() {
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
new file mode 100644
index 0000000..05bfc13
--- /dev/null
+++ b/services/syncbase/store/invalid_types.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// InvalidSnapshot is a store.Snapshot for which all methods return errors.
+type InvalidSnapshot struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// Close implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Close() error {
+ return s.Error
+}
+
+// Get implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+ return valbuf, s.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+ return &InvalidStream{s.Error}
+}
+
+// InvalidStream is a store.Stream for which all methods return errors.
+type InvalidStream struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// Advance implements the store.Stream interface.
+func (s *InvalidStream) Advance() bool {
+ return false
+}
+
+// Key implements the store.Stream interface.
+func (s *InvalidStream) Key(keybuf []byte) []byte {
+ panic(s.Error)
+}
+
+// Value implements the store.Stream interface.
+func (s *InvalidStream) Value(valbuf []byte) []byte {
+ panic(s.Error)
+}
+
+// Err implements the store.Stream interface.
+func (s *InvalidStream) Err() error {
+ return s.Error
+}
+
+// Cancel implements the store.Stream interface.
+func (s *InvalidStream) Cancel() {
+}
+
+// InvalidTransaction is a store.Transaction for which all methods return errors.
+type InvalidTransaction struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *InvalidTransaction) ResetForRetry() {
+ panic(tx.Error)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Get(key, valbuf []byte) ([]byte, error) {
+ return valbuf, tx.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+ return &InvalidStream{tx.Error}
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Put(key, value []byte) error {
+ return tx.Error
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Delete(key []byte) error {
+ return tx.Error
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *InvalidTransaction) Commit() error {
+ return tx.Error
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *InvalidTransaction) Abort() error {
+ return tx.Error
+}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index b15374d..672eb83 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
// #include "syncbase_leveldb.h"
import "C"
import (
+ "errors"
"sync"
"unsafe"
"v.io/syncbase/x/ref/services/syncbase/store"
)
+var (
+ errClosedStore = errors.New("closed store")
+)
+
// db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
type db struct {
+ // mu protects cDb.
+ mu sync.RWMutex
cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
+ err error
// Used to prevent concurrent transactions.
// TODO(rogulenko): improve concurrency.
- mu sync.Mutex
+ txmu sync.Mutex
}
var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
// Close implements the store.Store interface.
func (d *db) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if d.err != nil {
+ return d.err
+ }
C.leveldb_close(d.cDb)
+ d.cDb = nil
C.leveldb_readoptions_destroy(d.readOptions)
+ d.readOptions = nil
C.leveldb_writeoptions_destroy(d.writeOptions)
+ d.writeOptions = nil
+ d.err = errors.New("closed store")
return nil
}
@@ -81,8 +97,13 @@
}
// Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
- return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidStream{d.err}
+ }
+ return newStream(d, start, end, d.readOptions)
}
// Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
// NewTransaction implements the store.Store interface.
func (d *db) NewTransaction() store.Transaction {
+ // txmu is held until the transaction is successfully committed or aborted.
+ d.txmu.Lock()
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ d.txmu.Unlock()
+ return &store.InvalidTransaction{d.err}
+ }
return newTransaction(d)
}
// NewSnapshot implements the store.Store interface.
func (d *db) NewSnapshot() store.Snapshot {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidSnapshot{d.err}
+ }
return newSnapshot(d)
}
// getWithOpts returns the value for the given key.
// cOpts may contain a pointer to a snapshot.
func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return valbuf, d.err
+ }
var cError *C.char
var valLen C.size_t
cStr, cLen := cSlice(key)
val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
if err := goError(cError); err != nil {
- return nil, err
+ return valbuf, err
}
if val == nil {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
defer C.leveldb_free(unsafe.Pointer(val))
return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index fccad5f..666cc40 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -19,27 +19,37 @@
}
func TestStream(t *testing.T) {
- db, dbPath := newDB()
- defer destroyDB(db, dbPath)
- test.RunStreamTest(t, db)
+ runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+ runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+ runTest(t, test.RunStoreStateTest)
}
func TestReadWriteBasic(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunReadWriteBasicTest(t, st)
+ runTest(t, test.RunReadWriteBasicTest)
}
func TestReadWriteRandom(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunReadWriteRandomTest(t, st)
+ runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+ runTest(t, test.RunTransactionStateTest)
}
func TestTransactionsWithGet(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunTransactionsWithGetTest(t, st)
+ runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+ st, dbPath := newDB()
+ defer destroyDB(st, dbPath)
+ f(t, st)
}
func newDB() (store.Store, string) {
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index afa9f25..7b82c7c 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -7,15 +7,21 @@
// #include "leveldb/c.h"
import "C"
import (
+ "errors"
+ "sync"
+
"v.io/syncbase/x/ref/services/syncbase/store"
)
// snapshot is a wrapper around LevelDB snapshot that implements
// the store.Snapshot interface.
type snapshot struct {
+ // mu protects the state of the snapshot.
+ mu sync.RWMutex
d *db
cSnapshot *C.leveldb_snapshot_t
cOpts *C.leveldb_readoptions_t
+ err error
}
var _ store.Snapshot = (*snapshot)(nil)
@@ -26,25 +32,43 @@
C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
return &snapshot{
- d,
- cSnapshot,
- cOpts,
+ d: d,
+ cSnapshot: cSnapshot,
+ cOpts: cOpts,
}
}
// Close implements the store.Snapshot interface.
func (s *snapshot) Close() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.err != nil {
+ return s.err
+ }
C.leveldb_readoptions_destroy(s.cOpts)
+ s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
+ s.cSnapshot = nil
+ s.err = errors.New("closed snapshot")
return nil
}
// Get implements the store.StoreReader interface.
func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.err != nil {
+ return valbuf, s.err
+ }
return s.d.getWithOpts(key, valbuf, s.cOpts)
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
- return newStream(s.d, start, end, s.cOpts), nil
+func (s *snapshot) Scan(start, end []byte) store.Stream {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.err != nil {
+ return &store.InvalidStream{s.err}
+ }
+ return newStream(s.d, start, end, s.cOpts)
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index e4b1631..4705e59 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -9,10 +9,10 @@
import "C"
import (
"bytes"
+ "errors"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
// stream is a wrapper around LevelDB iterator that implements
@@ -39,7 +39,6 @@
func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
- // TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
return &stream{
cIter: cIter,
@@ -57,7 +56,7 @@
s.mu.Lock()
defer s.mu.Unlock()
s.hasValue = false
- if s.cIter == nil {
+ if s.err != nil {
return false
}
// The C iterator starts out initialized, pointing at the first value; we
@@ -113,10 +112,9 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.cIter == nil {
+ if s.err != nil {
return
}
- s.err = verror.New(verror.ErrCanceled, nil)
// s.hasValue might be false if Advance was never called.
if s.hasValue {
// We copy the key and the value from the C heap to the Go heap before
@@ -124,6 +122,7 @@
s.key = store.CopyBytes(nil, s.cKey())
s.value = store.CopyBytes(nil, s.cVal())
}
+ s.err = errors.New("canceled stream")
s.destroyLeveldbIter()
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index d358f51..12c3a65 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,62 +7,85 @@
// #include "leveldb/c.h"
import "C"
import (
+ "errors"
+ "sync"
+
"v.io/syncbase/x/ref/services/syncbase/store"
)
// transaction is a wrapper around LevelDB WriteBatch that implements
// the store.Transaction interface.
-// TODO(rogulenko): handle incorrect usage.
type transaction struct {
+ mu sync.Mutex
d *db
snapshot store.Snapshot
batch *C.leveldb_writebatch_t
cOpts *C.leveldb_writeoptions_t
+ err error
}
var _ store.Transaction = (*transaction)(nil)
func newTransaction(d *db) *transaction {
- // The lock is held until the transaction is successfully
- // committed or aborted.
- d.mu.Lock()
return &transaction{
- d,
- d.NewSnapshot(),
- C.leveldb_writebatch_create(),
- d.writeOptions,
+ d: d,
+ snapshot: d.NewSnapshot(),
+ batch: C.leveldb_writebatch_create(),
+ cOpts: d.writeOptions,
}
}
// close frees allocated C objects and releases acquired locks.
func (tx *transaction) close() {
- tx.d.mu.Unlock()
- tx.snapshot.Close()
+ tx.d.txmu.Unlock()
C.leveldb_writebatch_destroy(tx.batch)
+ tx.batch = nil
if tx.cOpts != tx.d.writeOptions {
C.leveldb_writeoptions_destroy(tx.cOpts)
}
+ tx.cOpts = nil
}
// ResetForRetry implements the store.Transaction interface.
func (tx *transaction) ResetForRetry() {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.batch == nil {
+ panic(tx.err)
+ }
tx.snapshot.Close()
tx.snapshot = tx.d.NewSnapshot()
+ tx.err = nil
C.leveldb_writebatch_clear(tx.batch)
}
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return valbuf, tx.err
+ }
return tx.snapshot.Get(key, valbuf)
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return &store.InvalidStream{tx.err}
+ }
return tx.snapshot.Scan(start, end)
}
// Put implements the store.StoreWriter interface.
func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
cKey, cKeyLen := cSlice(key)
cVal, cValLen := cSlice(value)
C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
@@ -71,6 +94,11 @@
// Delete implements the store.StoreWriter interface.
func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
cKey, cKeyLen := cSlice(key)
C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
return nil
@@ -78,17 +106,32 @@
// Commit implements the store.Transaction interface.
func (tx *transaction) Commit() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
+ tx.d.mu.Lock()
+ defer tx.d.mu.Unlock()
var cError *C.char
C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
if err := goError(cError); err != nil {
+ tx.err = errors.New("already attempted to commit transaction")
return err
}
+ tx.err = errors.New("committed transaction")
tx.close()
return nil
}
// Abort implements the store.Transaction interface.
func (tx *transaction) Abort() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.batch == nil {
+ return tx.err
+ }
+ tx.err = errors.New("aborted transaction")
tx.close()
return nil
}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index 180cd14..fc7cf0a 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -52,21 +52,21 @@
s.mu.Lock()
defer s.mu.Unlock()
if err := s.error(); err != nil {
- return nil, err
+ return valbuf, err
}
value, ok := s.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
return store.CopyBytes(valbuf, value), nil
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+func (s *snapshot) Scan(start, end []byte) store.Stream {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.error(); err != nil {
- return nil, err
+ return &store.InvalidStream{err}
}
- return newStream(s, start, end), nil
+ return newStream(s, start, end)
}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index 36a0758..e844b20 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -40,16 +40,16 @@
defer st.mu.Unlock()
value, ok := st.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
return store.CopyBytes(valbuf, value), nil
}
// Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+func (st *memstore) Scan(start, end []byte) store.Stream {
st.mu.Lock()
defer st.mu.Unlock()
- return newStream(newSnapshot(st), start, end), nil
+ return newStream(newSnapshot(st), start, end)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 1ba6e85..4006ce7 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -8,6 +8,7 @@
"runtime"
"testing"
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/test"
)
@@ -16,27 +17,39 @@
}
func TestStream(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunStreamTest(t, st)
+ runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+ runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+ // TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+ // disallow subsequent operations.
+ // runTest(t, test.RunStoreStateTest)
}
func TestReadWriteBasic(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunReadWriteBasicTest(t, st)
+ runTest(t, test.RunReadWriteBasicTest)
}
func TestReadWriteRandom(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunReadWriteRandomTest(t, st)
+ runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+ runTest(t, test.RunTransactionStateTest)
}
func TestTransactionsWithGet(t *testing.T) {
- st := New()
- defer st.Close()
// TODO(sadovsky): Enable this test once we've added a retry loop to
// RunInTransaction. Without that, concurrency makes the test fail.
- //test.RunTransactionsWithGetTest(t, st)
+ // runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+ st := New()
+ defer st.Close()
+ f(t, st)
}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index 1e96c01..221d33c 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -5,11 +5,11 @@
package memstore
import (
+ "errors"
"sort"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
type stream struct {
@@ -89,5 +89,5 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- s.err = verror.New(verror.ErrCanceled, nil)
+ s.err = errors.New("canceled stream")
}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 3ef45aa..56dcbdd 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -80,19 +80,19 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if err := tx.error(); err != nil {
- return nil, err
+ return valbuf, err
}
return tx.sn.Get(key, valbuf)
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
tx.mu.Lock()
defer tx.mu.Unlock()
if err := tx.error(); err != nil {
- return nil, err
+ return &store.InvalidStream{err}
}
- return newStream(tx.sn, start, end), nil
+ return newStream(tx.sn, start, end)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 926b66d..22a5c99 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -12,11 +12,12 @@
// sub-slice of valbuf if valbuf was large enough to hold the entire value.
// Otherwise, a newly allocated slice will be returned. It is valid to pass a
// nil valbuf.
- // Fails if the given key is unknown (ErrUnknownKey).
+ // If the given key is unknown, valbuf is returned unchanged and the function
+ // fails with ErrUnknownKey.
Get(key, valbuf []byte) ([]byte, error)
// Scan returns all rows with keys in range [start, end).
- Scan(start, end []byte) (Stream, error)
+ Scan(start, end []byte) Stream
}
// StoreWriter writes data to a CRUD-capable storage engine.
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
new file mode 100644
index 0000000..0276fd1
--- /dev/null
+++ b/services/syncbase/store/test/snapshot.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "strings"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunSnapshotTest verifies store.Snapshot operations.
+func RunSnapshotTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ snapshot := st.NewSnapshot()
+ key2, value2 := []byte("key2"), []byte("value2")
+ st.Put(key2, value2)
+
+ // Test Get and Scan.
+ verifyGet(t, snapshot, key1, value1)
+ verifyGet(t, snapshot, key2, nil)
+ s := snapshot.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test functions after Close.
+ if err := snapshot.Close(); err != nil {
+ t.Fatalf("can't close the snapshot: %v", err)
+ }
+ expectedErr := "closed snapshot"
+ if err := snapshot.Close(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ s = snapshot.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 295d683..4caeb7c 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -6,10 +6,10 @@
import (
"bytes"
+ "strings"
"testing"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
// RunStreamTest verifies store.Stream operations.
@@ -21,25 +21,14 @@
key3, value3 := []byte("key3"), []byte("value3")
st.Put(key3, value3)
- s, _ := st.Scan([]byte("a"), []byte("z"))
- if !s.Advance() {
- t.Fatalf("can't advance the stream")
- }
- var key, value []byte
- for i := 0; i < 2; i++ {
- if key = s.Key(key); !bytes.Equal(key, key1) {
- t.Fatalf("unexpected key: got %q, want %q", key, key1)
- }
- if value = s.Value(value); !bytes.Equal(value, value1) {
- t.Fatalf("unexpected value: got %q, want %q", value, value1)
- }
- }
-
+ s := st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
if !s.Advance() {
t.Fatalf("can't advance the stream")
}
s.Cancel()
for i := 0; i < 2; i++ {
+ var key, value []byte
if key = s.Key(key); !bytes.Equal(key, key2) {
t.Fatalf("unexpected key: got %q, want %q", key, key2)
}
@@ -47,11 +36,8 @@
t.Fatalf("unexpected value: got %q, want %q", value, value2)
}
}
-
- if s.Advance() {
- t.Fatalf("advance returned true unexpectedly")
- }
- if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+ verifyAdvance(t, s, nil, nil)
+ if !strings.Contains(s.Err().Error(), "canceled stream") {
t.Fatalf("unexpected steam error: %v", s.Err())
}
}
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
index 0643bae..330f69a 100644
--- a/services/syncbase/store/test/test.go
+++ b/services/syncbase/store/test/test.go
@@ -2,17 +2,13 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// TODO(rogulenko): add more tests.
-
+// TODO(rogulenko): rename this file test.go -> store.go.
package test
import (
- "bytes"
"fmt"
"math/rand"
- "reflect"
- "strconv"
- "sync"
+ "strings"
"testing"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -81,22 +77,14 @@
// verify checks that various read operations on store.Store and memtable return
// the same results.
func (s *storeState) verify(t *testing.T, st store.StoreReader) {
- var key, value []byte
- var err error
// Verify Get().
for i := 0; i < s.size; i++ {
keystr := fmt.Sprintf("%05d", i)
answer, ok := s.memtable[keystr]
- key = []byte(keystr)
- value, err = st.Get(key, value)
if ok {
- if err != nil || !bytes.Equal(value, answer) {
- t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
- }
+ verifyGet(t, st, []byte(keystr), answer)
} else {
- if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
- t.Fatalf("unexpected get error for key %q: %v", keystr, err)
- }
+ verifyGet(t, st, []byte(keystr), nil)
}
}
// Verify 10 random Scan() calls.
@@ -106,25 +94,12 @@
start, end = end, start
}
end++
- stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
- if err != nil {
- t.Fatalf("can't create stream: %v", err)
- }
- for stream.Advance() {
- start = s.lowerBound(start)
+ stream := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
+ for start = s.lowerBound(start); start < end; start = s.lowerBound(start + 1) {
keystr := fmt.Sprintf("%05d", start)
- key, value = stream.Key(key), stream.Value(value)
- if string(key) != keystr {
- t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
- }
- if !bytes.Equal(value, s.memtable[keystr]) {
- t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
- }
- start++
+ verifyAdvance(t, stream, []byte(keystr), s.memtable[keystr])
}
- if start = s.lowerBound(start); start < end {
- t.Fatalf("stream ended unexpectedly")
- }
+ verifyAdvance(t, stream, nil, nil)
}
}
@@ -192,85 +167,47 @@
runReadWriteTest(t, st, size, testcase)
}
-// RunTransactionsWithGetTest tests transactions that use Put and Get
-// operations.
-// NOTE: consider setting GOMAXPROCS to something greater than 1.
-func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
- // Invariant: value mapped to n is sum of values of 0..n-1.
- // Each of k transactions takes m distinct random values from 0..n-1, adds 1
- // to each and m to value mapped to n.
- // The correctness of sums is checked after all transactions have been
- // committed.
- n, m, k := 10, 3, 100
- for i := 0; i <= n; i++ {
- if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
- t.Fatalf("can't write to database")
- }
+// RunStoreStateTest verifies operations that modify the state of a store.Store.
+func RunStoreStateTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ key2 := []byte("key2")
+
+ // Test Get and Scan.
+ verifyGet(t, st, key1, value1)
+ verifyGet(t, st, key2, nil)
+ s := st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test functions after Close.
+ if err := st.Close(); err != nil {
+ t.Fatalf("can't close the store: %v", err)
}
- var wg sync.WaitGroup
- wg.Add(k)
- // TODO(sadovsky): This configuration creates huge resource contention.
- // Perhaps we should add some random sleep's to reduce the contention.
- for i := 0; i < k; i++ {
- go func() {
- rnd := rand.New(rand.NewSource(239017 * int64(i)))
- perm := rnd.Perm(n)
- if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- for j := 0; j <= m; j++ {
- var keystr string
- if j < m {
- keystr = fmt.Sprintf("%05d", perm[j])
- } else {
- keystr = fmt.Sprintf("%05d", n)
- }
- key := []byte(keystr)
- val, err := st.Get(key, nil)
- if err != nil {
- return fmt.Errorf("can't get key %q: %v", key, err)
- }
- intValue, err := strconv.ParseInt(string(val), 10, 64)
- if err != nil {
- return fmt.Errorf("can't parse int from %q: %v", val, err)
- }
- var newValue int64
- if j < m {
- newValue = intValue + 1
- } else {
- newValue = intValue + int64(m)
- }
- if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
- return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
- }
- }
- return nil
- }); err != nil {
- panic(fmt.Errorf("can't commit transaction: %v", err))
- }
- wg.Done()
- }()
+ expectedErr := "closed store"
+ if err := st.Close(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
}
- wg.Wait()
- var sum int64
- for j := 0; j <= n; j++ {
- keystr := fmt.Sprintf("%05d", j)
- key := []byte(keystr)
- val, err := st.Get(key, nil)
- if err != nil {
- t.Fatalf("can't get key %q: %v", key, err)
- }
- intValue, err := strconv.ParseInt(string(val), 10, 64)
- if err != nil {
- t.Fatalf("can't parse int from %q: %v", val, err)
- }
- if j < n {
- sum += intValue
- } else {
- if intValue != int64(m*k) {
- t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
- }
- }
+ s = st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
}
- if sum != int64(m*k) {
- t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+ snapshot := st.NewSnapshot()
+ if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ tx := st.NewTransaction()
+ if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := st.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := st.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := st.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
}
}
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
new file mode 100644
index 0000000..e078e49
--- /dev/null
+++ b/services/syncbase/store/test/transaction.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "fmt"
+ "math/rand"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunTransactionTest verifies operations that modify the state of a
+// store.Transaction.
+func RunTransactionStateTest(t *testing.T, st store.Store) {
+ abortFunctions := []func(t *testing.T, tx store.Transaction) string{
+ func(t *testing.T, tx store.Transaction) string {
+ if err := tx.Abort(); err != nil {
+ t.Fatalf("can't abort the transaction: %v", err)
+ }
+ return "aborted transaction"
+ },
+ func(t *testing.T, tx store.Transaction) string {
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("can't commit the transaction: %v", err)
+ }
+ return "committed transaction"
+ },
+ }
+ for _, fn := range abortFunctions {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ key2 := []byte("key2")
+ tx := st.NewTransaction()
+
+ // Test Get and Scan.
+ verifyGet(t, tx, key1, value1)
+ verifyGet(t, tx, key2, nil)
+ s := tx.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test functions after Abort.
+ expectedErr := fn(t, tx)
+ if err := tx.Abort(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Commit(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ s = tx.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ }
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+ // Invariant: value mapped to n is sum of values of 0..n-1.
+ // Each of k transactions takes m distinct random values from 0..n-1, adds 1
+ // to each and m to value mapped to n.
+ // The correctness of sums is checked after all transactions have been
+ // committed.
+ n, m, k := 10, 3, 100
+ for i := 0; i <= n; i++ {
+ if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
+ t.Fatalf("can't write to database")
+ }
+ }
+ var wg sync.WaitGroup
+ wg.Add(k)
+ // TODO(sadovsky): This configuration creates huge resource contention.
+ // Perhaps we should add some random sleep's to reduce the contention.
+ for i := 0; i < k; i++ {
+ go func() {
+ rnd := rand.New(rand.NewSource(239017 * int64(i)))
+ perm := rnd.Perm(n)
+ if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ for j := 0; j <= m; j++ {
+ var keystr string
+ if j < m {
+ keystr = fmt.Sprintf("%05d", perm[j])
+ } else {
+ keystr = fmt.Sprintf("%05d", n)
+ }
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
+ if err != nil {
+ return fmt.Errorf("can't get key %q: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ return fmt.Errorf("can't parse int from %q: %v", val, err)
+ }
+ var newValue int64
+ if j < m {
+ newValue = intValue + 1
+ } else {
+ newValue = intValue + int64(m)
+ }
+ if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
+ }
+ }
+ return nil
+ }); err != nil {
+ panic(fmt.Errorf("can't commit transaction: %v", err))
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ var sum int64
+ for j := 0; j <= n; j++ {
+ keystr := fmt.Sprintf("%05d", j)
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
+ if err != nil {
+ t.Fatalf("can't get key %q: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ t.Fatalf("can't parse int from %q: %v", val, err)
+ }
+ if j < n {
+ sum += intValue
+ } else {
+ if intValue != int64(m*k) {
+ t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+ }
+ }
+ }
+ if sum != int64(m*k) {
+ t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+ }
+}
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
new file mode 100644
index 0000000..3f8ca38
--- /dev/null
+++ b/services/syncbase/store/test/util.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "bytes"
+ "reflect"
+ "runtime/debug"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// verifyGet verifies that st.Get(key) == value. If value is nil, verifies that
+// the key is not found.
+func verifyGet(t *testing.T, st store.StoreReader, key, value []byte) {
+ valbuf := []byte("tmp")
+ var err error
+ if value != nil {
+ if valbuf, err = st.Get(key, valbuf); err != nil {
+ Fatalf(t, "can't get value of %q: %v", key, err)
+ }
+ if !bytes.Equal(valbuf, value) {
+ Fatalf(t, "unexpected value: got %q, want %q", valbuf, value)
+ }
+ } else {
+ valbuf, err = st.Get(key, valbuf)
+ if !reflect.DeepEqual(&store.ErrUnknownKey{Key: string(key)}, err) {
+ Fatalf(t, "unexpected get error for key %q: %v", key, err)
+ }
+ valcopy := []byte("tmp")
+ // Verify that valbuf is not modified if the key is not found.
+ if !bytes.Equal(valbuf, valcopy) {
+ Fatalf(t, "unexpected value: got %q, want %q", valbuf, valcopy)
+ }
+ }
+}
+
+// verifyGet verifies the next key/value pair of the provided stream.
+// If key is nil, verifies that next Advance call on the stream returns false.
+func verifyAdvance(t *testing.T, s store.Stream, key, value []byte) {
+ ok := s.Advance()
+ if key == nil {
+ if ok {
+ Fatalf(t, "advance returned true unexpectedly")
+ }
+ return
+ }
+ if !ok {
+ Fatalf(t, "can't advance the stream")
+ }
+ var k, v []byte
+ for i := 0; i < 2; i++ {
+ if k = s.Key(k); !bytes.Equal(k, key) {
+ Fatalf(t, "unexpected key: got %q, want %q", k, key)
+ }
+ if v = s.Value(v); !bytes.Equal(v, value) {
+ Fatalf(t, "unexpected value: got %q, want %q", v, value)
+ }
+ }
+}
+
+func Fatalf(t *testing.T, format string, args ...interface{}) {
+ debug.PrintStack()
+ t.Fatalf(format, args...)
+}
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index d36b260..82a8350 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -25,6 +25,7 @@
// CopyBytes copies elements from a source slice into a destination slice.
// The returned slice may be a sub-slice of dst if dst was large enough to hold
// src. Otherwise, a newly allocated slice will be returned.
+// TODO(rogulenko): add some tests.
func CopyBytes(dst, src []byte) []byte {
if cap(dst) < len(src) {
newlen := cap(dst)*2 + 2
@@ -33,6 +34,7 @@
}
dst = make([]byte, newlen)
}
+ dst = dst[:len(src)]
copy(dst, src)
- return dst[:len(src)]
+ return dst
}