Merge "rpc/discovery: support permission"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index dc3ecd4..02bcc8c 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -150,7 +150,6 @@
return err
}
}
- var rPublicKey security.PublicKey
if rauth.BlessingsKey != 0 {
var err error
// TODO(mattr): Make sure we cancel out of this at some point.
@@ -158,14 +157,14 @@
if err != nil {
return err
}
- rPublicKey = c.rBlessings.PublicKey()
+ c.rPublicKey = c.rBlessings.PublicKey()
} else {
- rPublicKey = rauth.PublicKey
+ c.rPublicKey = rauth.PublicKey
}
- if rPublicKey == nil {
+ if c.rPublicKey == nil {
return NewErrNoPublicKey(ctx)
}
- if !rauth.ChannelBinding.Verify(rPublicKey, binding) {
+ if !rauth.ChannelBinding.Verify(c.rPublicKey, binding) {
return NewErrInvalidChannelBinding(ctx)
}
return nil
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 5a6e248..17475a5 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -67,6 +67,7 @@
mp *messagePipe
version version.RPCVersion
lBlessings, rBlessings security.Blessings
+ rPublicKey security.PublicKey
local, remote naming.Endpoint
closed chan struct{}
blessingsFlow *blessingsFlow
@@ -361,6 +362,14 @@
c.borrowing[msg.ID] = true
c.mu.Unlock()
+ rBlessings, _, err := c.blessingsFlow.get(ctx, msg.BlessingsKey, msg.DischargeKey)
+ if err != nil {
+ return err
+ }
+ if !reflect.DeepEqual(rBlessings.PublicKey(), c.rPublicKey) {
+ return NewErrBlessingsNotBound(ctx)
+ }
+
handler.HandleFlow(f)
if err := f.q.put(ctx, msg.Payload); err != nil {
return err
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 42d092f..42b56bf 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -25,4 +25,5 @@
DialingNonServer() {"en": "You are attempting to dial on a connection with no remote server."}
AcceptorBlessingsMissing() {"en": "The acceptor did not send blessings."}
UpdatingNilFlowHandler() {"en": "nil flowHandler cannot be updated to non-nil value."}
+ BlessingsNotBound() {"en": "blessings not bound to connection remote public key"}
)
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index ea7a30e..506f283 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -29,6 +29,7 @@
ErrDialingNonServer = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
ErrUpdatingNilFlowHandler = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UpdatingNilFlowHandler", verror.NoRetry, "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
+ ErrBlessingsNotBound = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsNotBound", verror.NoRetry, "{1:}{2:} blessings not bound to connection remote public key")
)
func init() {
@@ -46,6 +47,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDialingNonServer.ID), "{1:}{2:} You are attempting to dial on a connection with no remote server.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptorBlessingsMissing.ID), "{1:}{2:} The acceptor did not send blessings.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUpdatingNilFlowHandler.ID), "{1:}{2:} nil flowHandler cannot be updated to non-nil value.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlessingsNotBound.ID), "{1:}{2:} blessings not bound to connection remote public key")
}
// NewErrMissingSetupOption returns an error with the ErrMissingSetupOption ID.
@@ -117,3 +119,8 @@
func NewErrUpdatingNilFlowHandler(ctx *context.T) error {
return verror.New(ErrUpdatingNilFlowHandler, ctx)
}
+
+// NewErrBlessingsNotBound returns an error with the ErrBlessingsNotBound ID.
+func NewErrBlessingsNotBound(ctx *context.T) error {
+ return verror.New(ErrBlessingsNotBound, ctx)
+}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index b3975e2..257b681 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -19,11 +19,12 @@
// Multiple goroutines can invoke methods on the ConnCache simultaneously.
// TODO(suharshs): We should periodically look for closed connections and remove them.
type ConnCache struct {
- mu *sync.Mutex
- addrCache map[string]*connEntry // keyed by (protocol, address, blessingNames)
- ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
- started map[string]bool // keyed by (protocol, address, blessingNames)
- cond *sync.Cond
+ mu *sync.Mutex
+ cond *sync.Cond
+ addrCache map[string]*connEntry // keyed by (protocol, address, blessingNames)
+ ridCache map[naming.RoutingID]*connEntry // keyed by naming.RoutingID
+ started map[string]bool // keyed by (protocol, address, blessingNames)
+ unmappedConns map[*connEntry]bool // list of connEntries replaced by other entries
}
type connEntry struct {
@@ -36,11 +37,12 @@
mu := &sync.Mutex{}
cond := sync.NewCond(mu)
return &ConnCache{
- mu: mu,
- addrCache: make(map[string]*connEntry),
- ridCache: make(map[naming.RoutingID]*connEntry),
- started: make(map[string]bool),
- cond: cond,
+ mu: mu,
+ cond: cond,
+ addrCache: make(map[string]*connEntry),
+ ridCache: make(map[naming.RoutingID]*connEntry),
+ started: make(map[string]bool),
+ unmappedConns: make(map[*connEntry]bool),
}
}
@@ -101,6 +103,9 @@
rid: ep.RoutingID(),
addrKey: k,
}
+ if old := c.ridCache[entry.rid]; old != nil {
+ c.unmappedConns[old] = true
+ }
c.addrCache[k] = entry
c.ridCache[entry.rid] = entry
return nil
@@ -117,6 +122,9 @@
conn: conn,
rid: conn.RemoteEndpoint().RoutingID(),
}
+ if old := c.ridCache[entry.rid]; old != nil {
+ c.unmappedConns[old] = true
+ }
c.ridCache[entry.rid] = entry
return nil
}
@@ -126,8 +134,12 @@
defer c.mu.Unlock()
c.mu.Lock()
c.addrCache, c.started = nil, nil
+ err := NewErrCacheClosed(ctx)
for _, d := range c.ridCache {
- d.conn.Close(ctx, NewErrCacheClosed(ctx))
+ d.conn.Close(ctx, err)
+ }
+ for d := range c.unmappedConns {
+ d.conn.Close(ctx, err)
}
}
@@ -157,12 +169,23 @@
}
pq = append(pq, e)
}
+ for d := range c.unmappedConns {
+ if isClosed(d.conn) {
+ delete(c.unmappedConns, d)
+ continue
+ }
+ if d.conn.IsEncapsulated() {
+ continue
+ }
+ pq = append(pq, d)
+ }
sort.Sort(pq)
for i := 0; i < num; i++ {
d := pq[i]
d.conn.Close(ctx, err)
delete(c.addrCache, d.addrKey)
delete(c.ridCache, d.rid)
+ delete(c.unmappedConns, d)
}
return nil
}
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 8d02e9a..c66afa1 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -114,17 +114,27 @@
t.Errorf("got %v, want %v", cachedConn, otherConn)
}
+ // Insert a duplicate conn to ensure that replaced conns still get closed.
+ dupConn := makeConnAndFlow(t, ctx, remote).c
+ if err := c.Insert(dupConn); err != nil {
+ t.Fatal(err)
+ }
+
// Closing the cache should close all the connections in the cache.
// Ensure that the conns are not closed yet.
if isClosed(conn) {
- t.Fatalf("wanted conn to not be closed")
+ t.Fatal("wanted conn to not be closed")
+ }
+ if isClosed(dupConn) {
+ t.Fatal("wanted dupConn to not be closed")
}
if isClosed(otherConn) {
- t.Fatalf("wanted otherConn to not be closed")
+ t.Fatal("wanted otherConn to not be closed")
}
c.Close(ctx)
// Now the connections should be closed.
<-conn.Closed()
+ <-dupConn.Closed()
<-otherConn.Closed()
}