veyron/runtimes/google/ipc/stream/vc: Fix goroutine leak in cancel.
Robin noticed thousands of goroutines stacking up in the rps app:
4882 @ 0x2622c 0x262bc 0x31ac8 0x31d4c 0x26ab08 0x26464
This is caused by cancelChannel keeping a goroutine running until the
deadline for a call expires even if the associated call finishes much
earlier.
Change-Id: Ia2e22c1b150efd71b4889c77d6434bfa5e50c8f1
diff --git a/runtimes/google/ipc/stream/vc/cancel.go b/runtimes/google/ipc/stream/vc/cancel.go
index c621831..4fa5180 100644
--- a/runtimes/google/ipc/stream/vc/cancel.go
+++ b/runtimes/google/ipc/stream/vc/cancel.go
@@ -4,19 +4,29 @@
// cancelChannel creates a channel usable by bqueue.Writer.Put and upcqueue.Get
// to cancel the calls if they have not completed by the provided deadline.
+// It returns two channels, the first is the channel to supply to Put or Get
+// which will be closed when the deadline expires.
+// The second is the quit channel, which can be closed to clean up resources
+// when the first is no longer needed (the deadline is no longer worth enforcing).
//
// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func cancelChannel(deadline time.Time) chan struct{} {
+func cancelChannel(deadline time.Time) (expired, quit chan struct{}) {
if deadline.IsZero() {
- return nil
+ return nil, nil
}
- sc := make(chan struct{})
- tc := time.After(deadline.Sub(time.Now()))
- go func(dst chan struct{}, src <-chan time.Time) {
- <-src
- close(dst)
- }(sc, tc)
- return sc
+ expired = make(chan struct{})
+ quit = make(chan struct{})
+ timer := time.NewTimer(deadline.Sub(time.Now()))
+
+ go func() {
+ select {
+ case <-timer.C:
+ case <-quit:
+ }
+ timer.Stop()
+ close(expired)
+ }()
+ return expired, quit
}
// timeoutError implements net.Error with Timeout returning true.
diff --git a/runtimes/google/ipc/stream/vc/cancel_test.go b/runtimes/google/ipc/stream/vc/cancel_test.go
index e31f90f..90db06d 100644
--- a/runtimes/google/ipc/stream/vc/cancel_test.go
+++ b/runtimes/google/ipc/stream/vc/cancel_test.go
@@ -7,14 +7,14 @@
func TestCancelChannelNil(t *testing.T) {
var zero time.Time
- if cancel := cancelChannel(zero); cancel != nil {
+ if cancel, _ := cancelChannel(zero); cancel != nil {
t.Errorf("Got %v want nil with deadline %v", cancel, zero)
}
}
func TestCancelChannel(t *testing.T) {
deadline := time.Now()
- cancel := cancelChannel(deadline)
+ cancel, _ := cancelChannel(deadline)
if cancel == nil {
t.Fatalf("Got nil channel for deadline %v", deadline)
}
@@ -22,3 +22,12 @@
t.Errorf("Expected channel to be closed")
}
}
+
+func TestCancelChannelQuit(t *testing.T) {
+ deadline := time.Now().Add(time.Hour)
+ cancel, quit := cancelChannel(deadline)
+ close(quit)
+ if _, ok := <-cancel; ok {
+ t.Errorf("Expected channel to be closed")
+ }
+}
diff --git a/runtimes/google/ipc/stream/vc/reader.go b/runtimes/google/ipc/stream/vc/reader.go
index 9e26a14..f1b6f3a 100644
--- a/runtimes/google/ipc/stream/vc/reader.go
+++ b/runtimes/google/ipc/stream/vc/reader.go
@@ -21,12 +21,13 @@
// reader implements the io.Reader and SetReadDeadline interfaces for a Flow,
// backed by iobuf.Slice objects read from a upcqueue.
type reader struct {
- handler readHandler
- src *upcqueue.T
- mu sync.Mutex
- buf *iobuf.Slice // GUARDED_BY(mu)
- deadline chan struct{} // GUARDED_BY(mu)
- totalBytes uint32
+ handler readHandler
+ src *upcqueue.T
+ mu sync.Mutex
+ buf *iobuf.Slice // GUARDED_BY(mu)
+ deadline chan struct{} // GUARDED_BY(mu)
+ cancelDeadline chan struct{} // GUARDED_BY(mu)
+ totalBytes uint32
}
func newReader(h readHandler) *reader {
@@ -91,9 +92,13 @@
// does not complete by the specified deadline.
// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
func (r *reader) SetReadDeadline(t time.Time) error {
- c := cancelChannel(t)
+ c, q := cancelChannel(t)
r.mu.Lock()
+ if r.cancelDeadline != nil {
+ close(r.cancelDeadline)
+ }
r.deadline = c
+ r.cancelDeadline = q
r.mu.Unlock()
return nil
}
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index 383beed..5114cef 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -21,11 +21,12 @@
Alloc *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
- mu sync.Mutex // Guards call to Writes
- wroteOnce bool // GUARDED_BY(mu)
- deadline chan struct{} // GUARDED_BY(mu)
- isClosed bool // GUARDED_BY(mu)
- closed chan struct{} // GUARDED_BY(mu)
+ mu sync.Mutex // Guards call to Writes
+ wroteOnce bool // GUARDED_BY(mu)
+ deadline chan struct{} // GUARDED_BY(mu)
+ cancelDeadline chan struct{} // GUARDED_BY(mu)
+ isClosed bool // GUARDED_BY(mu)
+ closed chan struct{} // GUARDED_BY(mu)
// Total number of bytes filled in by all Write calls on this writer.
// Atomic operations are used to manipulate it.
@@ -141,9 +142,13 @@
// does not complete by the specified deadline.
// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
func (w *writer) SetWriteDeadline(t time.Time) error {
- c := cancelChannel(t)
+ c, q := cancelChannel(t)
w.mu.Lock()
+ if w.cancelDeadline != nil {
+ close(w.cancelDeadline)
+ }
w.deadline = c
+ w.cancelDeadline = q
w.mu.Unlock()
return nil
}