Merge "TBR client test: cancel contexts with timeouts"
diff --git a/x/ref/services/syncbase/store/leveldb/stream.go b/x/ref/services/syncbase/store/leveldb/stream.go
index 5775337..3102d74 100644
--- a/x/ref/services/syncbase/store/leveldb/stream.go
+++ b/x/ref/services/syncbase/store/leveldb/stream.go
@@ -65,7 +65,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.hasValue = false
-	if s.err != nil {
+	if s.cIter == nil {
 		return false
 	}
 	// The C iterator starts out initialized, pointing at the first value; we
@@ -121,7 +121,7 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.err != nil {
+	if s.cIter == nil {
 		return
 	}
 	// s.hasValue will be false if Advance has never been called.
diff --git a/x/ref/services/syncbase/store/memstore/stream.go b/x/ref/services/syncbase/store/memstore/stream.go
index 9ad89a6..a8780be 100644
--- a/x/ref/services/syncbase/store/memstore/stream.go
+++ b/x/ref/services/syncbase/store/memstore/stream.go
@@ -20,6 +20,7 @@
 	currIndex int
 	currKey   *string
 	err       error
+	done      bool
 }
 
 var _ store.Stream = (*stream)(nil)
@@ -48,17 +49,18 @@
 func (s *stream) Advance() bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.err != nil {
-		s.currKey = nil
-	} else {
-		s.currIndex++
-		if s.currIndex < len(s.keys) {
-			s.currKey = &s.keys[s.currIndex]
-		} else {
-			s.currKey = nil
-		}
+	s.currKey = nil
+	if s.done {
+		return false
 	}
-	return s.currKey != nil
+	s.currIndex++
+	if s.currIndex < len(s.keys) {
+		s.currKey = &s.keys[s.currIndex]
+	} else {
+		s.done = true
+		s.currKey = nil
+	}
+	return !s.done
 }
 
 // Key implements the store.Stream interface.
@@ -92,9 +94,10 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.err != nil {
+	if s.done {
 		return
 	}
+	s.done = true
 	s.node.Close()
 	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
 }
diff --git a/x/ref/services/syncbase/store/test/stream.go b/x/ref/services/syncbase/store/test/stream.go
index 98becff..e058fc0 100644
--- a/x/ref/services/syncbase/store/test/stream.go
+++ b/x/ref/services/syncbase/store/test/stream.go
@@ -14,14 +14,26 @@
 
 // RunStreamTest verifies store.Stream operations.
 func RunStreamTest(t *testing.T, st store.Store) {
+	// Test that advancing or canceling a stream that has reached its end
+	// doesn't cause a panic.
+	s := st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, nil, nil)
+	verifyAdvance(t, s, nil, nil)
+	if s.Err() != nil {
+		t.Fatalf("unexpected error: %v", s.Err())
+	}
+	s.Cancel()
+	if s.Err() != nil {
+		t.Fatalf("unexpected error: %v", s.Err())
+	}
+
 	key1, value1 := []byte("key1"), []byte("value1")
 	st.Put(key1, value1)
 	key2, value2 := []byte("key2"), []byte("value2")
 	st.Put(key2, value2)
 	key3, value3 := []byte("key3"), []byte("value3")
 	st.Put(key3, value3)
-
-	s := st.Scan([]byte("a"), []byte("z"))
+	s = st.Scan([]byte("a"), []byte("z"))
 	verifyAdvance(t, s, key1, value1)
 	if !s.Advance() {
 		t.Fatalf("can't advance the stream")