store/leveldb: make stream thread-safe
Wrappign stream functions with a mutex;
making Key and Value always return correct result is Advance returned true.
Left a TODO to wrap constructor with a db-scoped mutex.
Change-Id: I3355a1d4e50f5b3bc76f8e57b0d20a6398568b89
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index c076a4e..fccad5f 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -18,6 +18,12 @@
runtime.GOMAXPROCS(10)
}
+func TestStream(t *testing.T) {
+ db, dbPath := newDB()
+ defer destroyDB(db, dbPath)
+ test.RunStreamTest(t, db)
+}
+
func TestReadWriteBasic(t *testing.T) {
st, path := newDB()
defer destroyDB(st, path)
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index c19bfa5..f348567 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -9,26 +9,37 @@
import "C"
import (
"bytes"
- "errors"
+ "sync"
"v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
)
// stream is a wrapper around LevelDB iterator that implements
// the store.Stream interface.
-// TODO(rogulenko): ensure thread safety.
type stream struct {
+ // mu protects the state of the stream.
+ mu sync.Mutex
cIter *C.syncbase_leveldb_iterator_t
end []byte
hasAdvanced bool
err error
+
+ // hasValue is true iff a value has been staged. If hasValue is true,
+ // key and value point to the staged key/value pair. The underlying buffers
+ // of key and value are allocated on the C heap until Cancel is called,
+ // at which point they are copied to the Go heap.
+ hasValue bool
+ key []byte
+ value []byte
}
var _ store.Stream = (*stream)(nil)
func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
+ // TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
return &stream{
cIter: cIter,
@@ -36,68 +47,84 @@
}
}
-func (it *stream) destroyLeveldbIter() {
- C.syncbase_leveldb_iter_destroy(it.cIter)
- it.cIter = nil
+func (s *stream) destroyLeveldbIter() {
+ C.syncbase_leveldb_iter_destroy(s.cIter)
+ s.cIter = nil
}
// Advance implements the store.Stream interface.
-func (it *stream) Advance() bool {
- if it.cIter == nil {
+func (s *stream) Advance() bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.hasValue = false
+ if s.cIter == nil {
return false
}
// The C iterator starts out initialized, pointing at the first value; we
// shouldn't move it during the first Advance() call.
- if !it.hasAdvanced {
- it.hasAdvanced = true
+ if !s.hasAdvanced {
+ s.hasAdvanced = true
} else {
- C.syncbase_leveldb_iter_next(it.cIter)
+ C.syncbase_leveldb_iter_next(s.cIter)
}
- if it.cIter.is_valid != 0 && bytes.Compare(it.end, it.cKey()) > 0 {
+ if s.cIter.is_valid != 0 && bytes.Compare(s.end, s.cKey()) > 0 {
+ s.hasValue = true
+ s.key = s.cKey()
+ s.value = s.cVal()
return true
}
var cError *C.char
- C.syncbase_leveldb_iter_get_error(it.cIter, &cError)
- it.err = goError(cError)
- it.destroyLeveldbIter()
+ C.syncbase_leveldb_iter_get_error(s.cIter, &cError)
+ s.err = goError(cError)
+ s.destroyLeveldbIter()
return false
}
// Key implements the store.Stream interface.
-func (it *stream) Key(keybuf []byte) []byte {
- if !it.hasAdvanced {
- panic("stream has never been advanced")
+func (s *stream) Key(keybuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.hasValue {
+ panic("nothing staged")
}
- if it.cIter == nil {
- panic("illegal state")
- }
- return store.CopyBytes(keybuf, it.cKey())
+ return store.CopyBytes(keybuf, s.key)
}
// Value implements the store.Stream interface.
-func (it *stream) Value(valbuf []byte) []byte {
- if !it.hasAdvanced {
- panic("stream has never been advanced")
+func (s *stream) Value(valbuf []byte) []byte {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.hasValue {
+ panic("nothing staged")
}
- if it.cIter == nil {
- panic("illegal state")
- }
- return store.CopyBytes(valbuf, it.cVal())
+ return store.CopyBytes(valbuf, s.value)
}
// Err implements the store.Stream interface.
-func (it *stream) Err() error {
- return it.err
+func (s *stream) Err() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.err
}
// Cancel implements the store.Stream interface.
-func (it *stream) Cancel() {
- if it.cIter == nil {
+// TODO(rogulenko): make Cancel non-blocking.
+func (s *stream) Cancel() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.cIter == nil {
return
}
- it.err = errors.New("canceled")
- it.destroyLeveldbIter()
+ s.err = verror.New(verror.ErrCanceled, nil)
+ // s.hasValue might be false if Advance was never called.
+ if s.hasValue {
+ // We copy the key and the value from the C heap to the Go heap before
+ // deallocating the C iterator.
+ s.key = store.CopyBytes(nil, s.cKey())
+ s.value = store.CopyBytes(nil, s.cVal())
+ }
+ s.destroyLeveldbIter()
}
// cKey returns the current key.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index d6b0371..1ba6e85 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -15,6 +15,12 @@
runtime.GOMAXPROCS(10)
}
+func TestStream(t *testing.T) {
+ st := New()
+ defer st.Close()
+ test.RunStreamTest(t, st)
+}
+
func TestReadWriteBasic(t *testing.T) {
st := New()
defer st.Close()
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index 1de276a..f1614eb 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -89,5 +89,5 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- s.err = verror.New(verror.ErrAborted, nil)
+ s.err = verror.New(verror.ErrCanceled, nil)
}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
new file mode 100644
index 0000000..295d683
--- /dev/null
+++ b/services/syncbase/store/test/stream.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "bytes"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+)
+
+// RunStreamTest verifies store.Stream operations.
+func RunStreamTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ key2, value2 := []byte("key2"), []byte("value2")
+ st.Put(key2, value2)
+ key3, value3 := []byte("key3"), []byte("value3")
+ st.Put(key3, value3)
+
+ s, _ := st.Scan([]byte("a"), []byte("z"))
+ if !s.Advance() {
+ t.Fatalf("can't advance the stream")
+ }
+ var key, value []byte
+ for i := 0; i < 2; i++ {
+ if key = s.Key(key); !bytes.Equal(key, key1) {
+ t.Fatalf("unexpected key: got %q, want %q", key, key1)
+ }
+ if value = s.Value(value); !bytes.Equal(value, value1) {
+ t.Fatalf("unexpected value: got %q, want %q", value, value1)
+ }
+ }
+
+ if !s.Advance() {
+ t.Fatalf("can't advance the stream")
+ }
+ s.Cancel()
+ for i := 0; i < 2; i++ {
+ if key = s.Key(key); !bytes.Equal(key, key2) {
+ t.Fatalf("unexpected key: got %q, want %q", key, key2)
+ }
+ if value = s.Value(value); !bytes.Equal(value, value2) {
+ t.Fatalf("unexpected value: got %q, want %q", value, value2)
+ }
+ }
+
+ if s.Advance() {
+ t.Fatalf("advance returned true unexpectedly")
+ }
+ if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+ t.Fatalf("unexpected steam error: %v", s.Err())
+ }
+}