veyron/runtimes/google/ipc/stream/vc: Fix goroutine leak in cancel.

Robin noticed thousands of goroutines stacking up in the rps app:
4882 @ 0x2622c 0x262bc 0x31ac8 0x31d4c 0x26ab08 0x26464

This is caused by cancelChannel keeping a goroutine running until the
deadline for a call expires even if the associated call finishes much
earlier.

Change-Id: Ia2e22c1b150efd71b4889c77d6434bfa5e50c8f1
diff --git a/runtimes/google/ipc/stream/vc/cancel.go b/runtimes/google/ipc/stream/vc/cancel.go
index c621831..4fa5180 100644
--- a/runtimes/google/ipc/stream/vc/cancel.go
+++ b/runtimes/google/ipc/stream/vc/cancel.go
@@ -4,19 +4,29 @@
 
 // cancelChannel creates a channel usable by bqueue.Writer.Put and upcqueue.Get
 // to cancel the calls if they have not completed by the provided deadline.
+// It returns two channels, the first is the channel to supply to Put or Get
+// which will be closed when the deadline expires.
+// The second is the quit channel, which can be closed to clean up resources
+// when the first is no longer needed (the deadline is no longer worth enforcing).
 //
 // A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func cancelChannel(deadline time.Time) chan struct{} {
+func cancelChannel(deadline time.Time) (expired, quit chan struct{}) {
 	if deadline.IsZero() {
-		return nil
+		return nil, nil
 	}
-	sc := make(chan struct{})
-	tc := time.After(deadline.Sub(time.Now()))
-	go func(dst chan struct{}, src <-chan time.Time) {
-		<-src
-		close(dst)
-	}(sc, tc)
-	return sc
+	expired = make(chan struct{})
+	quit = make(chan struct{})
+	timer := time.NewTimer(deadline.Sub(time.Now()))
+
+	go func() {
+		select {
+		case <-timer.C:
+		case <-quit:
+		}
+		timer.Stop()
+		close(expired)
+	}()
+	return expired, quit
 }
 
 // timeoutError implements net.Error with Timeout returning true.
diff --git a/runtimes/google/ipc/stream/vc/cancel_test.go b/runtimes/google/ipc/stream/vc/cancel_test.go
index e31f90f..90db06d 100644
--- a/runtimes/google/ipc/stream/vc/cancel_test.go
+++ b/runtimes/google/ipc/stream/vc/cancel_test.go
@@ -7,14 +7,14 @@
 
 func TestCancelChannelNil(t *testing.T) {
 	var zero time.Time
-	if cancel := cancelChannel(zero); cancel != nil {
+	if cancel, _ := cancelChannel(zero); cancel != nil {
 		t.Errorf("Got %v want nil with deadline %v", cancel, zero)
 	}
 }
 
 func TestCancelChannel(t *testing.T) {
 	deadline := time.Now()
-	cancel := cancelChannel(deadline)
+	cancel, _ := cancelChannel(deadline)
 	if cancel == nil {
 		t.Fatalf("Got nil channel for deadline %v", deadline)
 	}
@@ -22,3 +22,12 @@
 		t.Errorf("Expected channel to be closed")
 	}
 }
+
+func TestCancelChannelQuit(t *testing.T) {
+	deadline := time.Now().Add(time.Hour)
+	cancel, quit := cancelChannel(deadline)
+	close(quit)
+	if _, ok := <-cancel; ok {
+		t.Errorf("Expected channel to be closed")
+	}
+}
diff --git a/runtimes/google/ipc/stream/vc/reader.go b/runtimes/google/ipc/stream/vc/reader.go
index 9e26a14..f1b6f3a 100644
--- a/runtimes/google/ipc/stream/vc/reader.go
+++ b/runtimes/google/ipc/stream/vc/reader.go
@@ -21,12 +21,13 @@
 // reader implements the io.Reader and SetReadDeadline interfaces for a Flow,
 // backed by iobuf.Slice objects read from a upcqueue.
 type reader struct {
-	handler    readHandler
-	src        *upcqueue.T
-	mu         sync.Mutex
-	buf        *iobuf.Slice  // GUARDED_BY(mu)
-	deadline   chan struct{} // GUARDED_BY(mu)
-	totalBytes uint32
+	handler        readHandler
+	src            *upcqueue.T
+	mu             sync.Mutex
+	buf            *iobuf.Slice  // GUARDED_BY(mu)
+	deadline       chan struct{} // GUARDED_BY(mu)
+	cancelDeadline chan struct{} // GUARDED_BY(mu)
+	totalBytes     uint32
 }
 
 func newReader(h readHandler) *reader {
@@ -91,9 +92,13 @@
 // does not complete by the specified deadline.
 // A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
 func (r *reader) SetReadDeadline(t time.Time) error {
-	c := cancelChannel(t)
+	c, q := cancelChannel(t)
 	r.mu.Lock()
+	if r.cancelDeadline != nil {
+		close(r.cancelDeadline)
+	}
 	r.deadline = c
+	r.cancelDeadline = q
 	r.mu.Unlock()
 	return nil
 }
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index 383beed..5114cef 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -21,11 +21,12 @@
 	Alloc          *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
 	SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
 
-	mu        sync.Mutex    // Guards call to Writes
-	wroteOnce bool          // GUARDED_BY(mu)
-	deadline  chan struct{} // GUARDED_BY(mu)
-	isClosed  bool          // GUARDED_BY(mu)
-	closed    chan struct{} // GUARDED_BY(mu)
+	mu             sync.Mutex    // Guards call to Writes
+	wroteOnce      bool          // GUARDED_BY(mu)
+	deadline       chan struct{} // GUARDED_BY(mu)
+	cancelDeadline chan struct{} // GUARDED_BY(mu)
+	isClosed       bool          // GUARDED_BY(mu)
+	closed         chan struct{} // GUARDED_BY(mu)
 
 	// Total number of bytes filled in by all Write calls on this writer.
 	// Atomic operations are used to manipulate it.
@@ -141,9 +142,13 @@
 // does not complete by the specified deadline.
 // A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
 func (w *writer) SetWriteDeadline(t time.Time) error {
-	c := cancelChannel(t)
+	c, q := cancelChannel(t)
 	w.mu.Lock()
+	if w.cancelDeadline != nil {
+		close(w.cancelDeadline)
+	}
 	w.deadline = c
+	w.cancelDeadline = q
 	w.mu.Unlock()
 	return nil
 }