runtime/internal/flow/conn: Implement flow cancelation.
Change-Id: Icbcaf895dadc913785258221d393b9a78440276b
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ebe4535..8cd4b6a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -14,6 +14,7 @@
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
+ "v.io/v23/verror"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
@@ -155,18 +156,29 @@
// We've already torn this conn down.
return
}
+ ferr := err
+ if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
+ ferr = NewErrFlowClosedRemotely(ctx)
+ } else {
+ message := ""
+ if err != nil {
+ message = err.Error()
+ }
+ cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+ return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
+ })
+ if cerr != nil {
+ ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
+ }
+ }
for _, f := range flows {
- f.close(err)
+ f.close(ctx, ferr)
}
- err = c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
- return 0, true, c.mp.writeMsg(ctx, &tearDown{Err: err})
- })
- if err != nil {
- ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, err)
+ if cerr := c.mp.close(); cerr != nil {
+ ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
}
- if err = c.mp.close(); err != nil {
- ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, err)
- }
+
+ // TODO(mattr): ensure the readLoop has finished before closing c.closed.
close(c.closed)
}
@@ -207,7 +219,7 @@
switch msg := x.(type) {
case *tearDown:
- terr = msg.Err
+ terr = NewErrConnClosedRemotely(ctx, msg.Message)
return
case *openFlow:
@@ -244,7 +256,7 @@
return
}
if msg.flags&closeFlag != 0 {
- f.close(nil)
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
case *unencryptedData:
@@ -259,7 +271,7 @@
return
}
if msg.flags&closeFlag != 0 {
- f.close(nil)
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
default: