runtime/internal/flow/conn: Fix a bug in the readq that caused tests
to sometimes flake.
Change-Id: Iddd8fc214e0f2d725a561fd2157b2e6d2588b494
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 2c3ca40..3ff2767 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -43,15 +43,22 @@
}(want)
af := <-flows
+ read := 0
for len(want) > 0 {
got, err := af.ReadMsg()
if err != nil && err != io.EOF {
t.Fatalf("Unexpected error: %v", err)
}
if !bytes.Equal(got, want[:len(got)]) {
- t.Fatalf("Got: %s want %s", got, want)
+ pl := len(got)
+ if pl > 100 {
+ pl = 100
+ }
+ pg, pw := got[:pl], want[:pl]
+ t.Fatalf("On read %d got: %v want %v", read, pg, pw)
}
want = want[len(got):]
+ read++
}
if len(want) != 0 {
t.Errorf("got %d leftover bytes, expected 0.", len(want))
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index a2c09e6..cd61bc7 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -17,6 +17,7 @@
b, e int
size int
+ nbufs int
toRelease int
notify chan struct{}
}
@@ -32,9 +33,6 @@
}
func (r *readq) put(ctx *context.T, bufs [][]byte) error {
- if len(bufs) == 0 {
- return nil
- }
l := 0
for _, b := range bufs {
l += len(b)
@@ -53,13 +51,13 @@
if newSize > defaultBufferSize {
return NewErrCounterOverflow(ctx)
}
- if r.e == r.b {
- r.reserveLocked(len(bufs))
- }
+ newBufs := r.nbufs + len(bufs)
+ r.reserveLocked(newBufs)
for _, b := range bufs {
r.bufs[r.e] = b
r.e = (r.e + 1) % len(r.bufs)
}
+ r.nbufs = newBufs
if r.size == 0 {
select {
case r.notify <- struct{}{}:
@@ -82,6 +80,7 @@
if len(buf) > 0 {
r.bufs[r.b] = buf
} else {
+ r.nbufs -= 1
r.b = (r.b + 1) % len(r.bufs)
}
r.size -= n
@@ -98,6 +97,7 @@
out = r.bufs[r.b]
r.b = (r.b + 1) % len(r.bufs)
r.size -= len(out)
+ r.nbufs -= 1
r.toRelease += len(out)
return out, r.toRelease > defaultBufferSize/2, nil
}
@@ -131,16 +131,17 @@
}
func (r *readq) reserveLocked(n int) {
- needed := n + r.e - r.b
- if r.e < r.b {
- needed += len(r.bufs)
- }
- if needed < len(r.bufs) {
+ if n < len(r.bufs) {
return
}
- nb := make([][]byte, 2*needed)
- copied := copy(nb, r.bufs[r.b:])
- copied += copy(nb[n:], r.bufs[:r.e])
+ nb := make([][]byte, 2*n)
+ copied := 0
+ if r.e >= r.b {
+ copied = copy(nb, r.bufs[r.b:r.e])
+ } else {
+ copied = copy(nb, r.bufs[r.b:])
+ copied += copy(nb[copied:], r.bufs[:r.e])
+ }
r.bufs, r.b, r.e = nb, 0, copied
}