Merge "flow manager: The conncache should run the users actual authorizer against cached connections instead of just trying to check blessing patterns in the remote endpoints.."
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index dd9f70a..a479aec 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -406,6 +406,12 @@
 	return blessings
 }
 
+func (c *Conn) RemoteDischarges() map[string]security.Discharge {
+	// It's safe to ignore this error; it means that this conn is closed.
+	_, discharges, _ := c.blessingsFlow.getLatestRemote(nil, c.rBKey)
+	return discharges
+}
+
 // CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
 func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
 
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
index 5c3a996..5685a03 100644
--- a/runtime/internal/flow/flowtest/flowtest.go
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -5,6 +5,7 @@
 package flowtest
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -53,3 +54,42 @@
 	security.Blessings, map[string]security.Discharge, error) {
 	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
 }
+
+type peersAuthorizer []string
+
+func NewPeerAuthorizer(names []string) flow.PeerAuthorizer {
+	if len(names) == 0 {
+		return AllowAllPeersAuthorizer{}
+	}
+	return peersAuthorizer(names)
+}
+
+func (p peersAuthorizer) AuthorizePeer(
+	ctx *context.T,
+	localEndpoint, remoteEndpoint naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge,
+) ([]string, []security.RejectedBlessing, error) {
+	call := security.NewCall(&security.CallParams{
+		Timestamp:        time.Now(),
+		LocalPrincipal:   v23.GetPrincipal(ctx),
+		LocalEndpoint:    localEndpoint,
+		RemoteBlessings:  remoteBlessings,
+		RemoteDischarges: remoteDischarges,
+		RemoteEndpoint:   remoteEndpoint,
+	})
+	peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
+	ctx.Infof("validating against %v", peerNames)
+	for _, pattern := range p {
+		if security.BlessingPattern(pattern).MatchedBy(peerNames...) {
+			return peerNames, rejectedPeerNames, nil
+		}
+		ctx.Infof("no match %v", pattern)
+	}
+	return peerNames, rejectedPeerNames, fmt.Errorf("not authorized")
+}
+
+func (peersAuthorizer) BlessingsForPeer(ctx *context.T, _ []string) (
+	security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 78a5365..0e4467d 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -5,10 +5,13 @@
 package manager
 
 import (
+	"bytes"
+	"fmt"
 	"sort"
 	"sync"
 
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/x/ref/runtime/internal/flow/conn"
 )
@@ -26,11 +29,10 @@
 }
 
 type connEntry struct {
-	conn          *conn.Conn
-	rid           naming.RoutingID
-	addrKey       string
-	blessingNames []string
-	proxy         bool
+	conn    *conn.Conn
+	rid     naming.RoutingID
+	addrKey string
+	proxy   bool
 }
 
 func NewConnCache() *ConnCache {
@@ -46,6 +48,19 @@
 	}
 }
 
+func (c *ConnCache) String() string {
+	buf := &bytes.Buffer{}
+	fmt.Fprintln(buf, "AddressCache:")
+	for k, v := range c.addrCache {
+		fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
+	}
+	fmt.Fprintln(buf, "RIDCache:")
+	for k, v := range c.ridCache {
+		fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
+	}
+	return buf.String()
+}
+
 // Insert adds conn to the cache, keyed by both (protocol, address) and (routingID)
 // An error will be returned iff the cache has been closed.
 func (c *ConnCache) Insert(conn *conn.Conn, protocol, address string, proxy bool) error {
@@ -62,9 +77,6 @@
 		addrKey: k,
 		proxy:   proxy,
 	}
-	if !entry.proxy {
-		entry.blessingNames = ep.BlessingNames()
-	}
 	if old := c.ridCache[entry.rid]; old != nil {
 		c.unmappedConns[old] = true
 	}
@@ -86,9 +98,6 @@
 		rid:   ep.RoutingID(),
 		proxy: proxy,
 	}
-	if !entry.proxy {
-		entry.blessingNames = ep.BlessingNames()
-	}
 	if old := c.ridCache[entry.rid]; old != nil {
 		c.unmappedConns[old] = true
 	}
@@ -96,19 +105,19 @@
 	return nil
 }
 
-func (c *ConnCache) Find(protocol, address string, rid naming.RoutingID, blessingNames []string) (*conn.Conn, error) {
+func (c *ConnCache) Find(ctx *context.T, protocol, address string, rid naming.RoutingID, auth flow.PeerAuthorizer) (*conn.Conn, error) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
 		return nil, NewErrCacheClosed(nil)
 	}
 	if rid != naming.NullRoutingID {
-		if entry := c.removeUndialable(c.ridCache[rid], blessingNames); entry != nil {
+		if entry := c.removeUndialable(ctx, c.ridCache[rid], auth); entry != nil {
 			return entry, nil
 		}
 	}
 	k := key(protocol, address)
-	return c.removeUndialable(c.addrCache[k], blessingNames), nil
+	return c.removeUndialable(ctx, c.addrCache[k], auth), nil
 }
 
 // ReservedFind returns a Conn based on the input remoteEndpoint.
@@ -120,14 +129,14 @@
 // the arguments provided to ReservedFind.
 // All new ReservedFind calls for the (protocol, address) will Block
 // until the corresponding Unreserve call is made.
-func (c *ConnCache) ReservedFind(protocol, address string, rid naming.RoutingID, blessingNames []string) (*conn.Conn, error) {
+func (c *ConnCache) ReservedFind(ctx *context.T, protocol, address string, rid naming.RoutingID, auth flow.PeerAuthorizer) (*conn.Conn, error) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
 		return nil, NewErrCacheClosed(nil)
 	}
 	if rid != naming.NullRoutingID {
-		if entry := c.removeUndialable(c.ridCache[rid], blessingNames); entry != nil {
+		if entry := c.removeUndialable(ctx, c.ridCache[rid], auth); entry != nil {
 			return entry, nil
 		}
 	}
@@ -139,7 +148,7 @@
 		}
 	}
 	c.started[k] = true
-	return c.removeUndialable(c.addrCache[k], blessingNames), nil
+	return c.removeUndialable(ctx, c.addrCache[k], auth), nil
 }
 
 // Unreserve marks the status of the (protocol, address) as no longer started, and
@@ -166,8 +175,8 @@
 }
 
 // removeUndialable filters connections that are closed, lameducked, or non-proxied
-// connections whose blessings dont match blessingNames.
-func (c *ConnCache) removeUndialable(e *connEntry, blessingNames []string) *conn.Conn {
+// connections that the peer authorizer rejects.
+func (c *ConnCache) removeUndialable(ctx *context.T, e *connEntry, auth flow.PeerAuthorizer) *conn.Conn {
 	if e == nil {
 		return nil
 	}
@@ -179,8 +188,15 @@
 		}
 		return nil
 	}
-	if !e.proxy && len(blessingNames) > 0 && !matchBlessings(e.blessingNames, blessingNames) {
-		return nil
+	if !e.proxy && auth != nil {
+		_, _, err := auth.AuthorizePeer(ctx,
+			e.conn.LocalEndpoint(),
+			e.conn.RemoteEndpoint(),
+			e.conn.RemoteBlessings(),
+			e.conn.RemoteDischarges())
+		if err != nil {
+			return nil
+		}
 	}
 	return e.conn
 }
@@ -200,7 +216,7 @@
 	err := NewErrConnKilledToFreeResources(ctx)
 	pq := make(connEntries, 0, len(c.ridCache))
 	for _, e := range c.ridCache {
-		if c.removeUndialable(e, nil) == nil {
+		if c.removeUndialable(ctx, e, nil) == nil {
 			continue
 		}
 		if e.conn.IsEncapsulated() {
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index bf1a739..260a4a4 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -14,16 +14,18 @@
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
+	"v.io/v23/security"
 	connpackage "v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	_ "v.io/x/ref/runtime/protocols/local"
+	"v.io/x/ref/test"
 	"v.io/x/ref/test/goroutines"
 )
 
 func TestCache(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
-	ctx, shutdown := v23.Init()
+	ctx, shutdown := test.V23Init()
 	defer shutdown()
 
 	c := NewConnCache()
@@ -31,8 +33,10 @@
 		Protocol:  "tcp",
 		Address:   "127.0.0.1:1111",
 		RID:       naming.FixedRoutingID(0x5555),
-		Blessings: []string{"A", "B", "C"},
+		Blessings: unionBlessing(ctx, "A", "B", "C"),
 	}
+
+	auth := flowtest.NewPeerAuthorizer(remote.Blessings)
 	caf := makeConnAndFlow(t, ctx, remote)
 	defer caf.stop(ctx)
 	conn := caf.c
@@ -40,32 +44,32 @@
 		t.Fatal(err)
 	}
 	// We should be able to find the conn in the cache.
-	if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, remote.Blessings); err != nil || got != conn {
+	if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, auth); err != nil || got != conn {
 		t.Errorf("got %v, want %v, err: %v", got, conn, err)
 	}
 	c.Unreserve(remote.Protocol, remote.Address)
 	// Changing the protocol should fail.
-	if got, err := c.ReservedFind("wrong", remote.Address, naming.NullRoutingID, remote.Blessings); err != nil || got != nil {
+	if got, err := c.ReservedFind(ctx, "wrong", remote.Address, naming.NullRoutingID, auth); err != nil || got != nil {
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	c.Unreserve("wrong", remote.Address)
 	// Changing the address should fail.
-	if got, err := c.ReservedFind(remote.Protocol, "wrong", naming.NullRoutingID, remote.Blessings); err != nil || got != nil {
+	if got, err := c.ReservedFind(ctx, remote.Protocol, "wrong", naming.NullRoutingID, auth); err != nil || got != nil {
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	c.Unreserve(remote.Protocol, "wrong")
 	// Changing the blessingNames should fail.
-	if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, []string{"wrong"}); err != nil || got != nil {
+	if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != nil {
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	c.Unreserve(remote.Protocol, remote.Address)
-	// But finding a set of blessings that has at least on blessings in remote.Blessings should succeed.
-	if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, []string{"foo", "A"}); err != nil || got != conn {
+	// But finding a set of blessings that has at least one blessing in remote.Blessings should succeed.
+	if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"foo", remote.Blessings[0]})); err != nil || got != conn {
 		t.Errorf("got %v, want %v, err: %v", got, conn, err)
 	}
 	c.Unreserve(remote.Protocol, remote.Address)
 	// Finding by routing ID should work.
-	if got, err := c.ReservedFind("wrong", "wrong", remote.RID, remote.Blessings); err != nil || got != conn {
+	if got, err := c.ReservedFind(ctx, "wrong", "wrong", remote.RID, auth); err != nil || got != conn {
 		t.Errorf("got %v, want %v, err: %v", got, conn, err)
 	}
 	c.Unreserve("wrong", "wrong")
@@ -76,7 +80,7 @@
 		Protocol:  "tcp",
 		Address:   "127.0.0.1:2222",
 		RID:       naming.FixedRoutingID(0x5555),
-		Blessings: []string{"A", "B", "C"},
+		Blessings: unionBlessing(ctx, "A", "B", "C"),
 	}
 	caf = makeConnAndFlow(t, ctx, proxyep)
 	defer caf.stop(ctx)
@@ -85,7 +89,7 @@
 		t.Fatal(err)
 	}
 	// Wrong blessingNames should still work
-	if got, err := c.ReservedFind(proxyep.Protocol, proxyep.Address, naming.NullRoutingID, []string{"wrong"}); err != nil || got != proxyConn {
+	if got, err := c.ReservedFind(ctx, proxyep.Protocol, proxyep.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != proxyConn {
 		t.Errorf("got %v, want %v, err: %v", got, proxyConn, err)
 	}
 	c.Unreserve(proxyep.Protocol, proxyep.Address)
@@ -95,20 +99,21 @@
 		Protocol:  "ridonly",
 		Address:   "ridonly",
 		RID:       naming.FixedRoutingID(0x1111),
-		Blessings: []string{"ridonly"},
+		Blessings: unionBlessing(ctx, "ridonly"),
 	}
+	ridauth := flowtest.NewPeerAuthorizer(ridEP.Blessings)
 	caf = makeConnAndFlow(t, ctx, ridEP)
 	defer caf.stop(ctx)
 	ridConn := caf.c
 	if err := c.InsertWithRoutingID(ridConn, false); err != nil {
 		t.Fatal(err)
 	}
-	if got, err := c.ReservedFind(ridEP.Protocol, ridEP.Address, naming.NullRoutingID, ridEP.Blessings); err != nil || got != nil {
+	if got, err := c.ReservedFind(ctx, ridEP.Protocol, ridEP.Address, naming.NullRoutingID, ridauth); err != nil || got != nil {
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	c.Unreserve(ridEP.Protocol, ridEP.Address)
 	// Finding by routing ID should work.
-	if got, err := c.ReservedFind("wrong", "wrong", ridEP.RID, ridEP.Blessings); err != nil || got != ridConn {
+	if got, err := c.ReservedFind(ctx, "wrong", "wrong", ridEP.RID, ridauth); err != nil || got != ridConn {
 		t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
 	}
 	c.Unreserve("wrong", "wrong")
@@ -117,20 +122,21 @@
 		Protocol:  "other",
 		Address:   "other",
 		RID:       naming.FixedRoutingID(0x2222),
-		Blessings: []string{"other"},
+		Blessings: unionBlessing(ctx, "other"),
 	}
+	otherAuth := flowtest.NewPeerAuthorizer(otherEP.Blessings)
 	caf = makeConnAndFlow(t, ctx, otherEP)
 	defer caf.stop(ctx)
 	otherConn := caf.c
 
 	// Looking up a not yet inserted endpoint should fail.
-	if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherEP.Blessings); err != nil || got != nil {
+	if got, err := c.ReservedFind(ctx, otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherAuth); err != nil || got != nil {
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	// Looking it up again should block until a matching Unreserve call is made.
 	ch := make(chan *connpackage.Conn, 1)
 	go func(ch chan *connpackage.Conn) {
-		conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherEP.Blessings)
+		conn, err := c.ReservedFind(ctx, otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherAuth)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -177,7 +183,7 @@
 
 func TestLRU(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
-	ctx, shutdown := v23.Init()
+	ctx, shutdown := test.V23Init()
 	defer shutdown()
 
 	// Ensure that the least recently created conns are killed by KillConnections.
@@ -202,13 +208,14 @@
 		if status := conn.c.Status(); status == connpackage.Closed {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn.c) {
-			t.Errorf("conn %v should still be in cache", conn)
+		if !isInCache(t, ctx, c, conn.c) {
+			t.Errorf("conn %v(%p) should still be in cache:\n%s",
+				conn.c.RemoteEndpoint(), conn.c, c)
 		}
 	}
 	for _, conn := range conns[:3] {
 		<-conn.c.Closed()
-		if isInCache(t, c, conn.c) {
+		if isInCache(t, ctx, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
@@ -238,13 +245,13 @@
 		if status := conn.c.Status(); status == connpackage.Closed {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn.c) {
+		if !isInCache(t, ctx, c, conn.c) {
 			t.Errorf("conn %v should still be in cache", conn)
 		}
 	}
 	for _, conn := range conns[7:] {
 		<-conn.c.Closed()
-		if isInCache(t, c, conn.c) {
+		if isInCache(t, ctx, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
@@ -274,21 +281,22 @@
 		if status := conn.c.Status(); status == connpackage.Closed {
 			t.Errorf("conn %v should not have been closed", conn)
 		}
-		if !isInCache(t, c, conn.c) {
-			t.Errorf("conn %v should still be in cache", conn)
+		if !isInCache(t, ctx, c, conn.c) {
+			t.Errorf("conn %v(%p) should still be in cache:\n%s",
+				conn.c.RemoteEndpoint(), conn.c, c)
 		}
 	}
 	for _, conn := range conns[7:] {
 		<-conn.c.Closed()
-		if isInCache(t, c, conn.c) {
+		if isInCache(t, ctx, c, conn.c) {
 			t.Errorf("conn %v should not be in cache", conn)
 		}
 	}
 }
 
-func isInCache(t *testing.T, c *ConnCache, conn *connpackage.Conn) bool {
+func isInCache(t *testing.T, ctx *context.T, c *ConnCache, conn *connpackage.Conn) bool {
 	rep := conn.RemoteEndpoint()
-	rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.RoutingID(), rep.BlessingNames())
+	rfconn, err := c.ReservedFind(ctx, rep.Addr().Network(), rep.Addr().String(), rep.RoutingID(), flowtest.NewPeerAuthorizer(rep.BlessingNames()))
 	if err != nil {
 		t.Error(err)
 	}
@@ -392,3 +400,25 @@
 	}()
 	return nil
 }
+
+func unionBlessing(ctx *context.T, names ...string) []string {
+	principal := v23.GetPrincipal(ctx)
+	blessings := make([]security.Blessings, len(names))
+	for i, name := range names {
+		var err error
+		if blessings[i], err = principal.BlessSelf(name); err != nil {
+			panic(err)
+		}
+	}
+	union, err := security.UnionOfBlessings(blessings...)
+	if err != nil {
+		panic(err)
+	}
+	if err := security.AddToRoots(principal, union); err != nil {
+		panic(err)
+	}
+	if err := principal.BlessingStore().SetDefault(union); err != nil {
+		panic(err)
+	}
+	return security.BlessingNames(principal, principal.BlessingStore().Default())
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 8ac1742..e35abc8 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -657,8 +657,8 @@
 
 func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration, proxy bool) (flow.Flow, error) {
 	// Fast path, look for the conn based on unresolved network, address, and routingId first.
-	addr, rid, blessingNames := remote.Addr(), remote.RoutingID(), remote.BlessingNames()
-	c, err := m.cache.Find(addr.Network(), addr.String(), rid, blessingNames)
+	addr, rid := remote.Addr(), remote.RoutingID()
+	c, err := m.cache.Find(ctx, addr.Network(), addr.String(), rid, auth)
 	if err != nil {
 		return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
 	}
@@ -680,7 +680,7 @@
 		if err != nil {
 			return nil, iflow.MaybeWrapError(flow.ErrResolveFailed, ctx, err)
 		}
-		c, err = m.cache.ReservedFind(network, address, rid, blessingNames)
+		c, err = m.cache.ReservedFind(ctx, network, address, rid, auth)
 		if err != nil {
 			return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
 		}