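runtime/internal/flow/conn: Implement flow cancellation.

When a conn is torn down locally, send the peer a tearDown message
carrying the error text (tearDown.Message replaces tearDown.Err) before
closing the flows with the original error. When the teardown was
triggered by the peer (ErrConnClosedRemotely), skip the tearDown message
and close the flows with ErrFlowClosedRemotely instead.

A received tearDown is now reported as ErrConnClosedRemotely, and a
message with closeFlag set closes its flow with ErrFlowClosedRemotely
rather than a nil error.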

Change-Id: Icbcaf895dadc913785258221d393b9a78440276b
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ebe4535..8cd4b6a 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -14,6 +14,7 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
+	"v.io/v23/verror"
 
 	"v.io/x/ref/runtime/internal/flow/flowcontrol"
 )
@@ -155,18 +156,29 @@
 		// We've already torn this conn down.
 		return
 	}
+	ferr := err
+	if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
+		ferr = NewErrFlowClosedRemotely(ctx)
+	} else {
+		message := ""
+		if err != nil {
+			message = err.Error()
+		}
+		cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
+			return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
+		})
+		if cerr != nil {
+			ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
+		}
+	}
 	for _, f := range flows {
-		f.close(err)
+		f.close(ctx, ferr)
 	}
-	err = c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
-		return 0, true, c.mp.writeMsg(ctx, &tearDown{Err: err})
-	})
-	if err != nil {
-		ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, err)
+	if cerr := c.mp.close(); cerr != nil {
+		ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
 	}
-	if err = c.mp.close(); err != nil {
-		ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, err)
-	}
+
+	// TODO(mattr): ensure the readLoop has finished before closing c.closed.
 	close(c.closed)
 }
 
@@ -207,7 +219,7 @@
 
 		switch msg := x.(type) {
 		case *tearDown:
-			terr = msg.Err
+			terr = NewErrConnClosedRemotely(ctx, msg.Message)
 			return
 
 		case *openFlow:
@@ -244,7 +256,7 @@
 				return
 			}
 			if msg.flags&closeFlag != 0 {
-				f.close(nil)
+				f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
 			}
 
 		case *unencryptedData:
@@ -259,7 +271,7 @@
 				return
 			}
 			if msg.flags&closeFlag != 0 {
-				f.close(nil)
+				f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
 			}
 
 		default: