syncbase.store: memstore stream and snapshot
Change-Id: I10bd79f30838e6a0ab04dec2dc455f419f083b6f
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index 57556c1..b15374d 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+// Package leveldb provides a LevelDB-based implementation of store.Store.
package leveldb
// #cgo LDFLAGS: -lleveldb
@@ -16,9 +17,9 @@
"v.io/syncbase/x/ref/services/syncbase/store"
)
-// DB is a wrapper around LevelDB that implements the store.Store interface.
+// db is a wrapper around LevelDB that implements the store.Store interface.
// TODO(rogulenko): ensure thread safety.
-type DB struct {
+type db struct {
cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
@@ -28,11 +29,11 @@
mu sync.Mutex
}
-var _ store.Store = (*DB)(nil)
+var _ store.Store = (*db)(nil)
// Open opens the database located at the given path, creating it if it doesn't
// exist.
-func Open(path string) (*DB, error) {
+func Open(path string) (store.Store, error) {
var cError *C.char
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
@@ -48,7 +49,7 @@
}
readOptions := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
- return &DB{
+ return &db{
cDb: cDb,
readOptions: readOptions,
writeOptions: C.leveldb_writeoptions_create(),
@@ -56,10 +57,10 @@
}
// Close implements the store.Store interface.
-func (db *DB) Close() error {
- C.leveldb_close(db.cDb)
- C.leveldb_readoptions_destroy(db.readOptions)
- C.leveldb_writeoptions_destroy(db.writeOptions)
+func (d *db) Close() error {
+ C.leveldb_close(d.cDb)
+ C.leveldb_readoptions_destroy(d.readOptions)
+ C.leveldb_writeoptions_destroy(d.writeOptions)
return nil
}
@@ -74,49 +75,49 @@
return goError(cError)
}
-// NewTransaction implements the store.Store interface.
-func (db *DB) NewTransaction() store.Transaction {
- return newTransaction(db)
-}
-
-// NewSnapshot implements the store.Store interface.
-func (db *DB) NewSnapshot() store.Snapshot {
- return newSnapshot(db)
+// Get implements the store.StoreReader interface.
+func (d *db) Get(key, valbuf []byte) ([]byte, error) {
+ return d.getWithOpts(key, valbuf, d.readOptions)
}
// Scan implements the store.StoreReader interface.
-func (db *DB) Scan(start, end []byte) (store.Stream, error) {
- return newStream(db, start, end, db.readOptions), nil
-}
-
-// Get implements the store.StoreReader interface.
-func (db *DB) Get(key, valbuf []byte) ([]byte, error) {
- return db.getWithOpts(key, valbuf, db.readOptions)
+func (d *db) Scan(start, end []byte) (store.Stream, error) {
+ return newStream(d, start, end, d.readOptions), nil
}
// Put implements the store.StoreWriter interface.
-func (db *DB) Put(key, value []byte) error {
+func (d *db) Put(key, value []byte) error {
// TODO(rogulenko): improve performance.
- return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d, func(st store.StoreReadWriter) error {
return st.Put(key, value)
})
}
// Delete implements the store.StoreWriter interface.
-func (db *DB) Delete(key []byte) error {
+func (d *db) Delete(key []byte) error {
// TODO(rogulenko): improve performance.
- return store.RunInTransaction(db, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d, func(st store.StoreReadWriter) error {
return st.Delete(key)
})
}
+// NewTransaction implements the store.Store interface.
+func (d *db) NewTransaction() store.Transaction {
+ return newTransaction(d)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (d *db) NewSnapshot() store.Snapshot {
+ return newSnapshot(d)
+}
+
// getWithOpts returns the value for the given key.
// cOpts may contain a pointer to a snapshot.
-func (db *DB) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
var cError *C.char
var valLen C.size_t
cStr, cLen := cSlice(key)
- val := C.leveldb_get(db.cDb, cOpts, cStr, cLen, &valLen, &cError)
+ val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
if err := goError(cError); err != nil {
return nil, err
}
@@ -124,5 +125,5 @@
return nil, &store.ErrUnknownKey{Key: string(key)}
}
defer C.leveldb_free(unsafe.Pointer(val))
- return copyAll(valbuf, goBytes(val, valLen)), nil
+ return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
}
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index d9801cf..c076a4e 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -10,6 +10,7 @@
"runtime"
"testing"
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/test"
)
@@ -18,38 +19,38 @@
}
func TestReadWriteBasic(t *testing.T) {
- db, dbPath := newDB()
- defer destroyDB(db, dbPath)
- test.RunReadWriteBasicTest(t, db)
+ st, path := newDB()
+ defer destroyDB(st, path)
+ test.RunReadWriteBasicTest(t, st)
}
func TestReadWriteRandom(t *testing.T) {
- db, dbPath := newDB()
- defer destroyDB(db, dbPath)
- test.RunReadWriteRandomTest(t, db)
+ st, path := newDB()
+ defer destroyDB(st, path)
+ test.RunReadWriteRandomTest(t, st)
}
func TestTransactionsWithGet(t *testing.T) {
- db, dbPath := newDB()
- defer destroyDB(db, dbPath)
- test.RunTransactionsWithGetTest(t, db)
+ st, path := newDB()
+ defer destroyDB(st, path)
+ test.RunTransactionsWithGetTest(t, st)
}
-func newDB() (*DB, string) {
- dbPath, err := ioutil.TempDir("", "syncbase_leveldb")
+func newDB() (store.Store, string) {
+ path, err := ioutil.TempDir("", "syncbase_leveldb")
if err != nil {
panic(fmt.Sprintf("can't create temp dir: %v", err))
}
- db, err := Open(dbPath)
+ st, err := Open(path)
if err != nil {
- panic(fmt.Sprintf("can't open db in %v: %v", dbPath, err))
+ panic(fmt.Sprintf("can't open db at %v: %v", path, err))
}
- return db, dbPath
+ return st, path
}
-func destroyDB(db *DB, dbPath string) {
- db.Close()
- if err := Destroy(dbPath); err != nil {
- panic(fmt.Sprintf("can't destroy db located in %v: %v", dbPath, err))
+func destroyDB(st store.Store, path string) {
+ st.Close()
+ if err := Destroy(path); err != nil {
+ panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
}
}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index eda5802..afa9f25 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -13,20 +13,20 @@
// snapshot is a wrapper around LevelDB snapshot that implements
// the store.Snapshot interface.
type snapshot struct {
- db *DB
+ d *db
cSnapshot *C.leveldb_snapshot_t
cOpts *C.leveldb_readoptions_t
}
var _ store.Snapshot = (*snapshot)(nil)
-func newSnapshot(db *DB) *snapshot {
- cSnapshot := C.leveldb_create_snapshot(db.cDb)
+func newSnapshot(d *db) *snapshot {
+ cSnapshot := C.leveldb_create_snapshot(d.cDb)
cOpts := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
return &snapshot{
- db,
+ d,
cSnapshot,
cOpts,
}
@@ -35,16 +35,16 @@
// Close implements the store.Snapshot interface.
func (s *snapshot) Close() error {
C.leveldb_readoptions_destroy(s.cOpts)
- C.leveldb_release_snapshot(s.db.cDb, s.cSnapshot)
+ C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
return nil
}
-// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
- return newStream(s.db, start, end, s.cOpts), nil
-}
-
// Get implements the store.StoreReader interface.
func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
- return s.db.getWithOpts(key, valbuf, s.cOpts)
+ return s.d.getWithOpts(key, valbuf, s.cOpts)
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+ return newStream(s.d, start, end, s.cOpts), nil
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index f1bddf7..c19bfa5 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -12,7 +12,6 @@
"errors"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/x/lib/vlog"
)
// stream is a wrapper around LevelDB iterator that implements
@@ -22,15 +21,15 @@
cIter *C.syncbase_leveldb_iterator_t
end []byte
- advancedOnce bool
- err error
+ hasAdvanced bool
+ err error
}
var _ store.Stream = (*stream)(nil)
-func newStream(db *DB, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
- cIter := C.syncbase_leveldb_create_iterator(db.cDb, cOpts, cStr, size)
+ cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
return &stream{
cIter: cIter,
end: end,
@@ -47,10 +46,10 @@
if it.cIter == nil {
return false
}
- // C iterator is already initialized after creation; we shouldn't move
- // it during first Advance() call.
- if !it.advancedOnce {
- it.advancedOnce = true
+ // The C iterator starts out initialized, pointing at the first value; we
+ // shouldn't move it during the first Advance() call.
+ if !it.hasAdvanced {
+ it.hasAdvanced = true
} else {
C.syncbase_leveldb_iter_next(it.cIter)
}
@@ -67,24 +66,24 @@
// Key implements the store.Stream interface.
func (it *stream) Key(keybuf []byte) []byte {
- if !it.advancedOnce {
- vlog.Fatal("stream has never been advanced")
+ if !it.hasAdvanced {
+ panic("stream has never been advanced")
}
if it.cIter == nil {
- vlog.Fatal("illegal state")
+ panic("illegal state")
}
- return copyAll(keybuf, it.cKey())
+ return store.CopyBytes(keybuf, it.cKey())
}
// Value implements the store.Stream interface.
func (it *stream) Value(valbuf []byte) []byte {
- if !it.advancedOnce {
- vlog.Fatal("stream has never been advanced")
+ if !it.hasAdvanced {
+ panic("stream has never been advanced")
}
if it.cIter == nil {
- vlog.Fatal("illegal state")
+ panic("illegal state")
}
- return copyAll(valbuf, it.cVal())
+ return store.CopyBytes(valbuf, it.cVal())
}
// Err implements the store.Stream interface.
@@ -101,14 +100,16 @@
it.destroyLeveldbIter()
}
-// cKey returns the key. The key points to buffer allocated on C heap.
-// The data is valid until the next call to Advance or Cancel.
+// cKey returns the current key.
+// The returned []byte points to a buffer allocated on the C heap. This buffer
+// is valid until the next call to Advance or Cancel.
func (it *stream) cKey() []byte {
return goBytes(it.cIter.key, it.cIter.key_len)
}
-// cVal returns the value. The value points to buffer allocated on C heap.
-// The data is valid until the next call to Advance or Cancel.
+// cVal returns the current value.
+// The returned []byte points to a buffer allocated on the C heap. This buffer
+// is valid until the next call to Advance or Cancel.
func (it *stream) cVal() []byte {
return goBytes(it.cIter.val, it.cIter.val_len)
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 6aa079d..d358f51 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -14,7 +14,7 @@
// the store.Transaction interface.
// TODO(rogulenko): handle incorrect usage.
type transaction struct {
- db *DB
+ d *db
snapshot store.Snapshot
batch *C.leveldb_writebatch_t
cOpts *C.leveldb_writeoptions_t
@@ -22,62 +22,45 @@
var _ store.Transaction = (*transaction)(nil)
-func newTransaction(db *DB) *transaction {
+func newTransaction(d *db) *transaction {
// The lock is held until the transaction is successfully
// committed or aborted.
- db.mu.Lock()
+ d.mu.Lock()
return &transaction{
- db,
- db.NewSnapshot(),
+ d,
+ d.NewSnapshot(),
C.leveldb_writebatch_create(),
- db.writeOptions,
+ d.writeOptions,
}
}
// close frees allocated C objects and releases acquired locks.
func (tx *transaction) close() {
- tx.db.mu.Unlock()
+ tx.d.mu.Unlock()
tx.snapshot.Close()
C.leveldb_writebatch_destroy(tx.batch)
- if tx.cOpts != tx.db.writeOptions {
+ if tx.cOpts != tx.d.writeOptions {
C.leveldb_writeoptions_destroy(tx.cOpts)
}
}
-// Abort implements the store.Transaction interface.
-func (tx *transaction) Abort() error {
- tx.close()
- return nil
-}
-
-// Commit implements the store.Transaction interface.
-func (tx *transaction) Commit() error {
- var cError *C.char
- C.leveldb_write(tx.db.cDb, tx.cOpts, tx.batch, &cError)
- if err := goError(cError); err != nil {
- return err
- }
- tx.close()
- return nil
-}
-
// ResetForRetry implements the store.Transaction interface.
func (tx *transaction) ResetForRetry() {
tx.snapshot.Close()
- tx.snapshot = tx.db.NewSnapshot()
+ tx.snapshot = tx.d.NewSnapshot()
C.leveldb_writebatch_clear(tx.batch)
}
-// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
- return tx.snapshot.Scan(start, end)
-}
-
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
return tx.snapshot.Get(key, valbuf)
}
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+ return tx.snapshot.Scan(start, end)
+}
+
// Put implements the store.StoreWriter interface.
func (tx *transaction) Put(key, value []byte) error {
cKey, cKeyLen := cSlice(key)
@@ -92,3 +75,20 @@
C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
return nil
}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+ var cError *C.char
+ C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
+ if err := goError(cError); err != nil {
+ return err
+ }
+ tx.close()
+ return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+ tx.close()
+ return nil
+}
diff --git a/services/syncbase/store/leveldb/util.go b/services/syncbase/store/leveldb/util.go
index 333743f..af08336 100644
--- a/services/syncbase/store/leveldb/util.go
+++ b/services/syncbase/store/leveldb/util.go
@@ -23,9 +23,8 @@
}
// cSlice converts Go []byte to C string without copying the data.
-//
-// This function behaves similarly to standard Go slice copying or sub-slicing;
-// the caller need not worry about ownership or garbage collection.
+// This function behaves similarly to standard Go slice copying or sub-slicing,
+// in that the caller need not worry about ownership or garbage collection.
func cSlice(str []byte) (*C.char, C.size_t) {
if len(str) == 0 {
return nil, 0
@@ -44,18 +43,3 @@
})
return *(*[]byte)(ptr)
}
-
-// copyAll copies elements from a source slice into a destination slice.
-// The returned slice may be a sub-slice of dst if dst was large enough to hold
-// src. Otherwise, a newly allocated slice will be returned.
-func copyAll(dst, src []byte) []byte {
- if cap(dst) < len(src) {
- newlen := cap(dst)*2 + 2
- if newlen < len(src) {
- newlen = len(src)
- }
- dst = make([]byte, newlen)
- }
- copy(dst, src)
- return dst[:len(src)]
-}
diff --git a/services/syncbase/store/memstore/memstore.go b/services/syncbase/store/memstore/memstore.go
deleted file mode 100644
index 99c551e..0000000
--- a/services/syncbase/store/memstore/memstore.go
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package memstore provides a simple, in-memory implementation of
-// store.TransactableStore. Since it's a prototype implementation, it makes no
-// attempt to be performant.
-package memstore
-
-import (
- "errors"
- "sync"
- "time"
-
- "v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/x/lib/vlog"
-)
-
-var (
- txnTimeout = time.Duration(5) * time.Second
- errExpiredTxn = errors.New("expired transaction")
- errCommittedTxn = errors.New("committed transaction")
- errAbortedTxn = errors.New("aborted transaction")
- errConcurrentTxn = errors.New("concurrent transaction")
-)
-
-type transaction struct {
- st *memstore
- // The following fields are used to determine whether method calls should
- // error out.
- err error
- seq uint64
- createdTime time.Time
- // The following fields track writes performed against this transaction.
- puts map[string][]byte
- deletes map[string]struct{}
-}
-
-var _ store.Transaction = (*transaction)(nil)
-
-type memstore struct {
- mu sync.Mutex
- data map[string][]byte
- // Most recent sequence number handed out.
- lastSeq uint64
- // Value of lastSeq at the time of the most recent commit.
- lastCommitSeq uint64
-}
-
-var _ store.Store = (*memstore)(nil)
-
-func New() store.Store {
- return &memstore{data: map[string][]byte{}}
-}
-
-////////////////////////////////////////
-// transaction methods
-
-func newTxn(st *memstore, seq uint64) *transaction {
- return &transaction{
- st: st,
- seq: seq,
- createdTime: time.Now(),
- puts: map[string][]byte{},
- deletes: map[string]struct{}{},
- }
-}
-
-func (tx *transaction) expired() bool {
- return time.Now().After(tx.createdTime.Add(txnTimeout))
-}
-
-func (tx *transaction) checkError() error {
- if tx.err != nil {
- return tx.err
- }
- if tx.expired() {
- return errExpiredTxn
- }
- if tx.seq <= tx.st.lastCommitSeq {
- return errConcurrentTxn
- }
- return nil
-}
-
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
- vlog.Fatal("not implemented")
- return nil, nil
-}
-
-func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.checkError(); err != nil {
- return nil, err
- }
- v, ok := tx.st.data[string(key)]
- if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
- }
- return v, nil
-}
-
-func (tx *transaction) Put(key, value []byte) error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.checkError(); err != nil {
- return err
- }
- delete(tx.deletes, string(key))
- tx.puts[string(key)] = value
- return nil
-}
-
-func (tx *transaction) Delete(key []byte) error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.checkError(); err != nil {
- return err
- }
- delete(tx.puts, string(key))
- tx.deletes[string(key)] = struct{}{}
- return nil
-}
-
-func (tx *transaction) Commit() error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.checkError(); err != nil {
- return err
- }
- tx.err = errCommittedTxn
- for k, v := range tx.puts {
- tx.st.data[k] = v
- }
- for k := range tx.deletes {
- delete(tx.st.data, k)
- }
- tx.st.lastCommitSeq = tx.st.lastSeq
- return nil
-}
-
-func (tx *transaction) Abort() error {
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- if err := tx.checkError(); err != nil {
- return err
- }
- tx.err = errAbortedTxn
- return nil
-}
-
-func (tx *transaction) ResetForRetry() {
- tx.puts = make(map[string][]byte)
- tx.deletes = make(map[string]struct{})
- tx.err = nil
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock()
- tx.st.lastSeq++
- tx.seq = tx.st.lastSeq
-}
-
-////////////////////////////////////////
-// memstore methods
-
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
- vlog.Fatal("not implemented")
- return nil, nil
-}
-
-func (st *memstore) Get(key, valbuf []byte) ([]byte, error) {
- st.mu.Lock()
- defer st.mu.Unlock()
- v, ok := st.data[string(key)]
- if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
- }
- return v, nil
-}
-
-func (st *memstore) Put(key, value []byte) error {
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(key, value)
- })
-}
-
-func (st *memstore) Delete(key []byte) error {
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(key)
- })
-}
-
-func (st *memstore) NewTransaction() store.Transaction {
- st.mu.Lock()
- defer st.mu.Unlock()
- st.lastSeq++
- return newTxn(st, st.lastSeq)
-}
-
-func (st *memstore) NewSnapshot() store.Snapshot {
- vlog.Fatal("not implemented")
- return nil
-}
-
-func (st *memstore) Close() error {
- return nil
-}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
new file mode 100644
index 0000000..180cd14
--- /dev/null
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+ "errors"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var (
+ errClosedSnapshot = errors.New("closed snapshot")
+)
+
+type snapshot struct {
+ mu sync.Mutex
+ data map[string][]byte
+ err error
+}
+
+var _ store.Snapshot = (*snapshot)(nil)
+
+// Assumes st lock is held.
+func newSnapshot(st *memstore) *snapshot {
+ dataCopy := map[string][]byte{}
+ for k, v := range st.data {
+ dataCopy[k] = v
+ }
+ return &snapshot{data: dataCopy}
+}
+
+func (s *snapshot) error() error {
+ return s.err
+}
+
+// Close implements the store.Snapshot interface.
+func (s *snapshot) Close() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err := s.error(); err != nil {
+ return err
+ }
+ s.err = errClosedSnapshot
+ return nil
+}
+
+// Get implements the store.StoreReader interface.
+func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err := s.error(); err != nil {
+ return nil, err
+ }
+ value, ok := s.data[string(key)]
+ if !ok {
+ return nil, &store.ErrUnknownKey{Key: string(key)}
+ }
+ return store.CopyBytes(valbuf, value), nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err := s.error(); err != nil {
+ return nil, err
+ }
+ return newStream(s, start, end), nil
+}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
new file mode 100644
index 0000000..36a0758
--- /dev/null
+++ b/services/syncbase/store/memstore/store.go
@@ -0,0 +1,82 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package memstore provides a simple, in-memory implementation of store.Store.
+// Since it's a prototype implementation, it makes no attempt to be performant.
+package memstore
+
+import (
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type memstore struct {
+ mu sync.Mutex
+ data map[string][]byte
+ // Most recent sequence number handed out.
+ lastSeq uint64
+ // Value of lastSeq at the time of the most recent commit.
+ lastCommitSeq uint64
+}
+
+var _ store.Store = (*memstore)(nil)
+
+// New creates a new memstore.
+func New() store.Store {
+ return &memstore{data: map[string][]byte{}}
+}
+
+// Close implements the store.Store interface.
+func (st *memstore) Close() error {
+ // TODO(sadovsky): Make all subsequent method calls and stream advances fail.
+ return nil
+}
+
+// Get implements the store.StoreReader interface.
+func (st *memstore) Get(key, valbuf []byte) ([]byte, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ value, ok := st.data[string(key)]
+ if !ok {
+ return nil, &store.ErrUnknownKey{Key: string(key)}
+ }
+ return store.CopyBytes(valbuf, value), nil
+}
+
+// Scan implements the store.StoreReader interface.
+func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ return newStream(newSnapshot(st), start, end), nil
+}
+
+// Put implements the store.StoreWriter interface.
+func (st *memstore) Put(key, value []byte) error {
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Put(key, value)
+ })
+}
+
+// Delete implements the store.StoreWriter interface.
+func (st *memstore) Delete(key []byte) error {
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Delete(key)
+ })
+}
+
+// NewTransaction implements the store.Store interface.
+func (st *memstore) NewTransaction() store.Transaction {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ st.lastSeq++
+ return newTransaction(st, st.lastSeq)
+}
+
+// NewSnapshot implements the store.Store interface.
+func (st *memstore) NewSnapshot() store.Snapshot {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ return newSnapshot(st)
+}
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
new file mode 100644
index 0000000..d6b0371
--- /dev/null
+++ b/services/syncbase/store/memstore/store_test.go
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+ "runtime"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store/test"
+)
+
+func init() {
+ runtime.GOMAXPROCS(10)
+}
+
+func TestReadWriteBasic(t *testing.T) {
+ st := New()
+ defer st.Close()
+ test.RunReadWriteBasicTest(t, st)
+}
+
+func TestReadWriteRandom(t *testing.T) {
+ st := New()
+ defer st.Close()
+ test.RunReadWriteRandomTest(t, st)
+}
+
+func TestTransactionsWithGet(t *testing.T) {
+ st := New()
+ defer st.Close()
+ // TODO(sadovsky): Enable this test once we've added a retry loop to
+ // RunInTransaction. Without that, concurrency makes the test fail.
+ //test.RunTransactionsWithGetTest(t, st)
+}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
new file mode 100644
index 0000000..1de276a
--- /dev/null
+++ b/services/syncbase/store/memstore/stream.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+ "sort"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+type stream struct {
+ mu sync.Mutex
+ sn *snapshot
+ keys []string
+ currIndex int
+ currKey *string
+ err error
+}
+
+var _ store.Stream = (*stream)(nil)
+
+func newStream(sn *snapshot, start, end []byte) *stream {
+ keys := []string{}
+ for k := range sn.data {
+ if k >= string(start) && k < string(end) {
+ keys = append(keys, k)
+ }
+ }
+ sort.Strings(keys)
+ return &stream{
+ sn: sn,
+ keys: keys,
+ currIndex: -1,
+ }
+}
+
+// Advance implements the store.Stream interface.
+func (s *stream) Advance() bool {
+ // TODO(sadovsky): Advance should return false and Err should return a non-nil
+ // error if the Store was closed, or if the Snapshot was closed, or if the
+ // Transaction was committed or aborted (or timed out).
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.err != nil {
+ s.currKey = nil
+ } else {
+ s.currIndex++
+ if s.currIndex < len(s.keys) {
+ s.currKey = &s.keys[s.currIndex]
+ } else {
+ s.currKey = nil
+ }
+ }
+ return s.currKey != nil
+}
+
+// Key implements the store.Stream interface.
+func (s *stream) Key(keybuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.currKey == nil {
+ panic("nothing staged")
+ }
+ return store.CopyBytes(keybuf, []byte(*s.currKey))
+}
+
+// Value implements the store.Stream interface.
+func (s *stream) Value(valbuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.currKey == nil {
+ panic("nothing staged")
+ }
+ return store.CopyBytes(valbuf, s.sn.data[*s.currKey])
+}
+
+// Err implements the store.Stream interface.
+func (s *stream) Err() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.err
+}
+
+// Cancel implements the store.Stream interface.
+func (s *stream) Cancel() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.err = verror.New(verror.ErrAborted, nil)
+}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
new file mode 100644
index 0000000..3ef45aa
--- /dev/null
+++ b/services/syncbase/store/memstore/transaction.go
@@ -0,0 +1,159 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package memstore
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+var (
+ txnTimeout = time.Duration(5) * time.Second
+ errExpiredTxn = errors.New("expired transaction")
+ errCommittedTxn = errors.New("committed transaction")
+ errAbortedTxn = errors.New("aborted transaction")
+ errAttemptedCommit = errors.New("already attempted to commit transaction")
+)
+
+type transaction struct {
+ mu sync.Mutex
+ st *memstore
+ sn *snapshot
+ // The following fields are used to determine whether method calls should
+ // error out.
+ err error
+ seq uint64
+ createdTime time.Time
+ // The following fields track writes performed against this transaction.
+ puts map[string][]byte
+ deletes map[string]struct{}
+}
+
+var _ store.Transaction = (*transaction)(nil)
+
+func newTransaction(st *memstore, seq uint64) *transaction {
+ return &transaction{
+ st: st,
+ sn: newSnapshot(st),
+ seq: seq,
+ createdTime: time.Now(),
+ puts: map[string][]byte{},
+ deletes: map[string]struct{}{},
+ }
+}
+
+func (tx *transaction) expired() bool {
+ return time.Now().After(tx.createdTime.Add(txnTimeout))
+}
+
+func (tx *transaction) error() error {
+ if tx.err != nil {
+ return tx.err
+ }
+ if tx.expired() {
+ return errExpiredTxn
+ }
+ return nil
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *transaction) ResetForRetry() {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ tx.puts = make(map[string][]byte)
+ tx.deletes = make(map[string]struct{})
+ tx.err = nil
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+ tx.st.lastSeq++
+ tx.seq = tx.st.lastSeq
+ tx.sn = newSnapshot(tx.st)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return nil, err
+ }
+ return tx.sn.Get(key, valbuf)
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return nil, err
+ }
+ return newStream(tx.sn, start, end), nil
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *transaction) Put(key, value []byte) error {
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return err
+ }
+ delete(tx.deletes, string(key))
+ tx.puts[string(key)] = value
+ return nil
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *transaction) Delete(key []byte) error {
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return err
+ }
+ delete(tx.puts, string(key))
+ tx.deletes[string(key)] = struct{}{}
+ return nil
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *transaction) Commit() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return err
+ }
+ tx.sn.Close()
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock() // note, defer is last-in-first-out
+ if tx.seq <= tx.st.lastCommitSeq {
+ // Once Commit() has failed with store.ErrConcurrentTransaction, subsequent
+ // ops on the transaction will fail with errAttemptedCommit.
+ tx.err = errAttemptedCommit
+ return &store.ErrConcurrentTransaction{}
+ }
+ tx.err = errCommittedTxn
+ for k, v := range tx.puts {
+ tx.st.data[k] = v
+ }
+ for k := range tx.deletes {
+ delete(tx.st.data, k)
+ }
+ tx.st.lastCommitSeq = tx.st.lastSeq
+ return nil
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *transaction) Abort() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if err := tx.error(); err != nil {
+ return err
+ }
+ tx.sn.Close()
+ tx.err = errAbortedTxn
+ return nil
+}
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index ee02ac3..926b66d 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -8,15 +8,15 @@
// StoreReader reads data from a CRUD-capable storage engine.
type StoreReader interface {
- // Scan returns all rows with keys in range [start, end).
- Scan(start, end []byte) (Stream, error)
-
// Get returns the value for the given key. The returned slice may be a
// sub-slice of valbuf if valbuf was large enough to hold the entire value.
- // Otherwise, a newly allocated slice will be returned. It is valid to pass
- // a nil valbuf.
+ // Otherwise, a newly allocated slice will be returned. It is valid to pass a
+ // nil valbuf.
// Fails if the given key is unknown (ErrUnknownKey).
Get(key, valbuf []byte) ([]byte, error)
+
+ // Scan returns all rows with keys in range [start, end).
+ Scan(start, end []byte) (Stream, error)
}
// StoreWriter writes data to a CRUD-capable storage engine.
@@ -39,6 +39,9 @@
type Store interface {
StoreReadWriter
+ // Close closes the store.
+ Close() error
+
// NewTransaction creates a transaction.
// TODO(rogulenko): add transaction options.
NewTransaction() Transaction
@@ -46,24 +49,19 @@
// NewSnapshot creates a snapshot.
// TODO(rogulenko): add snapshot options.
NewSnapshot() Snapshot
-
- // Close closes the store.
- Close() error
}
// Transaction provides a mechanism for atomic reads and writes.
-// Reads don't reflect writes performed inside this transaction. (This
-// limitation is imposed for API parity with Spanner.)
+//
+// Reads don't reflect writes performed within this transaction.
// Once a transaction has been committed or aborted, subsequent method calls
// will fail with no effect.
-//
-// Operations on a transaction may start failing with ErrConcurrentTransaction
-// if writes from a newly-committed transaction conflict with reads or writes
-// from this transaction.
type Transaction interface {
StoreReadWriter
// Commit commits the transaction.
+ // Fails if writes from outside this transaction conflict with reads from
+ // within this transaction.
Commit() error
// Abort aborts the transaction.
@@ -75,9 +73,9 @@
}
// Snapshot is a handle to particular state in time of a Store.
-// All read operations are executed against a consistent snapshot of Store
-// commit history. Snapshots don't acquire locks and thus don't block
-// transactions.
+//
+// All read operations are executed against a consistent view of Store commit
+// history. Snapshots don't acquire locks and thus don't block transactions.
type Snapshot interface {
StoreReader
@@ -124,24 +122,6 @@
Cancel()
}
-// TODO(sadovsky): Add retry loop.
-func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
- tx := st.NewTransaction()
- if err := fn(tx); err != nil {
- tx.Abort()
- return err
- }
- if err := tx.Commit(); err != nil {
- // TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
- // failure or ErrConcurrentTransaction. Depending on the cause of failure,
- // it may be desirable to retry the Commit() and/or to call Abort(). For
- // now, we always abort on a failed commit.
- tx.Abort()
- return err
- }
- return nil
-}
-
type ErrConcurrentTransaction struct{}
func (e *ErrConcurrentTransaction) Error() string {
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
index 6f46d50..0643bae 100644
--- a/services/syncbase/store/test/test.go
+++ b/services/syncbase/store/test/test.go
@@ -38,26 +38,25 @@
return res
}
-// dbState is the in-memory representation of the database state.
-type dbState struct {
- // We assume that the database has keys from range [0..dbSize).
- dbSize int
+// storeState is the in-memory representation of the store state.
+type storeState struct {
+ // We assume that the database has keys [0..size).
+ size int
rnd *rand.Rand
memtable map[string][]byte
}
-func newDBState(dbSize int) *dbState {
- s := &dbState{
- dbSize,
+func newStoreState(size int) *storeState {
+ return &storeState{
+ size,
rand.New(rand.NewSource(239017)),
make(map[string][]byte),
}
- return s
}
-func (s *dbState) clone() *dbState {
- other := &dbState{
- s.dbSize,
+func (s *storeState) clone() *storeState {
+ other := &storeState{
+ s.size,
s.rnd,
make(map[string][]byte),
}
@@ -67,10 +66,10 @@
return other
}
-// nextKey returns minimal key in the database that is not less than the
-// provided key. In case of no such key, returns dbSize.
-func (s *dbState) lowerBound(key int) int {
- for key < s.dbSize {
+// nextKey returns the smallest key in the store that is not less than the
+// provided key. If there is no such key, returns size.
+func (s *storeState) lowerBound(key int) int {
+ for key < s.size {
if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
return key
}
@@ -79,13 +78,13 @@
return key
}
-// verify ensures that various read operation on store.Store and memtable
-// return the same result.
-func (s *dbState) verify(t *testing.T, st store.StoreReader) {
+// verify checks that various read operations on store.Store and memtable return
+// the same results.
+func (s *storeState) verify(t *testing.T, st store.StoreReader) {
var key, value []byte
var err error
// Verify Get().
- for i := 0; i < s.dbSize; i++ {
+ for i := 0; i < s.size; i++ {
keystr := fmt.Sprintf("%05d", i)
answer, ok := s.memtable[keystr]
key = []byte(keystr)
@@ -102,7 +101,7 @@
}
// Verify 10 random Scan() calls.
for i := 0; i < 10; i++ {
- start, end := s.rnd.Intn(s.dbSize), s.rnd.Intn(s.dbSize)
+ start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
if start > end {
start, end = end, start
}
@@ -130,15 +129,15 @@
}
// runReadWriteTest verifies read/write/snapshot operations.
-func runReadWriteTest(t *testing.T, st store.Store, dbSize int, steps []testStep) {
- s := newDBState(dbSize)
- // We verify database state not more than ~100 times to prevent the test
- // from being slow.
+func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
+ s := newStoreState(size)
+ // We verify database state no more than ~100 times to prevent the test from
+ // being slow.
frequency := (len(steps) + 99) / 100
- var states []*dbState
+ var states []*storeState
var snapshots []store.Snapshot
for i, step := range steps {
- if step.key < 0 || step.key >= s.dbSize {
+ if step.key < 0 || step.key >= s.size {
t.Fatalf("invalid test step %v", step)
}
switch step.op {
@@ -186,21 +185,22 @@
func RunReadWriteRandomTest(t *testing.T, st store.Store) {
rnd := rand.New(rand.NewSource(239017))
var testcase []testStep
- dbSize := 50
+ size := 50
for i := 0; i < 10000; i++ {
- testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(dbSize)})
+ testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
}
- runReadWriteTest(t, st, dbSize, testcase)
+ runReadWriteTest(t, st, size, testcase)
}
// RunTransactionsWithGetTest tests transactions that use Put and Get
// operations.
// NOTE: consider setting GOMAXPROCS to something greater than 1.
func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
- // Invariant: value mapped to n stores sum of values of 0..n-1.
- // Each of k transactions takes m distinct random values from 0..n-1,
- // adds 1 to each and m to value mapped to n.
- // The correctness of sums is checked after all transactions are committed.
+ // Invariant: value mapped to n is sum of values of 0..n-1.
+ // Each of k transactions takes m distinct random values from 0..n-1, adds 1
+ // to each and m to value mapped to n.
+ // The correctness of sums is checked after all transactions have been
+ // committed.
n, m, k := 10, 3, 100
for i := 0; i <= n; i++ {
if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
@@ -209,6 +209,8 @@
}
var wg sync.WaitGroup
wg.Add(k)
+ // TODO(sadovsky): This configuration creates huge resource contention.
+ // Perhaps we should add some random sleep's to reduce the contention.
for i := 0; i < k; i++ {
go func() {
rnd := rand.New(rand.NewSource(239017 * int64(i)))
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
new file mode 100644
index 0000000..d36b260
--- /dev/null
+++ b/services/syncbase/store/util.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// TODO(sadovsky): Add retry loop.
+func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
+ tx := st.NewTransaction()
+ if err := fn(tx); err != nil {
+ tx.Abort()
+ return err
+ }
+ if err := tx.Commit(); err != nil {
+ // TODO(sadovsky): Commit() can fail for a number of reasons, e.g. RPC
+ // failure or ErrConcurrentTransaction. Depending on the cause of failure,
+ // it may be desirable to retry the Commit() and/or to call Abort(). For
+ // now, we always abort on a failed commit.
+ tx.Abort()
+ return err
+ }
+ return nil
+}
+
+// CopyBytes copies elements from a source slice into a destination slice.
+// The returned slice may be a sub-slice of dst if dst was large enough to hold
+// src. Otherwise, a newly allocated slice will be returned.
+func CopyBytes(dst, src []byte) []byte {
+ if cap(dst) < len(src) {
+ newlen := cap(dst)*2 + 2
+ if newlen < len(src) {
+ newlen = len(src)
+ }
+ dst = make([]byte, newlen)
+ }
+ copy(dst, src)
+ return dst[:len(src)]
+}