Merge "flow manager: The conncache should run the users actual authorizer against cached connections instead of just trying to check blessing patterns in the remote endpoints.."
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index dd9f70a..a479aec 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -406,6 +406,12 @@
return blessings
}
+func (c *Conn) RemoteDischarges() map[string]security.Discharge {
+ // Its safe to ignore this error. It means that this conn is closed.
+ _, discharges, _ := c.blessingsFlow.getLatestRemote(nil, c.rBKey)
+ return discharges
+}
+
// CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
index 5c3a996..5685a03 100644
--- a/runtime/internal/flow/flowtest/flowtest.go
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -5,6 +5,7 @@
package flowtest
import (
+ "fmt"
"testing"
"time"
@@ -53,3 +54,42 @@
security.Blessings, map[string]security.Discharge, error) {
return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
+
+type peersAuthorizer []string
+
+func NewPeerAuthorizer(names []string) flow.PeerAuthorizer {
+ if len(names) == 0 {
+ return AllowAllPeersAuthorizer{}
+ }
+ return peersAuthorizer(names)
+}
+
+func (p peersAuthorizer) AuthorizePeer(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+) ([]string, []security.RejectedBlessing, error) {
+ call := security.NewCall(&security.CallParams{
+ Timestamp: time.Now(),
+ LocalPrincipal: v23.GetPrincipal(ctx),
+ LocalEndpoint: localEndpoint,
+ RemoteBlessings: remoteBlessings,
+ RemoteDischarges: remoteDischarges,
+ RemoteEndpoint: remoteEndpoint,
+ })
+ peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call)
+ ctx.Infof("validating against %v", peerNames)
+ for _, pattern := range p {
+ if security.BlessingPattern(pattern).MatchedBy(peerNames...) {
+ return peerNames, rejectedPeerNames, nil
+ }
+ ctx.Infof("no match %v", pattern)
+ }
+ return peerNames, rejectedPeerNames, fmt.Errorf("not authorized")
+}
+
+func (peersAuthorizer) BlessingsForPeer(ctx *context.T, _ []string) (
+ security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 78a5365..0e4467d 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -5,10 +5,13 @@
package manager
import (
+ "bytes"
+ "fmt"
"sort"
"sync"
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/naming"
"v.io/x/ref/runtime/internal/flow/conn"
)
@@ -26,11 +29,10 @@
}
type connEntry struct {
- conn *conn.Conn
- rid naming.RoutingID
- addrKey string
- blessingNames []string
- proxy bool
+ conn *conn.Conn
+ rid naming.RoutingID
+ addrKey string
+ proxy bool
}
func NewConnCache() *ConnCache {
@@ -46,6 +48,19 @@
}
}
+func (c *ConnCache) String() string {
+ buf := &bytes.Buffer{}
+ fmt.Fprintln(buf, "AddressCache:")
+ for k, v := range c.addrCache {
+ fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
+ }
+ fmt.Fprintln(buf, "RIDCache:")
+ for k, v := range c.ridCache {
+ fmt.Fprintf(buf, "%v: %p\n", k, v.conn)
+ }
+ return buf.String()
+}
+
// Insert adds conn to the cache, keyed by both (protocol, address) and (routingID)
// An error will be returned iff the cache has been closed.
func (c *ConnCache) Insert(conn *conn.Conn, protocol, address string, proxy bool) error {
@@ -62,9 +77,6 @@
addrKey: k,
proxy: proxy,
}
- if !entry.proxy {
- entry.blessingNames = ep.BlessingNames()
- }
if old := c.ridCache[entry.rid]; old != nil {
c.unmappedConns[old] = true
}
@@ -86,9 +98,6 @@
rid: ep.RoutingID(),
proxy: proxy,
}
- if !entry.proxy {
- entry.blessingNames = ep.BlessingNames()
- }
if old := c.ridCache[entry.rid]; old != nil {
c.unmappedConns[old] = true
}
@@ -96,19 +105,19 @@
return nil
}
-func (c *ConnCache) Find(protocol, address string, rid naming.RoutingID, blessingNames []string) (*conn.Conn, error) {
+func (c *ConnCache) Find(ctx *context.T, protocol, address string, rid naming.RoutingID, auth flow.PeerAuthorizer) (*conn.Conn, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
if rid != naming.NullRoutingID {
- if entry := c.removeUndialable(c.ridCache[rid], blessingNames); entry != nil {
+ if entry := c.removeUndialable(ctx, c.ridCache[rid], auth); entry != nil {
return entry, nil
}
}
k := key(protocol, address)
- return c.removeUndialable(c.addrCache[k], blessingNames), nil
+ return c.removeUndialable(ctx, c.addrCache[k], auth), nil
}
// ReservedFind returns a Conn based on the input remoteEndpoint.
@@ -120,14 +129,14 @@
// the arguments provided to ReservedFind.
// All new ReservedFind calls for the (protocol, address) will Block
// until the corresponding Unreserve call is made.
-func (c *ConnCache) ReservedFind(protocol, address string, rid naming.RoutingID, blessingNames []string) (*conn.Conn, error) {
+func (c *ConnCache) ReservedFind(ctx *context.T, protocol, address string, rid naming.RoutingID, auth flow.PeerAuthorizer) (*conn.Conn, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
return nil, NewErrCacheClosed(nil)
}
if rid != naming.NullRoutingID {
- if entry := c.removeUndialable(c.ridCache[rid], blessingNames); entry != nil {
+ if entry := c.removeUndialable(ctx, c.ridCache[rid], auth); entry != nil {
return entry, nil
}
}
@@ -139,7 +148,7 @@
}
}
c.started[k] = true
- return c.removeUndialable(c.addrCache[k], blessingNames), nil
+ return c.removeUndialable(ctx, c.addrCache[k], auth), nil
}
// Unreserve marks the status of the (protocol, address) as no longer started, and
@@ -166,8 +175,8 @@
}
// removeUndialable filters connections that are closed, lameducked, or non-proxied
-// connections whose blessings dont match blessingNames.
-func (c *ConnCache) removeUndialable(e *connEntry, blessingNames []string) *conn.Conn {
+// connections that do not authorize.
+func (c *ConnCache) removeUndialable(ctx *context.T, e *connEntry, auth flow.PeerAuthorizer) *conn.Conn {
if e == nil {
return nil
}
@@ -179,8 +188,15 @@
}
return nil
}
- if !e.proxy && len(blessingNames) > 0 && !matchBlessings(e.blessingNames, blessingNames) {
- return nil
+ if !e.proxy && auth != nil {
+ _, _, err := auth.AuthorizePeer(ctx,
+ e.conn.LocalEndpoint(),
+ e.conn.RemoteEndpoint(),
+ e.conn.RemoteBlessings(),
+ e.conn.RemoteDischarges())
+ if err != nil {
+ return nil
+ }
}
return e.conn
}
@@ -200,7 +216,7 @@
err := NewErrConnKilledToFreeResources(ctx)
pq := make(connEntries, 0, len(c.ridCache))
for _, e := range c.ridCache {
- if c.removeUndialable(e, nil) == nil {
+ if c.removeUndialable(ctx, e, nil) == nil {
continue
}
if e.conn.IsEncapsulated() {
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index bf1a739..260a4a4 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -14,16 +14,18 @@
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
+ "v.io/v23/security"
connpackage "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
inaming "v.io/x/ref/runtime/internal/naming"
_ "v.io/x/ref/runtime/protocols/local"
+ "v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
)
func TestCache(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23Init()
defer shutdown()
c := NewConnCache()
@@ -31,8 +33,10 @@
Protocol: "tcp",
Address: "127.0.0.1:1111",
RID: naming.FixedRoutingID(0x5555),
- Blessings: []string{"A", "B", "C"},
+ Blessings: unionBlessing(ctx, "A", "B", "C"),
}
+
+ auth := flowtest.NewPeerAuthorizer(remote.Blessings)
caf := makeConnAndFlow(t, ctx, remote)
defer caf.stop(ctx)
conn := caf.c
@@ -40,32 +44,32 @@
t.Fatal(err)
}
// We should be able to find the conn in the cache.
- if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, remote.Blessings); err != nil || got != conn {
+ if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
c.Unreserve(remote.Protocol, remote.Address)
// Changing the protocol should fail.
- if got, err := c.ReservedFind("wrong", remote.Address, naming.NullRoutingID, remote.Blessings); err != nil || got != nil {
+ if got, err := c.ReservedFind(ctx, "wrong", remote.Address, naming.NullRoutingID, auth); err != nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
c.Unreserve("wrong", remote.Address)
// Changing the address should fail.
- if got, err := c.ReservedFind(remote.Protocol, "wrong", naming.NullRoutingID, remote.Blessings); err != nil || got != nil {
+ if got, err := c.ReservedFind(ctx, remote.Protocol, "wrong", naming.NullRoutingID, auth); err != nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
c.Unreserve(remote.Protocol, "wrong")
// Changing the blessingNames should fail.
- if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, []string{"wrong"}); err != nil || got != nil {
+ if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
c.Unreserve(remote.Protocol, remote.Address)
- // But finding a set of blessings that has at least on blessings in remote.Blessings should succeed.
- if got, err := c.ReservedFind(remote.Protocol, remote.Address, naming.NullRoutingID, []string{"foo", "A"}); err != nil || got != conn {
+ // But finding a set of blessings that has at least one blessings in remote.Blessings should succeed.
+ if got, err := c.ReservedFind(ctx, remote.Protocol, remote.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"foo", remote.Blessings[0]})); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
c.Unreserve(remote.Protocol, remote.Address)
// Finding by routing ID should work.
- if got, err := c.ReservedFind("wrong", "wrong", remote.RID, remote.Blessings); err != nil || got != conn {
+ if got, err := c.ReservedFind(ctx, "wrong", "wrong", remote.RID, auth); err != nil || got != conn {
t.Errorf("got %v, want %v, err: %v", got, conn, err)
}
c.Unreserve("wrong", "wrong")
@@ -76,7 +80,7 @@
Protocol: "tcp",
Address: "127.0.0.1:2222",
RID: naming.FixedRoutingID(0x5555),
- Blessings: []string{"A", "B", "C"},
+ Blessings: unionBlessing(ctx, "A", "B", "C"),
}
caf = makeConnAndFlow(t, ctx, proxyep)
defer caf.stop(ctx)
@@ -85,7 +89,7 @@
t.Fatal(err)
}
// Wrong blessingNames should still work
- if got, err := c.ReservedFind(proxyep.Protocol, proxyep.Address, naming.NullRoutingID, []string{"wrong"}); err != nil || got != proxyConn {
+ if got, err := c.ReservedFind(ctx, proxyep.Protocol, proxyep.Address, naming.NullRoutingID, flowtest.NewPeerAuthorizer([]string{"wrong"})); err != nil || got != proxyConn {
t.Errorf("got %v, want %v, err: %v", got, proxyConn, err)
}
c.Unreserve(proxyep.Protocol, proxyep.Address)
@@ -95,20 +99,21 @@
Protocol: "ridonly",
Address: "ridonly",
RID: naming.FixedRoutingID(0x1111),
- Blessings: []string{"ridonly"},
+ Blessings: unionBlessing(ctx, "ridonly"),
}
+ ridauth := flowtest.NewPeerAuthorizer(ridEP.Blessings)
caf = makeConnAndFlow(t, ctx, ridEP)
defer caf.stop(ctx)
ridConn := caf.c
if err := c.InsertWithRoutingID(ridConn, false); err != nil {
t.Fatal(err)
}
- if got, err := c.ReservedFind(ridEP.Protocol, ridEP.Address, naming.NullRoutingID, ridEP.Blessings); err != nil || got != nil {
+ if got, err := c.ReservedFind(ctx, ridEP.Protocol, ridEP.Address, naming.NullRoutingID, ridauth); err != nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
c.Unreserve(ridEP.Protocol, ridEP.Address)
// Finding by routing ID should work.
- if got, err := c.ReservedFind("wrong", "wrong", ridEP.RID, ridEP.Blessings); err != nil || got != ridConn {
+ if got, err := c.ReservedFind(ctx, "wrong", "wrong", ridEP.RID, ridauth); err != nil || got != ridConn {
t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
}
c.Unreserve("wrong", "wrong")
@@ -117,20 +122,21 @@
Protocol: "other",
Address: "other",
RID: naming.FixedRoutingID(0x2222),
- Blessings: []string{"other"},
+ Blessings: unionBlessing(ctx, "other"),
}
+ otherAuth := flowtest.NewPeerAuthorizer(otherEP.Blessings)
caf = makeConnAndFlow(t, ctx, otherEP)
defer caf.stop(ctx)
otherConn := caf.c
// Looking up a not yet inserted endpoint should fail.
- if got, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherEP.Blessings); err != nil || got != nil {
+ if got, err := c.ReservedFind(ctx, otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherAuth); err != nil || got != nil {
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
// Looking it up again should block until a matching Unreserve call is made.
ch := make(chan *connpackage.Conn, 1)
go func(ch chan *connpackage.Conn) {
- conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherEP.Blessings)
+ conn, err := c.ReservedFind(ctx, otherEP.Protocol, otherEP.Address, naming.NullRoutingID, otherAuth)
if err != nil {
t.Fatal(err)
}
@@ -177,7 +183,7 @@
func TestLRU(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23Init()
defer shutdown()
// Ensure that the least recently created conns are killed by KillConnections.
@@ -202,13 +208,14 @@
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn.c) {
- t.Errorf("conn %v should still be in cache", conn)
+ if !isInCache(t, ctx, c, conn.c) {
+ t.Errorf("conn %v(%p) should still be in cache:\n%s",
+ conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[:3] {
<-conn.c.Closed()
- if isInCache(t, c, conn.c) {
+ if isInCache(t, ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -238,13 +245,13 @@
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn.c) {
+ if !isInCache(t, ctx, c, conn.c) {
t.Errorf("conn %v should still be in cache", conn)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
- if isInCache(t, c, conn.c) {
+ if isInCache(t, ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
@@ -274,21 +281,22 @@
if status := conn.c.Status(); status == connpackage.Closed {
t.Errorf("conn %v should not have been closed", conn)
}
- if !isInCache(t, c, conn.c) {
- t.Errorf("conn %v should still be in cache", conn)
+ if !isInCache(t, ctx, c, conn.c) {
+ t.Errorf("conn %v(%p) should still be in cache:\n%s",
+ conn.c.RemoteEndpoint(), conn.c, c)
}
}
for _, conn := range conns[7:] {
<-conn.c.Closed()
- if isInCache(t, c, conn.c) {
+ if isInCache(t, ctx, c, conn.c) {
t.Errorf("conn %v should not be in cache", conn)
}
}
}
-func isInCache(t *testing.T, c *ConnCache, conn *connpackage.Conn) bool {
+func isInCache(t *testing.T, ctx *context.T, c *ConnCache, conn *connpackage.Conn) bool {
rep := conn.RemoteEndpoint()
- rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.RoutingID(), rep.BlessingNames())
+ rfconn, err := c.ReservedFind(ctx, rep.Addr().Network(), rep.Addr().String(), rep.RoutingID(), flowtest.NewPeerAuthorizer(rep.BlessingNames()))
if err != nil {
t.Error(err)
}
@@ -392,3 +400,25 @@
}()
return nil
}
+
+func unionBlessing(ctx *context.T, names ...string) []string {
+ principal := v23.GetPrincipal(ctx)
+ blessings := make([]security.Blessings, len(names))
+ for i, name := range names {
+ var err error
+ if blessings[i], err = principal.BlessSelf(name); err != nil {
+ panic(err)
+ }
+ }
+ union, err := security.UnionOfBlessings(blessings...)
+ if err != nil {
+ panic(err)
+ }
+ if err := security.AddToRoots(principal, union); err != nil {
+ panic(err)
+ }
+ if err := principal.BlessingStore().SetDefault(union); err != nil {
+ panic(err)
+ }
+ return security.BlessingNames(principal, principal.BlessingStore().Default())
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 8ac1742..e35abc8 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -657,8 +657,8 @@
func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration, proxy bool) (flow.Flow, error) {
// Fast path, look for the conn based on unresolved network, address, and routingId first.
- addr, rid, blessingNames := remote.Addr(), remote.RoutingID(), remote.BlessingNames()
- c, err := m.cache.Find(addr.Network(), addr.String(), rid, blessingNames)
+ addr, rid := remote.Addr(), remote.RoutingID()
+ c, err := m.cache.Find(ctx, addr.Network(), addr.String(), rid, auth)
if err != nil {
return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
}
@@ -680,7 +680,7 @@
if err != nil {
return nil, iflow.MaybeWrapError(flow.ErrResolveFailed, ctx, err)
}
- c, err = m.cache.ReservedFind(network, address, rid, blessingNames)
+ c, err = m.cache.ReservedFind(ctx, network, address, rid, auth)
if err != nil {
return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err)
}