store: initial implementations of internal API on top of LevelDB

The initial implementation supports all operations defined in the
internal storage engine API. The biggest shortcut is allowing only one
active transaction at a time (others are blocked by a mutex).

This implementation doesn't use the levigo layer at all, as the signatures
of most levigo methods differ from the internal storage API.

Change-Id: I7c254a3c5528c92570c2cac097db5b3293af345a
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
new file mode 100644
index 0000000..bd251c3
--- /dev/null
+++ b/services/syncbase/store/leveldb/db.go
@@ -0,0 +1,128 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #cgo LDFLAGS: -lleveldb
+// #include <stdlib.h>
+// #include "leveldb/c.h"
+// #include "syncbase_leveldb.h"
+import "C"
+import (
+	"sync"
+	"unsafe"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// DB is a wrapper around LevelDB that implements the store.Store interface.
+// TODO(rogulenko): ensure thread safety.
+type DB struct {
+	cDb *C.leveldb_t
+	// Default read/write options.
+	readOptions  *C.leveldb_readoptions_t
+	writeOptions *C.leveldb_writeoptions_t
+	// Used to prevent concurrent transactions.
+	// TODO(rogulenko): improve concurrency.
+	mu sync.Mutex
+}
+
+var _ store.Store = (*DB)(nil)
+
+// Open opens the database located at the given path, creating it if it doesn't
+// exist.
+func Open(path string) (*DB, error) {
+	var cError *C.char
+	cPath := C.CString(path)
+	defer C.free(unsafe.Pointer(cPath))
+
+	cOpts := C.leveldb_options_create()
+	C.leveldb_options_set_create_if_missing(cOpts, 1)
+	C.leveldb_options_set_paranoid_checks(cOpts, 1)
+	defer C.leveldb_options_destroy(cOpts)
+
+	cDb := C.leveldb_open(cOpts, cPath, &cError)
+	if err := goError(cError); err != nil {
+		return nil, err
+	}
+	readOptions := C.leveldb_readoptions_create()
+	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
+	return &DB{
+		cDb:          cDb,
+		readOptions:  readOptions,
+		writeOptions: C.leveldb_writeoptions_create(),
+	}, nil
+}
+
+// Close implements the store.Store interface.
+func (db *DB) Close() error {
+	C.leveldb_close(db.cDb)
+	C.leveldb_readoptions_destroy(db.readOptions)
+	C.leveldb_writeoptions_destroy(db.writeOptions)
+	return nil
+}
+
+// Destroy removes all physical data of the database located at the given path.
+func Destroy(path string) error {
+	var cError *C.char
+	cPath := C.CString(path)
+	defer C.free(unsafe.Pointer(cPath))
+	cOpts := C.leveldb_options_create()
+	defer C.leveldb_options_destroy(cOpts)
+	C.leveldb_destroy_db(cOpts, cPath, &cError)
+	return goError(cError)
+}
+
+// NewTransaction implements the store.Store interface.
+func (db *DB) NewTransaction() store.Transaction {
+	return newTransaction(db)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (db *DB) NewSnapshot() store.Snapshot {
+	return newSnapshot(db)
+}
+
+// Scan implements the store.StoreReader interface.
+func (db *DB) Scan(start, end string) (store.Stream, error) {
+	return newStream(db, start, end, db.readOptions), nil
+}
+
+// Get implements the store.StoreReader interface.
+func (db *DB) Get(key string) ([]byte, error) {
+	return db.getWithOpts(key, db.readOptions)
+}
+
+// Put implements the store.StoreWriter interface.
+func (db *DB) Put(key string, v []byte) error {
+	// TODO(rogulenko): improve performance.
+	return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+		return st.Put(key, v)
+	})
+}
+
+// Delete implements the store.StoreWriter interface.
+func (db *DB) Delete(key string) error {
+	// TODO(rogulenko): improve performance.
+	return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+		return st.Delete(key)
+	})
+}
+
+// getWithOpts returns the value for the given key.
+// cOpts may contain a pointer to a snapshot.
+func (db *DB) getWithOpts(key string, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+	var cError *C.char
+	var valLen C.size_t
+	cStr, cLen := cSlice(key)
+	val := C.leveldb_get(db.cDb, cOpts, cStr, cLen, &valLen, &cError)
+	if err := goError(cError); err != nil {
+		return nil, err
+	}
+	if val == nil {
+		return nil, &store.ErrUnknownKey{Key: key}
+	}
+	defer C.leveldb_free(unsafe.Pointer(val))
+	return C.GoBytes(unsafe.Pointer(val), C.int(valLen)), nil
+}
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
new file mode 100644
index 0000000..d9801cf
--- /dev/null
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+import (
+	"fmt"
+	"io/ioutil"
+	"runtime"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+	runtime.GOMAXPROCS(10)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+	db, dbPath := newDB()
+	defer destroyDB(db, dbPath)
+	test.RunReadWriteBasicTest(t, db)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+	db, dbPath := newDB()
+	defer destroyDB(db, dbPath)
+	test.RunReadWriteRandomTest(t, db)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+	db, dbPath := newDB()
+	defer destroyDB(db, dbPath)
+	test.RunTransactionsWithGetTest(t, db)
+}
+
+func newDB() (*DB, string) {
+	dbPath, err := ioutil.TempDir("", "syncbase_leveldb")
+	if err != nil {
+		panic(fmt.Sprintf("can't create temp dir: %v", err))
+	}
+	db, err := Open(dbPath)
+	if err != nil {
+		panic(fmt.Sprintf("can't open db in %v: %v", dbPath, err))
+	}
+	return db, dbPath
+}
+
+func destroyDB(db *DB, dbPath string) {
+	db.Close()
+	if err := Destroy(dbPath); err != nil {
+		panic(fmt.Sprintf("can't destroy db located in %v: %v", dbPath, err))
+	}
+}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
new file mode 100644
index 0000000..802e67c
--- /dev/null
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -0,0 +1,50 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// snapshot is a wrapper around LevelDB snapshot that implements
+// the store.Snapshot interface.
+type snapshot struct {
+	db        *DB
+	cSnapshot *C.leveldb_snapshot_t
+	cOpts     *C.leveldb_readoptions_t
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+func newSnapshot(db *DB) *snapshot {
+	cSnapshot := C.leveldb_create_snapshot(db.cDb)
+	cOpts := C.leveldb_readoptions_create()
+	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
+	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
+	return &snapshot{
+		db,
+		cSnapshot,
+		cOpts,
+	}
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+	C.leveldb_readoptions_destroy(s.cOpts)
+	C.leveldb_release_snapshot(s.db.cDb, s.cSnapshot)
+	return nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end string) (store.Stream, error) {
+	return newStream(s.db, start, end, s.cOpts), nil
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key string) ([]byte, error) {
+	return s.db.getWithOpts(key, s.cOpts)
+}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
new file mode 100644
index 0000000..1f3baa7
--- /dev/null
+++ b/services/syncbase/store/leveldb/stream.go
@@ -0,0 +1,107 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+// #include "syncbase_leveldb.h"
+import "C"
+import (
+	"errors"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/x/lib/vlog"
+)
+
+// stream is a wrapper around LevelDB iterator that implements
+// the store.Stream interface.
+// TODO(rogulenko): ensure thread safety.
+type stream struct {
+	cIter *C.syncbase_leveldb_iterator_t
+	end   string
+
+	advancedOnce bool
+	err          error
+}
+
+var _ store.Stream = (*stream)(nil)
+
+func newStream(db *DB, start, end string, cOpts *C.leveldb_readoptions_t) *stream {
+	cStr, size := cSlice(start)
+	cIter := C.syncbase_leveldb_create_iterator(db.cDb, cOpts, cStr, size)
+	return &stream{
+		cIter: cIter,
+		end:   end,
+	}
+}
+
+func (it *stream) destroyLeveldbIter() {
+	C.syncbase_leveldb_iter_destroy(it.cIter)
+	it.cIter = nil
+}
+
+// Advance implements the store.Stream interface.
+func (it *stream) Advance() bool {
+	if it.cIter == nil {
+		return false
+	}
+	// C iterator is already initialized after creation; we shouldn't move
+	// it during first Advance() call.
+	if !it.advancedOnce {
+		it.advancedOnce = true
+	} else {
+		C.syncbase_leveldb_iter_next(it.cIter)
+	}
+	if it.cIter.is_valid != 0 && it.end > it.key() {
+		return true
+	}
+
+	var cError *C.char
+	C.syncbase_leveldb_iter_get_error(it.cIter, &cError)
+	it.err = goError(cError)
+	it.destroyLeveldbIter()
+	return false
+}
+
+// Value implements the store.Stream interface.
+// The returned values point to buffers allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) Value() store.KeyValue {
+	if !it.advancedOnce {
+		vlog.Fatal("stream has never been advanced")
+	}
+	if it.cIter == nil {
+		vlog.Fatal("illegal state")
+	}
+	return store.KeyValue{
+		Key:   it.key(),
+		Value: it.val(),
+	}
+}
+
+// Err implements the store.Stream interface.
+func (it *stream) Err() error {
+	return it.err
+}
+
+// Cancel implements the store.Stream interface.
+func (it *stream) Cancel() {
+	if it.cIter == nil {
+		return
+	}
+	it.err = errors.New("cancelled")
+	it.destroyLeveldbIter()
+}
+
+// key returns the key. The key points to buffer allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) key() string {
+	return goString(it.cIter.key, it.cIter.key_len)
+}
+
+// val returns the value. The value points to buffer allocated on C heap.
+// The data is valid until the next call to Advance or Cancel.
+func (it *stream) val() []byte {
+	return goBytes(it.cIter.val, it.cIter.val_len)
+}
diff --git a/services/syncbase/store/leveldb/syncbase_leveldb.cc b/services/syncbase/store/leveldb/syncbase_leveldb.cc
new file mode 100644
index 0000000..8c6f7e6
--- /dev/null
+++ b/services/syncbase/store/leveldb/syncbase_leveldb.cc
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file is intended to be C++ so that we can access C++ LevelDB interface
+// directly if necessary.
+
+#include "syncbase_leveldb.h"
+
+extern "C" {
+
+static void PopulateIteratorFields(syncbase_leveldb_iterator_t* iter) {
+  iter->is_valid = leveldb_iter_valid(iter->rep);
+  if (!iter->is_valid) {
+    return;
+  }
+  iter->key = leveldb_iter_key(iter->rep, &iter->key_len);
+  iter->val = leveldb_iter_value(iter->rep, &iter->val_len);
+}
+
+syncbase_leveldb_iterator_t* syncbase_leveldb_create_iterator(
+    leveldb_t* db,
+    const leveldb_readoptions_t* options,
+    const char* start, size_t start_len) {
+  syncbase_leveldb_iterator_t* result = new syncbase_leveldb_iterator_t;
+  result->rep = leveldb_create_iterator(db, options);
+  leveldb_iter_seek(result->rep, start, start_len);
+  PopulateIteratorFields(result);
+  return result;
+}
+
+void syncbase_leveldb_iter_destroy(syncbase_leveldb_iterator_t* iter) {
+  leveldb_iter_destroy(iter->rep);
+  delete iter;
+}
+
+void syncbase_leveldb_iter_next(syncbase_leveldb_iterator_t* iter) {
+  leveldb_iter_next(iter->rep);
+  PopulateIteratorFields(iter);
+}
+
+void syncbase_leveldb_iter_get_error(
+    const syncbase_leveldb_iterator_t* iter, char** errptr) {
+  leveldb_iter_get_error(iter->rep, errptr);
+}
+
+}  // end extern "C"
diff --git a/services/syncbase/store/leveldb/syncbase_leveldb.h b/services/syncbase/store/leveldb/syncbase_leveldb.h
new file mode 100644
index 0000000..d2faa82
--- /dev/null
+++ b/services/syncbase/store/leveldb/syncbase_leveldb.h
@@ -0,0 +1,61 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//
+// This file contains helpers to minimize the number of cgo calls, which have
+// some overhead.
+// Some conventions:
+//
+// Errors are represented by a null-terminated C string. NULL means no error.
+// All operations that can raise an error are passed a "char** errptr" as the
+// last argument. *errptr should be NULL.
+// On failure, leveldb sets *errptr to a malloc()ed error message.
+//
+// All of the pointer arguments must be non-NULL.
+
+#ifndef V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
+#define V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "leveldb/c.h"
+
+// Fields of this struct are accessed from go directly without cgo calls.
+struct syncbase_leveldb_iterator_t {
+  leveldb_iterator_t* rep;
+  unsigned char is_valid;
+  char const* key;
+  size_t key_len;
+  char const* val;
+  size_t val_len;
+};
+
+typedef struct syncbase_leveldb_iterator_t syncbase_leveldb_iterator_t;
+
+// Returns iterator that points to first key that is not less than |start|.
+// The returned iterator must be passed to |syncbase_leveldb_iter_destroy|
+// when finished.
+syncbase_leveldb_iterator_t* syncbase_leveldb_create_iterator(
+    leveldb_t* db,
+    const leveldb_readoptions_t* options,
+    const char* start, size_t start_len);
+
+// Deallocates iterator returned by |syncbase_leveldb_create_iterator|.
+void syncbase_leveldb_iter_destroy(syncbase_leveldb_iterator_t*);
+
+// Moves to the next entry in the source. After this call, |is_valid| is
+// true iff the iterator was not positioned at the last entry in the source.
+// REQUIRES: |is_valid| is true.
+void syncbase_leveldb_iter_next(syncbase_leveldb_iterator_t* iter);
+
+// Returns a non-nil error iff the iterator encountered any errors.
+void syncbase_leveldb_iter_get_error(
+    const syncbase_leveldb_iterator_t* iter, char** errptr);
+
+#ifdef __cplusplus
+}  // end extern "C"
+#endif
+
+#endif  // V_IO_SYNCBASE_X_REF_SERVICES_SYNCBASE_STORE_LEVELDB_SYNCBASE_LEVELDB_H_
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
new file mode 100644
index 0000000..3a9f4b2
--- /dev/null
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -0,0 +1,94 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// transaction is a wrapper around LevelDB WriteBatch that implements
+// the store.Transaction interface.
+// TODO(rogulenko): handle incorrect usage.
+type transaction struct {
+	db       *DB
+	snapshot store.Snapshot
+	batch    *C.leveldb_writebatch_t
+	cOpts    *C.leveldb_writeoptions_t
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(db *DB) *transaction {
+	// The lock is held until the transaction is successfully
+	// committed or aborted.
+	db.mu.Lock()
+	return &transaction{
+		db,
+		db.NewSnapshot(),
+		C.leveldb_writebatch_create(),
+		db.writeOptions,
+	}
+}
+
+// close frees allocated C objects and releases acquired locks.
+func (tx *transaction) close() {
+	tx.db.mu.Unlock()
+	tx.snapshot.Close()
+	C.leveldb_writebatch_destroy(tx.batch)
+	if tx.cOpts != tx.db.writeOptions {
+		C.leveldb_writeoptions_destroy(tx.cOpts)
+	}
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+	tx.close()
+	return nil
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+	var cError *C.char
+	C.leveldb_write(tx.db.cDb, tx.cOpts, tx.batch, &cError)
+	if err := goError(cError); err != nil {
+		return err
+	}
+	tx.close()
+	return nil
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *transaction) ResetForRetry() {
+	tx.snapshot.Close()
+	tx.snapshot = tx.db.NewSnapshot()
+	C.leveldb_writebatch_clear(tx.batch)
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end string) (store.Stream, error) {
+	return tx.snapshot.Scan(start, end)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key string) ([]byte, error) {
+	return tx.snapshot.Get(key)
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key string, v []byte) error {
+	cKey, cKeyLen := cSlice(key)
+	cVal, cValLen := cSliceFromBytes(v)
+	C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
+	return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key string) error {
+	cKey, cKeyLen := cSlice(key)
+	C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
+	return nil
+}
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
new file mode 100644
index 0000000..d7b9717
--- /dev/null
+++ b/services/syncbase/store/leveldb/util.go
@@ -0,0 +1,66 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+// #include "leveldb/c.h"
+import "C"
+import (
+	"errors"
+	"reflect"
+	"unsafe"
+)
+
+// goError copies C error into Go heap and frees C buffer.
+func goError(cError *C.char) error {
+	if cError == nil {
+		return nil
+	}
+	err := errors.New(C.GoString(cError))
+	C.leveldb_free(unsafe.Pointer(cError))
+	return err
+}
+
+// cSlice converts Go string to C string without copying the data.
+//
+// This function behaves similarly to standard Go slice copying or sub-slicing;
+// the caller need not worry about ownership or garbage collection.
+func cSlice(str string) (*C.char, C.size_t) {
+	if len(str) == 0 {
+		return nil, 0
+	}
+	data := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&str)).Data)
+	return (*C.char)(data), C.size_t(len(str))
+}
+
+// cSliceFromBytes converts Go []byte to C string without copying the data.
+// This function behaves similarly to cSlice.
+func cSliceFromBytes(bytes []byte) (*C.char, C.size_t) {
+	if len(bytes) == 0 {
+		return nil, 0
+	}
+	data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&bytes)).Data)
+	return (*C.char)(data), C.size_t(len(bytes))
+}
+
+// goString converts C string to Go string without copying the data.
+// This function behaves similarly to cSlice.
+func goString(str *C.char, size C.size_t) string {
+	ptr := unsafe.Pointer(&reflect.StringHeader{
+		Data: uintptr(unsafe.Pointer(str)),
+		Len:  int(size),
+	})
+	return *(*string)(ptr)
+}
+
+// goBytes converts C string to Go []byte without copying the data.
+// This function behaves similarly to cSlice.
+func goBytes(str *C.char, size C.size_t) []byte {
+	ptr := unsafe.Pointer(&reflect.SliceHeader{
+		Data: uintptr(unsafe.Pointer(str)),
+		Len:  int(size),
+		Cap:  int(size),
+	})
+	return *(*[]byte)(ptr)
+}
diff --git a/services/syncbase/store/memstore/memstore.go b/services/syncbase/store/memstore/memstore.go
index a97370e..ebd9b2a 100644
--- a/services/syncbase/store/memstore/memstore.go
+++ b/services/syncbase/store/memstore/memstore.go
@@ -201,3 +201,7 @@
 	vlog.Fatal("not implemented")
 	return nil
 }
+
+func (st *memstore) Close() error {
+	return nil
+}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 166079f..5fda7e2 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -43,6 +43,9 @@
 	// NewSnapshot creates a snapshot.
 	// TODO(rogulenko): add snapshot options.
 	NewSnapshot() Snapshot
+
+	// Close closes the store.
+	Close() error
 }
 
 // Transaction provides a mechanism for atomic reads and writes.
@@ -97,10 +100,12 @@
 
 	// Value returns the element that was staged by Advance. Value may panic if
 	// Advance returned false or was not called at all. Value does not block.
+	// The data is valid until the next call to Advance or Cancel.
 	Value() KeyValue
 
 	// Err returns a non-nil error iff the stream encountered any errors. Err does
 	// not block.
+	// TODO(rogulenko): define an error type.
 	Err() error
 
 	// Cancel notifies the stream provider that it can stop producing elements.
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
new file mode 100644
index 0000000..fc80ee9
--- /dev/null
+++ b/services/syncbase/store/test/test.go
@@ -0,0 +1,270 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(rogulenko): add more tests.
+
+package test
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"reflect"
+	"strconv"
+	"sync"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type operation int
+
+const (
+	Put    operation = 0
+	Delete operation = 1
+)
+
+type testStep struct {
+	op  operation
+	key int
+}
+
+func randomBytes(rnd *rand.Rand, length int) []byte {
+	var res []byte
+	for i := 0; i < length; i++ {
+		res = append(res, '0'+byte(rnd.Intn(10)))
+	}
+	return res
+}
+
+// dbState is the in-memory representation of the database state.
+type dbState struct {
+	// We assume that the database has keys from range [0..dbSize).
+	dbSize   int
+	rnd      *rand.Rand
+	memtable map[string][]byte
+}
+
+func newDBState(dbSize int) *dbState {
+	s := &dbState{
+		dbSize,
+		rand.New(rand.NewSource(239017)),
+		make(map[string][]byte),
+	}
+	return s
+}
+
+func (s *dbState) clone() *dbState {
+	other := &dbState{
+		s.dbSize,
+		s.rnd,
+		make(map[string][]byte),
+	}
+	for k, v := range s.memtable {
+		other.memtable[k] = v
+	}
+	return other
+}
+
+// lowerBound returns the minimal key in the database that is not less than
+// the provided key. If no such key exists, it returns max(key, dbSize).
+func (s *dbState) lowerBound(key int) int {
+	for key < s.dbSize {
+		if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
+			return key
+		}
+		key++
+	}
+	return key
+}
+
+// verify ensures that read operations on the given store.StoreReader and on
+// the memtable return the same results.
+func (s *dbState) verify(t *testing.T, st store.StoreReader) {
+	// Verify Get().
+	for i := 0; i < s.dbSize; i++ {
+		key := fmt.Sprintf("%05d", i)
+		answer, ok := s.memtable[key]
+		value, err := st.Get(key)
+		if ok {
+			if err != nil || !bytes.Equal(value, answer) {
+				t.Fatalf("unexpected get result for %v: got {%#v: %#v}, want {%#v: nil}", key, value, err, answer)
+			}
+		} else {
+			if !reflect.DeepEqual(&store.ErrUnknownKey{Key: key}, err) {
+				t.Fatalf("unexpected get error for key %v: %v", key, err)
+			}
+		}
+	}
+	// Verify 10 random Scan() calls.
+	for i := 0; i < 10; i++ {
+		start, end := s.rnd.Intn(s.dbSize), s.rnd.Intn(s.dbSize)
+		if start > end {
+			start, end = end, start
+		}
+		end++
+		stream, err := st.Scan(fmt.Sprintf("%05d", start), fmt.Sprintf("%05d", end))
+		if err != nil {
+			t.Fatalf("can't create stream: %v", err)
+		}
+		for stream.Advance() {
+			start = s.lowerBound(start)
+			key := fmt.Sprintf("%05d", start)
+			kv := stream.Value()
+			if kv.Key != key {
+				t.Fatalf("unexpected key during scan: got %s, want %s", kv.Key, key)
+			}
+			if !bytes.Equal(kv.Value, s.memtable[key]) {
+				t.Fatalf("unexpected value during scan: got %s, want %s", kv.Value, s.memtable[key])
+			}
+			start++
+		}
+		if start = s.lowerBound(start); start < end {
+			t.Fatalf("stream ended unexpectedly")
+		}
+	}
+}
+
+// runReadWriteTest verifies read/write/snapshot operations.
+func runReadWriteTest(t *testing.T, st store.Store, dbSize int, steps []testStep) {
+	s := newDBState(dbSize)
+	// We verify database state not more than ~100 times to prevent the test
+	// from being slow.
+	frequency := (len(steps) + 99) / 100
+	var states []*dbState
+	var snapshots []store.Snapshot
+	for i, step := range steps {
+		if step.key < 0 || step.key >= s.dbSize {
+			t.Fatalf("invalid test step %v", step)
+		}
+		switch step.op {
+		case Put:
+			key := fmt.Sprintf("%05d", step.key)
+			value := randomBytes(s.rnd, 100)
+			s.memtable[key] = value
+			st.Put(key, value)
+		case Delete:
+			key := fmt.Sprintf("%05d", step.key)
+			if _, ok := s.memtable[key]; ok {
+				delete(s.memtable, key)
+				st.Delete(key)
+			}
+		default:
+			t.Fatalf("invalid test step %v", step)
+		}
+		if i%frequency == 0 {
+			s.verify(t, st)
+			states = append(states, s.clone())
+			snapshots = append(snapshots, st.NewSnapshot())
+		}
+	}
+	s.verify(t, st)
+	for i := 0; i < len(states); i++ {
+		states[i].verify(t, snapshots[i])
+		snapshots[i].Close()
+	}
+}
+
+// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
+// snapshots.
+func RunReadWriteBasicTest(t *testing.T, st store.Store) {
+	runReadWriteTest(t, st, 3, []testStep{
+		testStep{Put, 1},
+		testStep{Put, 2},
+		testStep{Delete, 1},
+		testStep{Put, 1},
+		testStep{Put, 2},
+	})
+}
+
+// RunReadWriteRandomTest runs a random-generated test that verifies reads,
+// writes and snapshots.
+func RunReadWriteRandomTest(t *testing.T, st store.Store) {
+	rnd := rand.New(rand.NewSource(239017))
+	var testcase []testStep
+	dbSize := 50
+	for i := 0; i < 10000; i++ {
+		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(dbSize)})
+	}
+	runReadWriteTest(t, st, dbSize, testcase)
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+	// Invariant: value mapped to n stores sum of values of 0..n-1.
+	// Each of k transactions takes m distinct random values from 0..n-1,
+	// adds 1 to each and m to value mapped to n. The correctness of sums
+	// is checked after all transactions are committed.
+	n, m, k := 10, 3, 100
+	for i := 0; i <= n; i++ {
+		if err := st.Put(fmt.Sprintf("%05d", i), []byte{'0'}); err != nil {
+			t.Fatalf("can't write to database: %v", err)
+		}
+	}
+	var wg sync.WaitGroup
+	wg.Add(k)
+	for i := 0; i < k; i++ {
+		// Pass i by value: the goroutine must not share the loop variable.
+		go func(idx int) {
+			defer wg.Done()
+			rnd := rand.New(rand.NewSource(239017 * int64(idx)))
+			perm := rnd.Perm(n)
+			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+				for j := 0; j <= m; j++ {
+					var key string
+					if j < m {
+						key = fmt.Sprintf("%05d", perm[j])
+					} else {
+						key = fmt.Sprintf("%05d", n)
+					}
+					val, err := st.Get(key)
+					if err != nil {
+						return fmt.Errorf("can't get key %s: %v", key, err)
+					}
+					intValue, err := strconv.ParseInt(string(val), 10, 64)
+					if err != nil {
+						return fmt.Errorf("can't parse int from %s: %v", val, err)
+					}
+					var newValue int64
+					if j < m {
+						newValue = intValue + 1
+					} else {
+						newValue = intValue + int64(m)
+					}
+					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+						return fmt.Errorf("can't put {%#v: %#v}: %v", key, newValue, err)
+					}
+				}
+				return nil
+			}); err != nil {
+				t.Errorf("can't commit transaction: %v", err)
+			}
+		}(i)
+	}
+	wg.Wait()
+	var sum int64
+	for j := 0; j <= n; j++ {
+		key := fmt.Sprintf("%05d", j)
+		val, err := st.Get(key)
+		if err != nil {
+			t.Fatalf("can't get key %s: %v", key, err)
+		}
+		intValue, err := strconv.ParseInt(string(val), 10, 64)
+		if err != nil {
+			t.Fatalf("can't parse int from %s: %v", val, err)
+		}
+		if j < n {
+			sum += intValue
+		} else {
+			if intValue != int64(m*k) {
+				t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+			}
+		}
+	}
+	if sum != int64(m*k) {
+		t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+	}
+}