manager/conncache: Fix deadlocked conncache.
(1) Ensure that RemoteDischarges calls unblock when the connection is closed.
(2) Release the ConnCache lock when calling AuthorizePeer since it can now
hang if a remote end never sends Discharges.
- note that the implementation results in a few unnecessary locks, but
the alternative would have increased code complexity tremendously.
Change-Id: Ie372a6cae61eca4c776190dd4a637478a1dc9ce5
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index ef49528..b5f6e57 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -451,8 +451,7 @@
// It may happen that in the case of bidirectional RPC the dialer of the connection
// has sent blessings, but not yet discharges. In this case we will wait for them
// to send the discharges before allowing a bidirectional flow dial.
- if len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
- valid := c.remoteValid
+ if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
c.mu.Unlock()
<-valid
c.mu.Lock()
@@ -518,8 +517,7 @@
// It may happen that in the case of bidirectional RPC the dialer of the connection
// has sent blessings, but not yet discharges. In this case we will wait for them
// to send the discharges instead of returning the initial nil discharges.
- if len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
- valid := c.remoteValid
+ if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 {
c.mu.Unlock()
<-valid
c.mu.Lock()
@@ -629,6 +627,10 @@
close(c.lameDucked)
}
c.status = Closing
+ if c.remoteValid != nil {
+ close(c.remoteValid)
+ c.remoteValid = nil
+ }
go func(c *Conn) {
if c.hcstate != nil {
@@ -845,8 +847,10 @@
c.mu.Lock()
c.remoteBlessings = blessings
c.remoteDischarges = discharges
- close(c.remoteValid)
- c.remoteValid = make(chan struct{})
+ if c.remoteValid != nil {
+ close(c.remoteValid)
+ c.remoteValid = make(chan struct{})
+ }
c.mu.Unlock()
default:
return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String())
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index a8dabe3..7871202 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -114,15 +114,15 @@
// Find returns a Conn based only on the RoutingID of remote.
func (c *ConnCache) FindWithRoutingID(ctx *context.T, remote naming.Endpoint,
- auth flow.PeerAuthorizer) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+ auth flow.PeerAuthorizer) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, nil, nil, NewErrCacheClosed(nil)
}
if rid := remote.RoutingID(); rid != naming.NullRoutingID {
- if entry, names, rejected := c.removeUndialable(ctx, remote, c.ridCache[rid], auth); entry != nil {
- return entry, names, rejected, nil
+ if conn, names, rejected := c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.ridCache[rid], auth); conn != nil {
+ return conn, names, rejected, nil
}
}
return nil, nil, nil, nil
@@ -139,15 +139,15 @@
// until the corresponding Unreserve call is made.
// p is used to check the cache for resolved protocols.
func (c *ConnCache) Find(ctx *context.T, remote naming.Endpoint, network, address string, auth flow.PeerAuthorizer,
- p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+ p flow.Protocol) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, nil, nil, NewErrCacheClosed(nil)
}
if rid := remote.RoutingID(); rid != naming.NullRoutingID {
- if entry, names, rejected := c.removeUndialable(ctx, remote, c.ridCache[rid], auth); entry != nil {
- return entry, names, rejected, nil
+ if conn, names, rejected := c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.ridCache[rid], auth); conn != nil {
+ return conn, names, rejected, nil
}
}
k := key(network, address)
@@ -158,15 +158,14 @@
}
}
c.started[k] = true
- entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
- if entry != nil {
- return entry, names, rejected, nil
+ if conn, names, rejected = c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.addrCache[k], auth); conn != nil {
+ return conn, names, rejected, nil
}
return c.findResolvedLocked(ctx, remote, network, address, auth, p)
}
func (c *ConnCache) findResolvedLocked(ctx *context.T, remote naming.Endpoint, unresNetwork string, unresAddress string,
- auth flow.PeerAuthorizer, p flow.Protocol) (entry *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
+ auth flow.PeerAuthorizer, p flow.Protocol) (conn *conn.Conn, names []string, rejected []security.RejectedBlessing, err error) {
network, addresses, err := resolve(ctx, p, unresNetwork, unresAddress)
if err != nil {
c.unreserveLocked(unresNetwork, unresAddress)
@@ -174,9 +173,8 @@
}
for _, address := range addresses {
k := key(network, address)
- entry, names, rejected = c.removeUndialable(ctx, remote, c.addrCache[k], auth)
- if entry != nil {
- return entry, names, rejected, nil
+ if conn, names, rejected = c.removeAndFilterConnBreaksCriticalSection(ctx, remote, c.addrCache[k], auth); conn != nil {
+ return conn, names, rejected, nil
}
}
// No entries for any of the addresses were in the cache.
@@ -218,14 +216,24 @@
c.iterateOnConnsLocked(ctx, func(e *connEntry) { e.conn.Close(ctx, err) })
}
-// removeUndialable removes connections that are:
+// TODO(suharshs): This function starts and ends holding the lock, but releases the lock
+// during its execution.
+func (c *ConnCache) removeAndFilterConnBreaksCriticalSection(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
+ if c.removeUndialableLocked(e) {
+ return nil, nil, nil
+ }
+ defer c.mu.Lock()
+ c.mu.Unlock()
+ return c.filterUnauthorized(ctx, remote, e, auth)
+}
+
+// removeUndialableLocked removes connections that are:
// - closed
// - lameducked
-// and filters connections that are:
-// - non-proxied and fail to authorize.
-func (c *ConnCache) removeUndialable(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
+// It returns true if the connection was removed.
+func (c *ConnCache) removeUndialableLocked(e *connEntry) bool {
if e == nil {
- return nil, nil, nil
+ return true
}
if status := e.conn.Status(); status >= conn.Closing || e.conn.RemoteLameDuck() {
delete(c.addrCache, e.addrKey)
@@ -233,8 +241,19 @@
if status < conn.Closing {
c.unmappedConns[e] = true
}
- return nil, nil, nil
+ return true
}
+ return false
+}
+
+// filterUnauthorized filters out connections that are non-proxied and fail to authorize.
+// We only filter unauthorized connections here, rather than removing them from
+// the cache because a connection may authorize in regards to another authorizer
+// for a different RPC call.
+// IMPORTANT: ConnCache.mu.lock should not be held during this function because
+// AuthorizePeer calls conn.RemoteDischarges which can hang if a misbehaving
+// client fails to send its discharges.
+func (c *ConnCache) filterUnauthorized(ctx *context.T, remote naming.Endpoint, e *connEntry, auth flow.PeerAuthorizer) (*conn.Conn, []string, []security.RejectedBlessing) {
if !e.proxy && auth != nil {
names, rejected, err := auth.AuthorizePeer(ctx,
e.conn.LocalEndpoint(),
@@ -284,7 +303,7 @@
func (c *ConnCache) removeUndialableConnsLocked(ctx *context.T) {
for _, e := range c.ridCache {
- c.removeUndialable(ctx, nil, e, nil)
+ c.removeUndialableLocked(e)
}
for d := range c.unmappedConns {
if status := d.conn.Status(); status == conn.Closed {