store: initial implementations of internal API on top of LevelDB
The initial implementations supports all operations defined in the
internal storage engine API. The biggest short-cut is having only one
active transaction at a time (others are blocked by a mutex).
This implementation doesn't use levigo layer at all as the signature
of most levigo methods is different from internal storage API.
Change-Id: I7c254a3c5528c92570c2cac097db5b3293af345a
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
new file mode 100644
index 0000000..bd251c3
--- /dev/null
+++ b/services/syncbase/store/leveldb/db.go
@@ -0,0 +1,128 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #cgo LDFLAGS: -lleveldb
+// #include <stdlib.h>
+// #include "leveldb/c.h"
+// #include "syncbase_leveldb.h"
+import "C"
+import (
+ "sync"
+ "unsafe"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// DB is a wrapper around LevelDB that implements the store.Store interface.
+// TODO(rogulenko): ensure thread safety.
+type DB struct {
+ cDb *C.leveldb_t
+ // Default read/write options.
+ readOptions *C.leveldb_readoptions_t
+ writeOptions *C.leveldb_writeoptions_t
+ // Used to prevent concurrent transactions.
+ // TODO(rogulenko): improve concurrency.
+ mu sync.Mutex
+}
+
+var _ store.Store = (*DB)(nil)
+
+// Open opens the database located at the given path, creating it if it doesn't
+// exist.
+func Open(path string) (*DB, error) {
+ var cError *C.char
+ cPath := C.CString(path)
+ defer C.free(unsafe.Pointer(cPath))
+
+ cOpts := C.leveldb_options_create()
+ C.leveldb_options_set_create_if_missing(cOpts, 1)
+ C.leveldb_options_set_paranoid_checks(cOpts, 1)
+ defer C.leveldb_options_destroy(cOpts)
+
+ cDb := C.leveldb_open(cOpts, cPath, &cError)
+ if err := goError(cError); err != nil {
+ return nil, err
+ }
+ readOptions := C.leveldb_readoptions_create()
+ C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
+ return &DB{
+ cDb: cDb,
+ readOptions: readOptions,
+ writeOptions: C.leveldb_writeoptions_create(),
+ }, nil
+}
+
+// Close implements the store.Store interface.
+func (db *DB) Close() error {
+ C.leveldb_close(db.cDb)
+ C.leveldb_readoptions_destroy(db.readOptions)
+ C.leveldb_writeoptions_destroy(db.writeOptions)
+ return nil
+}
+
+// Destroy removes all physical data of the database located at the given path.
+func Destroy(path string) error {
+ var cError *C.char
+ cPath := C.CString(path)
+ defer C.free(unsafe.Pointer(cPath))
+ cOpts := C.leveldb_options_create()
+ defer C.leveldb_options_destroy(cOpts)
+ C.leveldb_destroy_db(cOpts, cPath, &cError)
+ return goError(cError)
+}
+
+// NewTransaction implements the store.Store interface.
+func (db *DB) NewTransaction() store.Transaction {
+ return newTransaction(db)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (db *DB) NewSnapshot() store.Snapshot {
+ return newSnapshot(db)
+}
+
+// Scan implements the store.StoreReader interface.
+func (db *DB) Scan(start, end string) (store.Stream, error) {
+ return newStream(db, start, end, db.readOptions), nil
+}
+
+// Get implements the store.StoreReader interface.
+func (db *DB) Get(key string) ([]byte, error) {
+ return db.getWithOpts(key, db.readOptions)
+}
+
+// Put implements the store.StoreWriter interface.
+func (db *DB) Put(key string, v []byte) error {
+ // TODO(rogulenko): improve performance.
+ return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+ return st.Put(key, v)
+ })
+}
+
+// Delete implements the store.StoreWriter interface.
+func (db *DB) Delete(key string) error {
+ // TODO(rogulenko): improve performance.
+ return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+ return st.Delete(key)
+ })
+}
+
+// getWithOpts returns the value for the given key.
+// cOpts may contain a pointer to a snapshot.
+func (db *DB) getWithOpts(key string, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+ var cError *C.char
+ var valLen C.size_t
+ cStr, cLen := cSlice(key)
+ val := C.leveldb_get(db.cDb, cOpts, cStr, cLen, &valLen, &cError)
+ if err := goError(cError); err != nil {
+ return nil, err
+ }
+ if val == nil {
+ return nil, &store.ErrUnknownKey{Key: key}
+ }
+ defer C.leveldb_free(unsafe.Pointer(val))
+ return C.GoBytes(unsafe.Pointer(val), C.int(valLen)), nil
+}
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
new file mode 100644
index 0000000..d9801cf
--- /dev/null
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+import (
+ "fmt"
+ "io/ioutil"
+ "runtime"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+ runtime.GOMAXPROCS(10)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+ db, dbPath := newDB()
+ defer destroyDB(db, dbPath)
+ test.RunReadWriteBasicTest(t, db)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+ db, dbPath := newDB()
+ defer destroyDB(db, dbPath)
+ test.RunReadWriteRandomTest(t, db)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+ db, dbPath := newDB()
+ defer destroyDB(db, dbPath)
+ test.RunTransactionsWithGetTest(t, db)
+}
+
+func newDB() (*DB, string) {
+ dbPath, err := ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ panic(fmt.Sprintf("can't create temp dir: %v", err))
+ }
+ db, err := Open(dbPath)
+ if err != nil {
+ panic(fmt.Sprintf("can't open db in %v: %v", dbPath, err))
+ }
+ return db, dbPath
+}
+
+func destroyDB(db *DB, dbPath string) {
+ db.Close()
+ if err := Destroy(dbPath); err != nil {
+ panic(fmt.Sprintf("can't destroy db located in %v: %v", dbPath, err))
+ }
+}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
new file mode 100644
index 0000000..802e67c
--- /dev/null
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -0,0 +1,50 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// snapshot is a wrapper around LevelDB snapshot that implements
+// the store.Snapshot interface.
+type snapshot struct {
+ db *DB
+ cSnapshot *C.leveldb_snapshot_t
+ cOpts *C.leveldb_readoptions_t
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+func newSnapshot(db *DB) *snapshot {
+ cSnapshot := C.leveldb_create_snapshot(db.cDb)
+ cOpts := C.leveldb_readoptions_create()
+ C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
+ C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
+ return &snapshot{
+ db,
+ cSnapshot,
+ cOpts,
+ }
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+ C.leveldb_readoptions_destroy(s.cOpts)
+ C.leveldb_release_snapshot(s.db.cDb, s.cSnapshot)
+ return nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end string) (store.Stream, error) {
+ return newStream(s.db, start, end, s.cOpts), nil
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key string) ([]byte, error) {
+ return s.db.getWithOpts(key, s.cOpts)
+}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
new file mode 100644
index 0000000..1f3baa7
--- /dev/null
+++ b/services/syncbase/store/leveldb/stream.go
@@ -0,0 +1,107 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+// #include "syncbase_leveldb.h"
+import "C"
+import (
+ "errors"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/x/lib/vlog"
+)
+
+// stream is a wrapper around LevelDB iterator that implements
+// the store.Stream interface.
+// TODO(rogulenko): ensure thread safety.
+type stream struct {
+ cIter *C.syncbase_leveldb_iterator_t
+ end string
+
+ advancedOnce bool
+ err error
+}
+
+var _ store.Stream = (*stream)(nil)
+
+func newStream(db *DB, start, end string, cOpts *C.leveldb_readoptions_t) *stream {
+ cStr, size := cSlice(start)
+ cIter := C.syncbase_leveldb_create_iterator(db.cDb, cOpts, cStr, size)
+ return &stream{
+ cIter: cIter,
+ end: end,
+ }
+}
+
+func (it *stream) destroyLeveldbIter() {
+ C.syncbase_leveldb_iter_destroy(it.cIter)
+ it.cIter = nil
+}
+
+// Advance implements the store.Stream interface.
+func (it *stream) Advance() bool {
+ if it.cIter == nil {
+ return false
+ }
+ // C iterator is already initialized after creation; we shouldn't move
+ // it during first Advance() call.
+ if !it.advancedOnce {
+ it.advancedOnce = true
+ } else {
+ C.syncbase_leveldb_iter_next(it.cIter)
+ }
+ if it.cIter.is_valid != 0 && it.end > it.key() {
+ return true
+ }
+
+ var cError *C.char
+ C.syncbase_leveldb_iter_get_error(it.cIter, &cError)
+ it.err = goError(cError)
+ it.destroyLeveldbIter()
+ return false
+}
+
+// Value implements the store.Stream interface.
+// The returned values point to buffers allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) Value() store.KeyValue {
+ if !it.advancedOnce {
+ vlog.Fatal("stream has never been advanced")
+ }
+ if it.cIter == nil {
+ vlog.Fatal("illegal state")
+ }
+ return store.KeyValue{
+ Key: it.key(),
+ Value: it.val(),
+ }
+}
+
+// Err implements the store.Stream interface.
+func (it *stream) Err() error {
+ return it.err
+}
+
+// Cancel implements the store.Stream interface.
+func (it *stream) Cancel() {
+ if it.cIter == nil {
+ return
+ }
+ it.err = errors.New("cancelled")
+ it.destroyLeveldbIter()
+}
+
+// key returns the key. The key points to buffer allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) key() string {
+ return goString(it.cIter.key, it.cIter.key_len)
+}
+
+// val returns the value. The value points to buffer allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) val() []byte {
+ return goBytes(it.cIter.val, it.cIter.val_len)
+}
diff --git a/services/syncbase/store/leveldb/syncbase_leveldb.cc b/services/syncbase/store/leveldb/syncbase_leveldb.cc
new file mode 100644
index 0000000..8c6f7e6
--- /dev/null
+++ b/services/syncbase/store/leveldb/syncbase_leveldb.cc
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file is intended to be C++ so that we can access C++ LevelDB interface
+// directly if necessary.
+
+#include "syncbase_leveldb.h"
+
+extern "C" {
+
+static void PopulateIteratorFields(syncbase_leveldb_iterator_t* iter) {
+ iter->is_valid = leveldb_iter_valid(iter->rep);
+ if (!iter->is_valid) {
+ return;
+ }
+ iter->key = leveldb_iter_key(iter->rep, &iter->key_len);
+ iter->val = leveldb_iter_value(iter->rep, &iter->val_len);
+}
+
+syncbase_leveldb_iterator_t* syncbase_leveldb_create_iterator(
+ leveldb_t* db,
+ const leveldb_readoptions_t* options,
+ const char* start, size_t start_len) {
+ syncbase_leveldb_iterator_t* result = new syncbase_leveldb_iterator_t;
+ result->rep = leveldb_create_iterator(db, options);
+ leveldb_iter_seek(result->rep, start, start_len);
+ PopulateIteratorFields(result);
+ return result;
+}
+
+void syncbase_leveldb_iter_destroy(syncbase_leveldb_iterator_t* iter) {
+ leveldb_iter_destroy(iter->rep);
+ delete iter;
+}
+
+void syncbase_leveldb_iter_next(syncbase_leveldb_iterator_t* iter) {
+ leveldb_iter_next(iter->rep);
+ PopulateIteratorFields(iter);
+}
+
+void syncbase_leveldb_iter_get_error(
+ const syncbase_leveldb_iterator_t* iter, char** errptr) {
+ leveldb_iter_get_error(iter->rep, errptr);
+}
+
+} // end extern "C"
diff --git a/services/syncbase/store/leveldb/syncbase_leveldb.h b/services/syncbase/store/leveldb/syncbase_leveldb.h
new file mode 100644
index 0000000..d2faa82
--- /dev/null
+++ b/services/syncbase/store/leveldb/syncbase_leveldb.h
@@ -0,0 +1,61 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//
+// This file contains helpers to minimize the number of cgo calls, which have
+// some overhead.
+// Some conventions:
+//
+// Errors are represented by a null-terminated C string. NULL means no error.
+// All operations that can raise an error are passed a "char** errptr" as the
+// last argument. *errptr should be NULL.
+// On failure, leveldb sets *errptr to a malloc()ed error message.
+//
+// All of the pointer arguments must be non-NULL.
+
+#ifndef V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
+#define V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "leveldb/c.h"
+
+// Fields of this struct are accessed from go directly without cgo calls.
+struct syncbase_leveldb_iterator_t {
+ leveldb_iterator_t* rep;
+ unsigned char is_valid;
+ char const* key;
+ size_t key_len;
+ char const* val;
+ size_t val_len;
+};
+
+typedef struct syncbase_leveldb_iterator_t syncbase_leveldb_iterator_t;
+
+// Returns iterator that points to first key that is not less than |start|.
+// The returned iterator must be passed to |syncbase_leveldb_iter_destroy|
+// when finished.
+syncbase_leveldb_iterator_t* syncbase_leveldb_create_iterator(
+ leveldb_t* db,
+ const leveldb_readoptions_t* options,
+ const char* start, size_t start_len);
+
+// Deallocates iterator returned by |syncbase_leveldb_create_iterator|.
+void syncbase_leveldb_iter_destroy(syncbase_leveldb_iterator_t*);
+
+// Moves to the next entry in the source. After this call, |is_valid| is
+// true iff the iterator was not positioned at the last entry in the source.
+// REQUIRES: |is_valid| is true.
+void syncbase_leveldb_iter_next(syncbase_leveldb_iterator_t* iter);
+
+// Returns a non-nil error iff the iterator encountered any errors.
+void syncbase_leveldb_iter_get_error(
+ const syncbase_leveldb_iterator_t* iter, char** errptr);
+
+#ifdef __cplusplus
+} // end extern "C"
+#endif
+
+#endif // V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
new file mode 100644
index 0000000..3a9f4b2
--- /dev/null
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -0,0 +1,94 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// transaction is a wrapper around LevelDB WriteBatch that implements
+// the store.Transaction interface.
+// TODO(rogulenko): handle incorrect usage.
+type transaction struct {
+ db *DB
+ snapshot store.Snapshot
+ batch *C.leveldb_writebatch_t
+ cOpts *C.leveldb_writeoptions_t
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(db *DB) *transaction {
+ // The lock is held until the transaction is successfully
+ // committed or aborted.
+ db.mu.Lock()
+ return &transaction{
+ db,
+ db.NewSnapshot(),
+ C.leveldb_writebatch_create(),
+ db.writeOptions,
+ }
+}
+
+// close frees allocated C objects and releases acquired locks.
+func (tx *transaction) close() {
+ tx.db.mu.Unlock()
+ tx.snapshot.Close()
+ C.leveldb_writebatch_destroy(tx.batch)
+ if tx.cOpts != tx.db.writeOptions {
+ C.leveldb_writeoptions_destroy(tx.cOpts)
+ }
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+ tx.close()
+ return nil
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+ var cError *C.char
+ C.leveldb_write(tx.db.cDb, tx.cOpts, tx.batch, &cError)
+ if err := goError(cError); err != nil {
+ return err
+ }
+ tx.close()
+ return nil
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *transaction) ResetForRetry() {
+ tx.snapshot.Close()
+ tx.snapshot = tx.db.NewSnapshot()
+ C.leveldb_writebatch_clear(tx.batch)
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end string) (store.Stream, error) {
+ return tx.snapshot.Scan(start, end)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key string) ([]byte, error) {
+ return tx.snapshot.Get(key)
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key string, v []byte) error {
+ cKey, cKeyLen := cSlice(key)
+ cVal, cValLen := cSliceFromBytes(v)
+ C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
+ return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key string) error {
+ cKey, cKeyLen := cSlice(key)
+ C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
+ return nil
+}
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
new file mode 100644
index 0000000..d7b9717
--- /dev/null
+++ b/services/syncbase/store/leveldb/util.go
@@ -0,0 +1,66 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+ "errors"
+ "reflect"
+ "unsafe"
+)
+
+// goError copies C error into Go heap and frees C buffer.
+func goError(cError *C.char) error {
+ if cError == nil {
+ return nil
+ }
+ err := errors.New(C.GoString(cError))
+ C.leveldb_free(unsafe.Pointer(cError))
+ return err
+}
+
+// cSlice converts Go string to C string without copying the data.
+//
+// This function behaves similarly to standard Go slice copying or sub-slicing;
+// the caller need not worry about ownership or garbage collection.
+func cSlice(str string) (*C.char, C.size_t) {
+ if len(str) == 0 {
+ return nil, 0
+ }
+ data := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&str)).Data)
+ return (*C.char)(data), C.size_t(len(str))
+}
+
+// cSliceFromBytes converts Go []byte to C string without copying the data.
+// This function behaves similarly to cSlice.
+func cSliceFromBytes(bytes []byte) (*C.char, C.size_t) {
+ if len(bytes) == 0 {
+ return nil, 0
+ }
+ data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&bytes)).Data)
+ return (*C.char)(data), C.size_t(len(bytes))
+}
+
+// goString converts C string to Go string without copying the data.
+// This function behaves similarly to cSlice.
+func goString(str *C.char, size C.size_t) string {
+ ptr := unsafe.Pointer(&reflect.StringHeader{
+ Data: uintptr(unsafe.Pointer(str)),
+ Len: int(size),
+ })
+ return *(*string)(ptr)
+}
+
+// goBytes converts C string to Go []byte without copying the data.
+// This function behaves similarly to cSlice.
+func goBytes(str *C.char, size C.size_t) []byte {
+ ptr := unsafe.Pointer(&reflect.SliceHeader{
+ Data: uintptr(unsafe.Pointer(str)),
+ Len: int(size),
+ Cap: int(size),
+ })
+ return *(*[]byte)(ptr)
+}
diff --git a/services/syncbase/store/memstore/memstore.go b/services/syncbase/store/memstore/memstore.go
index a97370e..ebd9b2a 100644
--- a/services/syncbase/store/memstore/memstore.go
+++ b/services/syncbase/store/memstore/memstore.go
@@ -201,3 +201,7 @@
vlog.Fatal("not implemented")
return nil
}
+
+func (st *memstore) Close() error {
+ return nil
+}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 166079f..5fda7e2 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -43,6 +43,9 @@
// NewSnapshot creates a snapshot.
// TODO(rogulenko): add snapshot options.
NewSnapshot() Snapshot
+
+ // Close closes the store.
+ Close() error
}
// Transaction provides a mechanism for atomic reads and writes.
@@ -97,10 +100,12 @@
// Value returns the element that was staged by Advance. Value may panic if
// Advance returned false or was not called at all. Value does not block.
+ // The data is valid until the next call to Advance or Cancel.
Value() KeyValue
// Err returns a non-nil error iff the stream encountered any errors. Err does
// not block.
+ // TODO(rogulenko): define an error type.
Err() error
// Cancel notifies the stream provider that it can stop producing elements.
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
new file mode 100644
index 0000000..fc80ee9
--- /dev/null
+++ b/services/syncbase/store/test/test.go
@@ -0,0 +1,270 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(rogulenko): add more tests.
+
+package test
+
+import (
+ "bytes"
+ "fmt"
+ "math/rand"
+ "reflect"
+ "strconv"
+ "sync"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type operation int
+
+const (
+ Put operation = 0
+ Delete operation = 1
+)
+
+type testStep struct {
+ op operation
+ key int
+}
+
+func randomBytes(rnd *rand.Rand, length int) []byte {
+ var res []byte
+ for i := 0; i < length; i++ {
+ res = append(res, '0'+byte(rnd.Intn(10)))
+ }
+ return res
+}
+
+// dbState is the in-memory representation of the database state.
+type dbState struct {
+ // We assume that the database has keys from range [0..dbSize).
+ dbSize int
+ rnd *rand.Rand
+ memtable map[string][]byte
+}
+
+func newDBState(dbSize int) *dbState {
+ s := &dbState{
+ dbSize,
+ rand.New(rand.NewSource(239017)),
+ make(map[string][]byte),
+ }
+ return s
+}
+
+func (s *dbState) clone() *dbState {
+ other := &dbState{
+ s.dbSize,
+ s.rnd,
+ make(map[string][]byte),
+ }
+ for k, v := range s.memtable {
+ other.memtable[k] = v
+ }
+ return other
+}
+
+// nextKey returns minimal key in the database that is not less than the
+// provided key. In case of no such key, returns dbSize.
+func (s *dbState) lowerBound(key int) int {
+ for key < s.dbSize {
+ if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
+ return key
+ }
+ key++
+ }
+ return key
+}
+
+// verify ensures that various read operation on store.Store and memtable
+// return the same result.
+func (s *dbState) verify(t *testing.T, st store.StoreReader) {
+ // Verify Get().
+ for i := 0; i < s.dbSize; i++ {
+ key := fmt.Sprintf("%05d", i)
+ answer, ok := s.memtable[key]
+ value, err := st.Get(key)
+ if ok {
+ if err != nil || !bytes.Equal(value, answer) {
+ t.Fatalf("unexpected get result for %v: got {%#v: %#v}, want {%#v: nil}", key, value, err, answer)
+ }
+ } else {
+ if !reflect.DeepEqual(&store.ErrUnknownKey{Key: key}, err) {
+ t.Fatalf("unexpected get error for key %v: %v", key, err)
+ }
+ }
+ }
+ // Verify 10 random Scan() calls.
+ for i := 0; i < 10; i++ {
+ start, end := s.rnd.Intn(s.dbSize), s.rnd.Intn(s.dbSize)
+ if start > end {
+ start, end = end, start
+ }
+ end++
+ stream, err := st.Scan(fmt.Sprintf("%05d", start), fmt.Sprintf("%05d", end))
+ if err != nil {
+ t.Fatalf("can't create stream: %v", err)
+ }
+ for stream.Advance() {
+ start = s.lowerBound(start)
+ key := fmt.Sprintf("%05d", start)
+ kv := stream.Value()
+ if kv.Key != key {
+ t.Fatalf("unexpected key during scan: got %s, want %s", kv.Key, key)
+ }
+ if !bytes.Equal(kv.Value, s.memtable[key]) {
+ t.Fatalf("unexpected value during scan: got %s, want %s", kv.Value, s.memtable[key])
+ }
+ start++
+ }
+ if start = s.lowerBound(start); start < end {
+ t.Fatalf("stream ended unexpectedly")
+ }
+ }
+}
+
+// runReadWriteTest verifies read/write/snapshot operations.
+func runReadWriteTest(t *testing.T, st store.Store, dbSize int, steps []testStep) {
+ s := newDBState(dbSize)
+ // We verify database state not more than ~100 times to prevent the test
+ // from being slow.
+ frequency := (len(steps) + 99) / 100
+ var states []*dbState
+ var snapshots []store.Snapshot
+ for i, step := range steps {
+ if step.key < 0 || step.key >= s.dbSize {
+ t.Fatalf("invalid test step %v", step)
+ }
+ switch step.op {
+ case Put:
+ key := fmt.Sprintf("%05d", step.key)
+ value := randomBytes(s.rnd, 100)
+ s.memtable[key] = value
+ st.Put(key, value)
+ case Delete:
+ key := fmt.Sprintf("%05d", step.key)
+ if _, ok := s.memtable[key]; ok {
+ delete(s.memtable, key)
+ st.Delete(key)
+ }
+ default:
+ t.Fatalf("invalid test step %v", step)
+ }
+ if i%frequency == 0 {
+ s.verify(t, st)
+ states = append(states, s.clone())
+ snapshots = append(snapshots, st.NewSnapshot())
+ }
+ }
+ s.verify(t, st)
+ for i := 0; i < len(states); i++ {
+ states[i].verify(t, snapshots[i])
+ snapshots[i].Close()
+ }
+}
+
+// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
+// snapshots.
+func RunReadWriteBasicTest(t *testing.T, st store.Store) {
+ runReadWriteTest(t, st, 3, []testStep{
+ testStep{Put, 1},
+ testStep{Put, 2},
+ testStep{Delete, 1},
+ testStep{Put, 1},
+ testStep{Put, 2},
+ })
+}
+
+// RunReadWriteRandomTest runs a random-generated test that verifies reads,
+// writes and snapshots.
+func RunReadWriteRandomTest(t *testing.T, st store.Store) {
+ rnd := rand.New(rand.NewSource(239017))
+ var testcase []testStep
+ dbSize := 50
+ for i := 0; i < 10000; i++ {
+ testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(dbSize)})
+ }
+ runReadWriteTest(t, st, dbSize, testcase)
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+ // Invariant: value mapped to n stores sum of values of 0..n-1.
+ // Each of k transactions takes m distinct random values from 0..n-1,
+ // adds 1 to each and m to value mapped to n.
+ // The correctness of sums is checked after all transactions are
+ // commited.
+ n, m, k := 10, 3, 100
+ for i := 0; i <= n; i++ {
+ if err := st.Put(fmt.Sprintf("%05d", i), []byte{'0'}); err != nil {
+ t.Fatalf("can't write to database")
+ }
+ }
+ var wg sync.WaitGroup
+ wg.Add(k)
+ for i := 0; i < k; i++ {
+ go func() {
+ rnd := rand.New(rand.NewSource(239017 * int64(i)))
+ perm := rnd.Perm(n)
+ if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ for j := 0; j <= m; j++ {
+ var key string
+ if j < m {
+ key = fmt.Sprintf("%05d", perm[j])
+ } else {
+ key = fmt.Sprintf("%05d", n)
+ }
+ val, err := st.Get(key)
+ if err != nil {
+ return fmt.Errorf("can't get key %s: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ return fmt.Errorf("can't parse int from %s: %v", val, err)
+ }
+ var newValue int64
+ if j < m {
+ newValue = intValue + 1
+ } else {
+ newValue = intValue + int64(m)
+ }
+ if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+ return fmt.Errorf("can't put {%#v: %#v}: %v", key, newValue, err)
+ }
+ }
+ return nil
+ }); err != nil {
+ panic(fmt.Errorf("can't commit transaction: %v", err))
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ var sum int64
+ for j := 0; j <= n; j++ {
+ key := fmt.Sprintf("%05d", j)
+ val, err := st.Get(key)
+ if err != nil {
+ t.Fatalf("can't get key %s: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ t.Fatalf("can't parse int from %s: %v", val, err)
+ }
+ if j < n {
+ sum += intValue
+ } else {
+ if intValue != int64(m*k) {
+ t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+ }
+ }
+ }
+ if sum != int64(m*k) {
+ t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+ }
+}