Merge "ref: Fix the flaky cancel test."
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 33db6e7..d773ae8 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -1018,9 +1018,17 @@
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
- id, verr := decodeNetError(fc.ctx, err)
- berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
- return fc.close(berr)
+ select {
+ case <-fc.ctx.Done():
+ if fc.ctx.Err() == context.Canceled {
+ return verror.New(verror.ErrCanceled, fc.ctx)
+ }
+ return verror.New(verror.ErrTimeout, fc.ctx)
+ default:
+ id, verr := decodeNetError(fc.ctx, err)
+ berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
+ return fc.close(berr)
+ }
}
// The response header must indicate the streaming results have ended.
if fc.response.Error == nil && !fc.response.EndStreamResults {
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index 42ef4c2..cd5d9e9 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -176,7 +176,7 @@
// Fill up all the write buffers to ensure that cancelling works even when the stream
// is blocked.
if ref.RPCTransitionState() >= ref.XServers {
- call.Send(conn.DefaultBytesBufferedPerFlow)
+ call.Send(make([]byte, conn.DefaultBytesBufferedPerFlow-2048))
} else {
call.Send(make([]byte, vc.MaxSharedBytes))
call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))