manager/conncache: Fix deadlocked conncache.

(1) Ensure that RemoteDischarges calls unblock when the connection is closed.
(2) Release the ConnCache lock when calling AuthorizePeer since it can now
    hang if a remote end never sends Discharges.
    - note that the implementation results in a few unnecessary locks, but
      the alternative increased code complexity tremendously.

Change-Id: Ie372a6cae61eca4c776190dd4a637478a1dc9ce5
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ef49528..b5f6e57 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -451,8 +451,7 @@
 	// It may happen that in the case of bidirectional RPC the dialer of the connection
 	// has sent blessings,  but not yet discharges.  In this case we will wait for them
 	// to send the discharges before allowing a bidirectional flow dial.
-	if len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
-		valid := c.remoteValid
+	if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
 		c.mu.Unlock()
 		<-valid
 		c.mu.Lock()
@@ -518,8 +517,7 @@
 	// It may happen that in the case of bidirectional RPC the dialer of the connection
 	// has sent blessings,  but not yet discharges.  In this case we will wait for them
 	// to send the discharges instead of returning the initial nil discharges.
-	if len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
-		valid := c.remoteValid
+	if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
 		c.mu.Unlock()
 		<-valid
 		c.mu.Lock()
@@ -629,6 +627,10 @@
 		close(c.lameDucked)
 	}
 	c.status = Closing
+	if c.remoteValid != nil {
+		close(c.remoteValid)
+		c.remoteValid = nil
+	}
 
 	go func(c *Conn) {
 		if c.hcstate != nil {
@@ -845,8 +847,10 @@
 		c.mu.Lock()
 		c.remoteBlessings = blessings
 		c.remoteDischarges = discharges
-		close(c.remoteValid)
-		c.remoteValid = make(chan struct{})
+		if c.remoteValid != nil {
+			close(c.remoteValid)
+			c.remoteValid = make(chan struct{})
+		}
 		c.mu.Unlock()
 	default:
 		return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String())
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index a8dabe3..7871202 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -114,15 +114,15 @@
 
 // Find returns a Conn based only on the RoutingID of remote.
 func (c *ConnCache) FindWithRoutingID(ctx *context.T, remote naming.Endpoint,
-	auth flow.PeerAuthorizer) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+	auth flow.PeerAuthorizer) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
 		return nil, nil, nil, NewErrCacheClosed(nil)
 	}
 	if rid := remote.RoutingID(); rid != naming.NullRoutingID {
-		if entry, names, rejected := c.removeUndialable(ctx, remote, c.ridCache[rid], auth); entry != nil {
-			return entry, names, rejected, nil
+		if conn, names, rejected := c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.ridCache[rid], auth); conn != nil {
+			return conn, names, rejected, nil
 		}
 	}
 	return nil, nil, nil, nil
@@ -139,15 +139,15 @@
 // until the corresponding Unreserve call is made.
 // p is used to check the cache for resolved protocols.
 func (c *ConnCache) Find(ctx *context.T, remote naming.Endpoint, network, address string, auth flow.PeerAuthorizer,
-	p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+	p flow.Protocol) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
 		return nil, nil, nil, NewErrCacheClosed(nil)
 	}
 	if rid := remote.RoutingID(); rid != naming.NullRoutingID {
-		if entry, names, rejected := c.removeUndialable(ctx, remote, c.ridCache[rid], auth); entry != nil {
-			return entry, names, rejected, nil
+		if conn, names, rejected := c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.ridCache[rid], auth); conn != nil {
+			return conn, names, rejected, nil
 		}
 	}
 	k := key(network, address)
@@ -158,15 +158,14 @@
 		}
 	}
 	c.started[k] = true
-	entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
-	if entry != nil {
-		return entry, names, rejected, nil
+	if conn, names, rejected = c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.addrCache[k], auth); conn != nil {
+		return conn, names, rejected, nil
 	}
 	return c.findResolvedLocked(ctx, remote, network, address, auth, p)
 }
 
 func (c *ConnCache) findResolvedLocked(ctx *context.T, remote naming.Endpoint, unresNetwork string, unresAddress string,
-	auth flow.PeerAuthorizer, p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+	auth flow.PeerAuthorizer, p flow.Protocol) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
 	network, addresses, err := resolve(ctx, p, unresNetwork, unresAddress)
 	if err != nil {
 		c.unreserveLocked(unresNetwork, unresAddress)
@@ -174,9 +173,8 @@
 	}
 	for _, address := range addresses {
 		k := key(network, address)
-		entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
-		if entry != nil {
-			return entry, names, rejected, nil
+		if conn, names, rejected = c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.addrCache[k], auth); conn != nil {
+			return conn, names, rejected, nil
 		}
 	}
 	// No entries for any of the addresses were in the cache.
@@ -218,14 +216,24 @@
 	c.iterateOnConnsLocked(ctx, func(e *connEntry) { e.conn.Close(ctx, err) })
 }
 
-// removeUndialable removes connections that are:
+// TODO(suharshs): This function starts and ends holding the lock, but releases the lock
+// during its execution.
+func (c *ConnCache) removeAndFilterConnBreaksCriticalSection(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
+	if c.removeUndialableLocked(e) {
+		return nil, nil, nil
+	}
+	defer c.mu.Lock()
+	c.mu.Unlock()
+	return c.filterUnauthorized(ctx, remote, e, auth)
+}
+
+// removeUndialableLocked removes connections that are:
 // - closed
 // - lameducked
-// and filters connections that are:
-// - non-proxied and fail to authorize.
-func (c *ConnCache) removeUndialable(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
+// It returns true if the connection was removed.
+func (c *ConnCache) removeUndialableLocked(e *connEntry) bool {
 	if e == nil {
-		return nil, nil, nil
+		return true
 	}
 	if status := e.conn.Status(); status >= conn.Closing || e.conn.RemoteLameDuck() {
 		delete(c.addrCache, e.addrKey)
@@ -233,8 +241,19 @@
 		if status < conn.Closing {
 			c.unmappedConns[e] = true
 		}
-		return nil, nil, nil
+		return true
 	}
+	return false
+}
+
+// filterUnauthorized filters out connections that are non-proxied and fail to authorize.
+// We only filter unauthorized connections here, rather than removing them from
+// the cache because a connection may authorize in regards to another authorizer
+// for a different RPC call.
+// IMPORTANT: ConnCache.mu.lock should not be held during this function because
+// AuthorizePeer calls conn.RemoteDischarges which can hang if a misbehaving
+// client fails to send its discharges.
+func (c *ConnCache) filterUnauthorized(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
 	if !e.proxy && auth != nil {
 		names, rejected, err := auth.AuthorizePeer(ctx,
 			e.conn.LocalEndpoint(),
@@ -284,7 +303,7 @@
 
 func (c *ConnCache) removeUndialableConnsLocked(ctx *context.T) {
 	for _, e := range c.ridCache {
-		c.removeUndialable(ctx, nil, e, nil)
+		c.removeUndialableLocked(e)
 	}
 	for d := range c.unmappedConns {
 		if status := d.conn.Status(); status == conn.Closed {