runtime/internal/flow/conn: Fix a bug in the readq that caused tests
to sometimes flake.

Change-Id: Iddd8fc214e0f2d725a561fd2157b2e6d2588b494
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 2c3ca40..3ff2767 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -43,15 +43,22 @@
 	}(want)
 
 	af := <-flows
+	read := 0
 	for len(want) > 0 {
 		got, err := af.ReadMsg()
 		if err != nil && err != io.EOF {
 			t.Fatalf("Unexpected error: %v", err)
 		}
 		if !bytes.Equal(got, want[:len(got)]) {
-			t.Fatalf("Got: %s want %s", got, want)
+			pl := len(got)
+			if pl > 100 {
+				pl = 100
+			}
+			pg, pw := got[:pl], want[:pl]
+			t.Fatalf("On read %d got: %v want %v", read, pg, pw)
 		}
 		want = want[len(got):]
+		read++
 	}
 	if len(want) != 0 {
 		t.Errorf("got %d leftover bytes, expected 0.", len(want))
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index a2c09e6..cd61bc7 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -17,6 +17,7 @@
 	b, e int
 
 	size      int
+	nbufs     int
 	toRelease int
 	notify    chan struct{}
 }
@@ -32,9 +33,6 @@
 }
 
 func (r *readq) put(ctx *context.T, bufs [][]byte) error {
-	if len(bufs) == 0 {
-		return nil
-	}
 	l := 0
 	for _, b := range bufs {
 		l += len(b)
@@ -53,13 +51,13 @@
 	if newSize > defaultBufferSize {
 		return NewErrCounterOverflow(ctx)
 	}
-	if r.e == r.b {
-		r.reserveLocked(len(bufs))
-	}
+	newBufs := r.nbufs + len(bufs)
+	r.reserveLocked(newBufs)
 	for _, b := range bufs {
 		r.bufs[r.e] = b
 		r.e = (r.e + 1) % len(r.bufs)
 	}
+	r.nbufs = newBufs
 	if r.size == 0 {
 		select {
 		case r.notify <- struct{}{}:
@@ -82,6 +80,7 @@
 	if len(buf) > 0 {
 		r.bufs[r.b] = buf
 	} else {
+		r.nbufs -= 1
 		r.b = (r.b + 1) % len(r.bufs)
 	}
 	r.size -= n
@@ -98,6 +97,7 @@
 	out = r.bufs[r.b]
 	r.b = (r.b + 1) % len(r.bufs)
 	r.size -= len(out)
+	r.nbufs -= 1
 	r.toRelease += len(out)
 	return out, r.toRelease > defaultBufferSize/2, nil
 }
@@ -131,16 +131,17 @@
 }
 
 func (r *readq) reserveLocked(n int) {
-	needed := n + r.e - r.b
-	if r.e < r.b {
-		needed += len(r.bufs)
-	}
-	if needed < len(r.bufs) {
+	if n < len(r.bufs) {
 		return
 	}
-	nb := make([][]byte, 2*needed)
-	copied := copy(nb, r.bufs[r.b:])
-	copied += copy(nb[n:], r.bufs[:r.e])
+	nb := make([][]byte, 2*n)
+	copied := 0
+	if r.e >= r.b {
+		copied = copy(nb, r.bufs[r.b:r.e])
+	} else {
+		copied = copy(nb, r.bufs[r.b:])
+		copied += copy(nb[copied:], r.bufs[:r.e])
+	}
 	r.bufs, r.b, r.e = nb, 0, copied
 }