store/leveldb: make stream thread-safe

Wrappign stream functions with a mutex;
making Key and Value always return correct result is Advance returned true.
Left a TODO to wrap constructor with a db-scoped mutex.

Change-Id: I3355a1d4e50f5b3bc76f8e57b0d20a6398568b89
diff --git a/x/ref/services/syncbase/store/leveldb/db_test.go b/x/ref/services/syncbase/store/leveldb/db_test.go
index c076a4e..fccad5f 100644
--- a/x/ref/services/syncbase/store/leveldb/db_test.go
+++ b/x/ref/services/syncbase/store/leveldb/db_test.go
@@ -18,6 +18,12 @@
 	runtime.GOMAXPROCS(10)
 }
 
+func TestStream(t *testing.T) {
+	db, dbPath := newDB()
+	defer destroyDB(db, dbPath)
+	test.RunStreamTest(t, db)
+}
+
 func TestReadWriteBasic(t *testing.T) {
 	st, path := newDB()
 	defer destroyDB(st, path)
diff --git a/x/ref/services/syncbase/store/leveldb/stream.go b/x/ref/services/syncbase/store/leveldb/stream.go
index c19bfa5..f348567 100644
--- a/x/ref/services/syncbase/store/leveldb/stream.go
+++ b/x/ref/services/syncbase/store/leveldb/stream.go
@@ -9,26 +9,37 @@
 import "C"
 import (
 	"bytes"
-	"errors"
+	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
 )
 
 // stream is a wrapper around LevelDB iterator that implements
 // the store.Stream interface.
-// TODO(rogulenko): ensure thread safety.
 type stream struct {
+	// mu protects the state of the stream.
+	mu    sync.Mutex
 	cIter *C.syncbase_leveldb_iterator_t
 	end   []byte
 
 	hasAdvanced bool
 	err         error
+
+	// hasValue is true iff a value has been staged. If hasValue is true,
+	// key and value point to the staged key/value pair. The underlying buffers
+	// of key and value are allocated on the C heap until Cancel is called,
+	// at which point they are copied to the Go heap.
+	hasValue bool
+	key      []byte
+	value    []byte
 }
 
 var _ store.Stream = (*stream)(nil)
 
 func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
+	// TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
 	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
 	return &stream{
 		cIter: cIter,
@@ -36,68 +47,84 @@
 	}
 }
 
-func (it *stream) destroyLeveldbIter() {
-	C.syncbase_leveldb_iter_destroy(it.cIter)
-	it.cIter = nil
+func (s *stream) destroyLeveldbIter() {
+	C.syncbase_leveldb_iter_destroy(s.cIter)
+	s.cIter = nil
 }
 
 // Advance implements the store.Stream interface.
-func (it *stream) Advance() bool {
-	if it.cIter == nil {
+func (s *stream) Advance() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.hasValue = false
+	if s.cIter == nil {
 		return false
 	}
 	// The C iterator starts out initialized, pointing at the first value; we
 	// shouldn't move it during the first Advance() call.
-	if !it.hasAdvanced {
-		it.hasAdvanced = true
+	if !s.hasAdvanced {
+		s.hasAdvanced = true
 	} else {
-		C.syncbase_leveldb_iter_next(it.cIter)
+		C.syncbase_leveldb_iter_next(s.cIter)
 	}
-	if it.cIter.is_valid != 0 && bytes.Compare(it.end, it.cKey()) > 0 {
+	if s.cIter.is_valid != 0 && bytes.Compare(s.end, s.cKey()) > 0 {
+		s.hasValue = true
+		s.key = s.cKey()
+		s.value = s.cVal()
 		return true
 	}
 
 	var cError *C.char
-	C.syncbase_leveldb_iter_get_error(it.cIter, &cError)
-	it.err = goError(cError)
-	it.destroyLeveldbIter()
+	C.syncbase_leveldb_iter_get_error(s.cIter, &cError)
+	s.err = goError(cError)
+	s.destroyLeveldbIter()
 	return false
 }
 
 // Key implements the store.Stream interface.
-func (it *stream) Key(keybuf []byte) []byte {
-	if !it.hasAdvanced {
-		panic("stream has never been advanced")
+func (s *stream) Key(keybuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.hasValue {
+		panic("nothing staged")
 	}
-	if it.cIter == nil {
-		panic("illegal state")
-	}
-	return store.CopyBytes(keybuf, it.cKey())
+	return store.CopyBytes(keybuf, s.key)
 }
 
 // Value implements the store.Stream interface.
-func (it *stream) Value(valbuf []byte) []byte {
-	if !it.hasAdvanced {
-		panic("stream has never been advanced")
+func (s *stream) Value(valbuf []byte) []byte {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.hasValue {
+		panic("nothing staged")
 	}
-	if it.cIter == nil {
-		panic("illegal state")
-	}
-	return store.CopyBytes(valbuf, it.cVal())
+	return store.CopyBytes(valbuf, s.value)
 }
 
 // Err implements the store.Stream interface.
-func (it *stream) Err() error {
-	return it.err
+func (s *stream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.err
 }
 
 // Cancel implements the store.Stream interface.
-func (it *stream) Cancel() {
-	if it.cIter == nil {
+// TODO(rogulenko): make Cancel non-blocking.
+func (s *stream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.cIter == nil {
 		return
 	}
-	it.err = errors.New("canceled")
-	it.destroyLeveldbIter()
+	s.err = verror.New(verror.ErrCanceled, nil)
+	// s.hasValue might be false if Advance was never called.
+	if s.hasValue {
+		// We copy the key and the value from the C heap to the Go heap before
+		// deallocating the C iterator.
+		s.key = store.CopyBytes(nil, s.cKey())
+		s.value = store.CopyBytes(nil, s.cVal())
+	}
+	s.destroyLeveldbIter()
 }
 
 // cKey returns the current key.
diff --git a/x/ref/services/syncbase/store/memstore/store_test.go b/x/ref/services/syncbase/store/memstore/store_test.go
index d6b0371..1ba6e85 100644
--- a/x/ref/services/syncbase/store/memstore/store_test.go
+++ b/x/ref/services/syncbase/store/memstore/store_test.go
@@ -15,6 +15,12 @@
 	runtime.GOMAXPROCS(10)
 }
 
+func TestStream(t *testing.T) {
+	st := New()
+	defer st.Close()
+	test.RunStreamTest(t, st)
+}
+
 func TestReadWriteBasic(t *testing.T) {
 	st := New()
 	defer st.Close()
diff --git a/x/ref/services/syncbase/store/memstore/stream.go b/x/ref/services/syncbase/store/memstore/stream.go
index 1de276a..f1614eb 100644
--- a/x/ref/services/syncbase/store/memstore/stream.go
+++ b/x/ref/services/syncbase/store/memstore/stream.go
@@ -89,5 +89,5 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.err = verror.New(verror.ErrAborted, nil)
+	s.err = verror.New(verror.ErrCanceled, nil)
 }
diff --git a/x/ref/services/syncbase/store/test/stream.go b/x/ref/services/syncbase/store/test/stream.go
new file mode 100644
index 0000000..295d683
--- /dev/null
+++ b/x/ref/services/syncbase/store/test/stream.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"bytes"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+)
+
+// RunStreamTest verifies store.Stream operations.
+func RunStreamTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+	key2, value2 := []byte("key2"), []byte("value2")
+	st.Put(key2, value2)
+	key3, value3 := []byte("key3"), []byte("value3")
+	st.Put(key3, value3)
+
+	s, _ := st.Scan([]byte("a"), []byte("z"))
+	if !s.Advance() {
+		t.Fatalf("can't advance the stream")
+	}
+	var key, value []byte
+	for i := 0; i < 2; i++ {
+		if key = s.Key(key); !bytes.Equal(key, key1) {
+			t.Fatalf("unexpected key: got %q, want %q", key, key1)
+		}
+		if value = s.Value(value); !bytes.Equal(value, value1) {
+			t.Fatalf("unexpected value: got %q, want %q", value, value1)
+		}
+	}
+
+	if !s.Advance() {
+		t.Fatalf("can't advance the stream")
+	}
+	s.Cancel()
+	for i := 0; i < 2; i++ {
+		if key = s.Key(key); !bytes.Equal(key, key2) {
+			t.Fatalf("unexpected key: got %q, want %q", key, key2)
+		}
+		if value = s.Value(value); !bytes.Equal(value, value2) {
+			t.Fatalf("unexpected value: got %q, want %q", value, value2)
+		}
+	}
+
+	if s.Advance() {
+		t.Fatalf("advance returned true unexpectedly")
+	}
+	if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+		t.Fatalf("unexpected steam error: %v", s.Err())
+	}
+}