flow/manager: Add NullRoutingID flow.Manager that doesn't send client
blessings and can't accept connections.
Change-Id: I681da1476fa0c239278541af205802ab448156bb
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 76a5206..a7991c7 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -159,6 +159,12 @@
// RemoteEndpoint returns the remote vanadium Endpoint
func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
+// LocalBlessings returns the local blessings.
+func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings }
+
+// RemoteBlessings returns the remote blessings.
+func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings }
+
// CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cdb81a7..92f8348 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -183,6 +183,9 @@
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
+ if rid == naming.NullRoutingID {
+ return nil, nil
+ }
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 6ec43fb..d39b1ad 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -266,7 +266,11 @@
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
- return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+ var fh conn.FlowHandler
+ if m.rid != naming.NullRoutingID {
+ fh = &flowHandler{q: m.q, closed: m.closed}
+ }
+ return m.internalDial(ctx, remote, fn, fh)
}
func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 41c470f..62403da 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -15,6 +15,7 @@
"v.io/v23/naming"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
)
@@ -82,6 +83,29 @@
testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
}
+func TestNullClientBlessings(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ am := New(ctx, naming.FixedRoutingID(0x5555))
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+ dm := New(ctx, naming.NullRoutingID)
+ _, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ // Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
+ if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
+ t.Errorf("got %v, want zero-value blessings", rBlessings)
+ }
+ dm = New(ctx, naming.FixedRoutingID(0x1111))
+ _, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ // Ensure that the remote blessings of the underlying conn of the accepted flow are
+ // non-zero if we did specify a RoutingID.
+ if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
+ t.Errorf("got %v, want non-zero blessings", rBlessings)
+ }
+}
+
func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
eps := am.ListeningEndpoints()
if len(eps) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index f3452b0..c4f358e 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -190,7 +190,13 @@
}
// Add the flow.Manager to the context.
- ctx, _, err = r.ExperimentalWithNewFlowManager(ctx)
+ // This initial Flow Manager can only be used as a client.
+ ctx, _, err = r.setNewClientFlowManager(ctx)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ // Add the Client to the context.
+ ctx, _, err = r.WithNewClient(ctx)
if err != nil {
return nil, nil, nil, err
}
@@ -321,21 +327,22 @@
return sm, nil
}
-func newFlowManager(ctx *context.T) (flow.Manager, error) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, err
- }
- return manager.New(ctx, rid), nil
+func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+ return r.setNewFlowManager(ctx, naming.NullRoutingID)
}
-func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
- fm, err := newFlowManager(ctx)
+func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+ rid, err := naming.NewRoutingID()
if err != nil {
return nil, nil, err
}
+ return r.setNewFlowManager(ctx, rid)
+}
+
+func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
+ fm := manager.New(ctx, rid)
// TODO(mattr): How can we close a flow manager.
- if err = r.addChild(ctx, fm, func() {}); err != nil {
+ if err := r.addChild(ctx, fm, func() {}); err != nil {
return ctx, nil, err
}
newctx := context.WithValue(ctx, flowManagerKey, fm)
@@ -396,7 +403,12 @@
if newctx, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
- if newctx, _, err = r.setNewFlowManager(newctx); err != nil {
+ if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
+ newctx, _, err = r.setNewClientFlowManager(newctx)
+ } else {
+ newctx, _, err = r.setNewBidiFlowManager(newctx)
+ }
+ if err != nil {
return ctx, err
}
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
@@ -559,7 +571,7 @@
func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- newctx, m, err := r.setNewFlowManager(ctx)
+ newctx, m, err := r.setNewBidiFlowManager(ctx)
if err != nil {
return ctx, nil, err
}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 9b7f6cd..fc70a76 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -161,6 +161,9 @@
if oldman == nil {
t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
}
+ if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
+ t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
+ }
newctx, newman, err := r.ExperimentalWithNewFlowManager(ctx)
if err != nil || newman == nil || newman == oldman {
t.Fatalf("Could not create flow manager: %v", err)
@@ -172,4 +175,7 @@
if man != newman || man == oldman {
t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
}
+ if man.RoutingID() == naming.NullRoutingID {
+ t.Error("Newly created flow.Manager should not have NullRoutingID")
+ }
}
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 4257409..43f613a 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -43,14 +43,6 @@
func TestMultipleProxies(t *testing.T) {
pctx, shutdown := v23.Init()
defer shutdown()
- p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
- if err != nil {
- t.Fatal(err)
- }
- p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
- if err != nil {
- t.Fatal(err)
- }
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
if err != nil {
t.Fatal(err)
@@ -62,9 +54,9 @@
pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
- p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+ p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
- p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+ p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
t.Fatal(err)
@@ -147,7 +139,7 @@
ls.Addrs = append(ls.Addrs, addr)
}
ctx = v23.WithListenSpec(ctx, ls)
- proxy, err := xproxyd.New(ctx)
+ proxy, _, err := xproxyd.New(ctx)
if err != nil {
t.Fatal(err)
}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index fc97ceb..d05a561 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -24,41 +24,45 @@
proxyEndpoints []naming.Endpoint
}
-func New(ctx *context.T) (*proxy, error) {
+func New(ctx *context.T) (*proxy, *context.T, error) {
+ ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
p := &proxy{
- m: v23.ExperimentalGetFlowManager(ctx),
+ m: mgr,
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
if addr.Protocol == "v23" {
ep, err := v23.NewEndpoint(addr.Address)
if err != nil {
- return nil, err
+ return nil, nil, err
}
f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Send a byte telling the acceptor that we are a proxy.
if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
- return nil, err
+ return nil, nil, err
}
msg, err := readMessage(ctx, f)
if err != nil {
- return nil, err
+ return nil, nil, err
}
m, ok := msg.(*message.ProxyResponse)
if !ok {
- return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+ return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
}
p.mu.Lock()
p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
p.mu.Unlock()
} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
- return nil, err
+ return nil, nil, err
}
}
go p.listenLoop(ctx)
- return p, nil
+ return p, ctx, nil
}
func (p *proxy) ListeningEndpoints() []naming.Endpoint {