Merge "TBR client test: cancel contexts with timeouts"
diff --git a/x/ref/services/syncbase/store/leveldb/stream.go b/x/ref/services/syncbase/store/leveldb/stream.go
index 5775337..3102d74 100644
--- a/x/ref/services/syncbase/store/leveldb/stream.go
+++ b/x/ref/services/syncbase/store/leveldb/stream.go
@@ -65,7 +65,7 @@
s.mu.Lock()
defer s.mu.Unlock()
s.hasValue = false
- if s.err != nil {
+ if s.cIter == nil {
return false
}
// The C iterator starts out initialized, pointing at the first value; we
@@ -121,7 +121,7 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.err != nil {
+ if s.cIter == nil {
return
}
// s.hasValue will be false if Advance has never been called.
diff --git a/x/ref/services/syncbase/store/memstore/stream.go b/x/ref/services/syncbase/store/memstore/stream.go
index 9ad89a6..a8780be 100644
--- a/x/ref/services/syncbase/store/memstore/stream.go
+++ b/x/ref/services/syncbase/store/memstore/stream.go
@@ -20,6 +20,7 @@
currIndex int
currKey *string
err error
+ done bool
}
var _ store.Stream = (*stream)(nil)
@@ -48,17 +49,18 @@
func (s *stream) Advance() bool {
s.mu.Lock()
defer s.mu.Unlock()
- if s.err != nil {
- s.currKey = nil
- } else {
- s.currIndex++
- if s.currIndex < len(s.keys) {
- s.currKey = &s.keys[s.currIndex]
- } else {
- s.currKey = nil
- }
+ s.currKey = nil
+ if s.done {
+ return false
}
- return s.currKey != nil
+ s.currIndex++
+ if s.currIndex < len(s.keys) {
+ s.currKey = &s.keys[s.currIndex]
+ } else {
+ s.done = true
+ s.currKey = nil
+ }
+ return !s.done
}
// Key implements the store.Stream interface.
@@ -92,9 +94,10 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.err != nil {
+ if s.done {
return
}
+ s.done = true
s.node.Close()
s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
}
diff --git a/x/ref/services/syncbase/store/test/stream.go b/x/ref/services/syncbase/store/test/stream.go
index 98becff..e058fc0 100644
--- a/x/ref/services/syncbase/store/test/stream.go
+++ b/x/ref/services/syncbase/store/test/stream.go
@@ -14,14 +14,26 @@
// RunStreamTest verifies store.Stream operations.
func RunStreamTest(t *testing.T, st store.Store) {
+ // Test that advancing or canceling a stream that has reached its end
+ // doesn't cause a panic.
+ s := st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ verifyAdvance(t, s, nil, nil)
+ if s.Err() != nil {
+ t.Fatalf("unexpected error: %v", s.Err())
+ }
+ s.Cancel()
+ if s.Err() != nil {
+ t.Fatalf("unexpected error: %v", s.Err())
+ }
+
key1, value1 := []byte("key1"), []byte("value1")
st.Put(key1, value1)
key2, value2 := []byte("key2"), []byte("value2")
st.Put(key2, value2)
key3, value3 := []byte("key3"), []byte("value3")
st.Put(key3, value3)
-
- s := st.Scan([]byte("a"), []byte("z"))
+ s = st.Scan([]byte("a"), []byte("z"))
verifyAdvance(t, s, key1, value1)
if !s.Advance() {
t.Fatalf("can't advance the stream")