Merge "rpc/discovery: support permission"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index dc3ecd4..02bcc8c 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -150,7 +150,6 @@
 			return err
 		}
 	}
-	var rPublicKey security.PublicKey
 	if rauth.BlessingsKey != 0 {
 		var err error
 		// TODO(mattr): Make sure we cancel out of this at some point.
@@ -158,14 +157,14 @@
 		if err != nil {
 			return err
 		}
-		rPublicKey = c.rBlessings.PublicKey()
+		c.rPublicKey = c.rBlessings.PublicKey()
 	} else {
-		rPublicKey = rauth.PublicKey
+		c.rPublicKey = rauth.PublicKey
 	}
-	if rPublicKey == nil {
+	if c.rPublicKey == nil {
 		return NewErrNoPublicKey(ctx)
 	}
-	if !rauth.ChannelBinding.Verify(rPublicKey, binding) {
+	if !rauth.ChannelBinding.Verify(c.rPublicKey, binding) {
 		return NewErrInvalidChannelBinding(ctx)
 	}
 	return nil
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 5a6e248..17475a5 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -67,6 +67,7 @@
 	mp                     *messagePipe
 	version                version.RPCVersion
 	lBlessings, rBlessings security.Blessings
+	rPublicKey             security.PublicKey
 	local, remote          naming.Endpoint
 	closed                 chan struct{}
 	blessingsFlow          *blessingsFlow
@@ -361,6 +362,14 @@
 		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
 
+		rBlessings, _, err := c.blessingsFlow.get(ctx, msg.BlessingsKey, msg.DischargeKey)
+		if err != nil {
+			return err
+		}
+		if !reflect.DeepEqual(rBlessings.PublicKey(), c.rPublicKey) {
+			return NewErrBlessingsNotBound(ctx)
+		}
+
 		handler.HandleFlow(f)
 		if err := f.q.put(ctx, msg.Payload); err != nil {
 			return err
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 42d092f..42b56bf 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -25,4 +25,5 @@
   DialingNonServer() {"en": "You are attempting to dial on a connection with no remote server."}
   AcceptorBlessingsMissing() {"en": "The acceptor did not send blessings."}
   UpdatingNilFlowHandler() {"en": "nil flowHandler cannot be updated to non-nil value."}
+  BlessingsNotBound() {"en": "blessings not bound to connection remote public key"}
 )
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index ea7a30e..506f283 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -29,6 +29,7 @@
 	ErrDialingNonServer         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
 	ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
 	ErrUpdatingNilFlowHandler   = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UpdatingNilFlowHandler", verror.NoRetry, "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
+	ErrBlessingsNotBound        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsNotBound", verror.NoRetry, "{1:}{2:} blessings not bound to connection remote public key")
 )
 
 func init() {
@@ -46,6 +47,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDialingNonServer.ID), "{1:}{2:} You are attempting to dial on a connection with no remote server.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptorBlessingsMissing.ID), "{1:}{2:} The acceptor did not send blessings.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUpdatingNilFlowHandler.ID), "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlessingsNotBound.ID), "{1:}{2:} blessings not bound to connection remote public key")
 }
 
 // NewErrMissingSetupOption returns an error with the ErrMissingSetupOption ID.
@@ -117,3 +119,8 @@
 func NewErrUpdatingNilFlowHandler(ctx *context.T) error {
 	return verror.New(ErrUpdatingNilFlowHandler, ctx)
 }
+
+// NewErrBlessingsNotBound returns an error with the ErrBlessingsNotBound ID.
+func NewErrBlessingsNotBound(ctx *context.T) error {
+	return verror.New(ErrBlessingsNotBound, ctx)
+}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index b3975e2..257b681 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -19,11 +19,12 @@
 // Multiple goroutines can invoke methods on the ConnCache simultaneously.
 // TODO(suharshs): We should periodically look for closed connections and remove them.
 type ConnCache struct {
-	mu        *sync.Mutex
-	addrCache map[string]*connEntry           // keyed by (protocol, address, blessingNames)
-	ridCache  map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
-	started   map[string]bool                 // keyed by (protocol, address, blessingNames)
-	cond      *sync.Cond
+	mu            *sync.Mutex
+	cond          *sync.Cond
+	addrCache     map[string]*connEntry           // keyed by (protocol, address, blessingNames)
+	ridCache      map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
+	started       map[string]bool                 // keyed by (protocol, address, blessingNames)
+	unmappedConns map[*connEntry]bool             // set of connEntries displaced from ridCache by a newer entry with the same RoutingID
 }
 
 type connEntry struct {
@@ -36,11 +37,12 @@
 	mu := &sync.Mutex{}
 	cond := sync.NewCond(mu)
 	return &ConnCache{
-		mu:        mu,
-		addrCache: make(map[string]*connEntry),
-		ridCache:  make(map[naming.RoutingID]*connEntry),
-		started:   make(map[string]bool),
-		cond:      cond,
+		mu:            mu,
+		cond:          cond,
+		addrCache:     make(map[string]*connEntry),
+		ridCache:      make(map[naming.RoutingID]*connEntry),
+		started:       make(map[string]bool),
+		unmappedConns: make(map[*connEntry]bool),
 	}
 }
 
@@ -101,6 +103,9 @@
 		rid:     ep.RoutingID(),
 		addrKey: k,
 	}
+	if old := c.ridCache[entry.rid]; old != nil {
+		c.unmappedConns[old] = true
+	}
 	c.addrCache[k] = entry
 	c.ridCache[entry.rid] = entry
 	return nil
@@ -117,6 +122,9 @@
 		conn: conn,
 		rid:  conn.RemoteEndpoint().RoutingID(),
 	}
+	if old := c.ridCache[entry.rid]; old != nil {
+		c.unmappedConns[old] = true
+	}
 	c.ridCache[entry.rid] = entry
 	return nil
 }
@@ -126,8 +134,12 @@
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	c.addrCache, c.started = nil, nil
+	err := NewErrCacheClosed(ctx)
 	for _, d := range c.ridCache {
-		d.conn.Close(ctx, NewErrCacheClosed(ctx))
+		d.conn.Close(ctx, err)
+	}
+	for d := range c.unmappedConns {
+		d.conn.Close(ctx, err)
 	}
 }
 
@@ -157,12 +169,23 @@
 		}
 		pq = append(pq, e)
 	}
+	for d := range c.unmappedConns {
+		if isClosed(d.conn) {
+			delete(c.unmappedConns, d)
+			continue
+		}
+		if d.conn.IsEncapsulated() {
+			continue
+		}
+		pq = append(pq, d)
+	}
 	sort.Sort(pq)
 	for i := 0; i < num; i++ {
 		d := pq[i]
 		d.conn.Close(ctx, err)
 		delete(c.addrCache, d.addrKey)
 		delete(c.ridCache, d.rid)
+		delete(c.unmappedConns, d)
 	}
 	return nil
 }
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 8d02e9a..c66afa1 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -114,17 +114,27 @@
 		t.Errorf("got %v, want %v", cachedConn, otherConn)
 	}
 
+	// Insert a duplicate conn to ensure that replaced conns still get closed.
+	dupConn := makeConnAndFlow(t, ctx, remote).c
+	if err := c.Insert(dupConn); err != nil {
+		t.Fatal(err)
+	}
+
 	// Closing the cache should close all the connections in the cache.
 	// Ensure that the conns are not closed yet.
 	if isClosed(conn) {
-		t.Fatalf("wanted conn to not be closed")
+		t.Fatal("wanted conn to not be closed")
+	}
+	if isClosed(dupConn) {
+		t.Fatal("wanted dupConn to not be closed")
 	}
 	if isClosed(otherConn) {
-		t.Fatalf("wanted otherConn to not be closed")
+		t.Fatal("wanted otherConn to not be closed")
 	}
 	c.Close(ctx)
 	// Now the connections should be closed.
 	<-conn.Closed()
+	<-dupConn.Closed()
 	<-otherConn.Closed()
 }