veyron/runtimes/google/ipc: Implement a better fix for cancellation goroutine leak.

This version prevents spawning many goroutines in the first place by reusing
the context's cancel channel.  Also fix a goroutine leak when a single
context is used to make a large number of calls over a long period of time
on the client side.

Change-Id: I29fdc6b290b4bbad2ea82c0b399c7deee9dff6f9
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 38404b7..57d90dc 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -630,6 +630,12 @@
 
 func (fs *flowServer) serve() error {
 	defer fs.flow.Close()
+	// Here we remove the context's channel as a deadline to the flow.
+	// We do this to ensure clients get a consistent error when they read/write
+	// after the flow is closed.  Otherwise there is a race between the
+	// context cancellation and the flow being closed.
+	defer fs.flow.SetDeadline(nil)
+
 	results, err := fs.processRequest()
 
 	var traceResponse vtrace.Response
@@ -671,45 +677,47 @@
 	return nil
 }
 
-func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
-	start := time.Now()
+func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
 	// Set a default timeout before reading from the flow. Without this timeout,
 	// a client that sends no request or a partial request will retain the flow
 	// indefinitely (and lock up server resources).
-	deadline := start.Add(defaultCallTimeout)
-	if verr := fs.setDeadline(deadline); verr != nil {
-		return nil, verr
-	}
+	initTimer := newTimer(defaultCallTimeout)
+	defer initTimer.Stop()
+	fs.flow.SetDeadline(initTimer.C)
+
 	// Decode the initial request.
 	var req ipc.Request
 	if err := fs.dec.Decode(&req); err != nil {
 		return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
 	}
+	return &req, nil
+}
+
+func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
+	start := time.Now()
+
+	req, verr := fs.readIPCRequest()
+	if verr != nil {
+		return nil, verr
+	}
 	fs.method = req.Method
+
 	// TODO(mattr): Currently this allows users to trigger trace collection
 	// on the server even if they will not be allowed to collect the
 	// results later.  This might be consider a DOS vector.
 	spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
 	fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
 
-	// Set the appropriate deadline, if specified.
-	if req.Timeout == ipc.NoTimeout {
-		deadline = time.Time{}
-	} else if req.Timeout > 0 {
-		deadline = start.Add(time.Duration(req.Timeout))
-	}
-	if verr := fs.setDeadline(deadline); verr != nil {
-		return nil, verr
-	}
-
 	var cancel context.CancelFunc
-	if !deadline.IsZero() {
-		fs.T, cancel = fs.WithDeadline(deadline)
+	if req.Timeout != ipc.NoTimeout {
+		fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
 	} else {
 		fs.T, cancel = fs.WithCancel()
 	}
+	fs.flow.SetDeadline(fs.Done())
 
-	// Notify the context when the channel is closed.
+	// Ensure that the context gets cancelled if the flow is closed
+	// due to a network error, or client cancellation.
 	go func() {
 		<-fs.flow.Closed()
 		cancel()
@@ -841,16 +849,6 @@
 	return vsecurity.NewACLAuthorizer(defaultACL(dc.LocalID())).Authorize(dc)
 }
 
-// setDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (fs *flowServer) setDeadline(deadline time.Time) verror.E {
-	if err := fs.flow.SetDeadline(deadline); err != nil {
-		return verror.Internalf("ipc: flow SetDeadline failed: %v", err)
-	}
-	return nil
-}
-
 // Send implements the ipc.Stream method.
 func (fs *flowServer) Send(item interface{}) error {
 	defer vlog.LogCall()()