veyron/runtimes/google/ipc: Implement a better fix for the cancellation goroutine leak.
This version prevents spawning many goroutines in the first place by reusing
the context's cancel channel. Also fix a goroutine leak when a single
context is used to make a large number of calls over a long period of time
on the client side.
Change-Id: I29fdc6b290b4bbad2ea82c0b399c7deee9dff6f9
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 38404b7..57d90dc 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -630,6 +630,12 @@
func (fs *flowServer) serve() error {
defer fs.flow.Close()
+ // Here we remove the context's channel as the flow's deadline.
+ // We do this to ensure clients get a consistent error when they read/write
+ // after the flow is closed. Otherwise there is a race between the
+ // context cancellation and the flow being closed.
+ defer fs.flow.SetDeadline(nil)
+
results, err := fs.processRequest()
var traceResponse vtrace.Response
@@ -671,45 +677,47 @@
return nil
}
-func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
- start := time.Now()
+func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
// Set a default timeout before reading from the flow. Without this timeout,
// a client that sends no request or a partial request will retain the flow
// indefinitely (and lock up server resources).
- deadline := start.Add(defaultCallTimeout)
- if verr := fs.setDeadline(deadline); verr != nil {
- return nil, verr
- }
+ initTimer := newTimer(defaultCallTimeout)
+ defer initTimer.Stop()
+ fs.flow.SetDeadline(initTimer.C)
+
// Decode the initial request.
var req ipc.Request
if err := fs.dec.Decode(&req); err != nil {
return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
}
+ return &req, nil
+}
+
+func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
+ start := time.Now()
+
+ req, verr := fs.readIPCRequest()
+ if verr != nil {
+ return nil, verr
+ }
fs.method = req.Method
+
// TODO(mattr): Currently this allows users to trigger trace collection
// on the server even if they will not be allowed to collect the
// results later. This might be consider a DOS vector.
spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
- // Set the appropriate deadline, if specified.
- if req.Timeout == ipc.NoTimeout {
- deadline = time.Time{}
- } else if req.Timeout > 0 {
- deadline = start.Add(time.Duration(req.Timeout))
- }
- if verr := fs.setDeadline(deadline); verr != nil {
- return nil, verr
- }
-
var cancel context.CancelFunc
- if !deadline.IsZero() {
- fs.T, cancel = fs.WithDeadline(deadline)
+ if req.Timeout != ipc.NoTimeout {
+ fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
} else {
fs.T, cancel = fs.WithCancel()
}
+ fs.flow.SetDeadline(fs.Done())
- // Notify the context when the channel is closed.
+ // Ensure that the context gets cancelled if the flow is closed
+ // due to a network error, or client cancellation.
go func() {
<-fs.flow.Closed()
cancel()
@@ -841,16 +849,6 @@
return vsecurity.NewACLAuthorizer(defaultACL(dc.LocalID())).Authorize(dc)
}
-// setDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (fs *flowServer) setDeadline(deadline time.Time) verror.E {
- if err := fs.flow.SetDeadline(deadline); err != nil {
- return verror.Internalf("ipc: flow SetDeadline failed: %v", err)
- }
- return nil
-}
-
// Send implements the ipc.Stream method.
func (fs *flowServer) Send(item interface{}) error {
defer vlog.LogCall()()