flow/manager: Add NullRoutingID flow.Manager that doesn't send client
blessings and can't accept connections.

Change-Id: I681da1476fa0c239278541af205802ab448156bb
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 76a5206..a7991c7 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -159,6 +159,12 @@
 // RemoteEndpoint returns the remote vanadium Endpoint
 func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
 
+// LocalBlessings returns the local blessings.
+func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings }
+
+// RemoteBlessings returns the remote blessings.
+func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings }
+
 // CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
 func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
 
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cdb81a7..92f8348 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -183,6 +183,9 @@
 // is identified by the provided rid. nil is returned if there is no such Conn.
 // FindWithRoutingID will return an error iff the cache is closed.
 func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
+	if rid == naming.NullRoutingID {
+		return nil, nil
+	}
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 6ec43fb..d39b1ad 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -266,7 +266,11 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
-	return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+	var fh conn.FlowHandler
+	if m.rid != naming.NullRoutingID {
+		fh = &flowHandler{q: m.q, closed: m.closed}
+	}
+	return m.internalDial(ctx, remote, fn, fh)
 }
 
 func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 41c470f..62403da 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -15,6 +15,7 @@
 	"v.io/v23/naming"
 
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
 )
@@ -82,6 +83,29 @@
 	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
 }
 
+func TestNullClientBlessings(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	am := New(ctx, naming.FixedRoutingID(0x5555))
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	dm := New(ctx, naming.NullRoutingID)
+	_, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
+	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
+		t.Errorf("got %v, want zero-value blessings", rBlessings)
+	}
+	dm = New(ctx, naming.FixedRoutingID(0x1111))
+	_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	// Ensure that the remote blessings of the underlying conn of the accepted flow are
+	// non-zero if we did specify a RoutingID.
+	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
+		t.Errorf("got %v, want non-zero blessings", rBlessings)
+	}
+}
+
 func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
 	eps := am.ListeningEndpoints()
 	if len(eps) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index f3452b0..c4f358e 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -190,7 +190,13 @@
 	}
 
 	// Add the flow.Manager to the context.
-	ctx, _, err = r.ExperimentalWithNewFlowManager(ctx)
+	// This initial Flow Manager can only be used as a client.
+	ctx, _, err = r.setNewClientFlowManager(ctx)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	// Add the Client to the context.
+	ctx, _, err = r.WithNewClient(ctx)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -321,21 +327,22 @@
 	return sm, nil
 }
 
-func newFlowManager(ctx *context.T) (flow.Manager, error) {
-	rid, err := naming.NewRoutingID()
-	if err != nil {
-		return nil, err
-	}
-	return manager.New(ctx, rid), nil
+func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	return r.setNewFlowManager(ctx, naming.NullRoutingID)
 }
 
-func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
-	fm, err := newFlowManager(ctx)
+func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	rid, err := naming.NewRoutingID()
 	if err != nil {
 		return nil, nil, err
 	}
+	return r.setNewFlowManager(ctx, rid)
+}
+
+func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
+	fm := manager.New(ctx, rid)
 	// TODO(mattr): How can we close a flow manager.
-	if err = r.addChild(ctx, fm, func() {}); err != nil {
+	if err := r.addChild(ctx, fm, func() {}); err != nil {
 		return ctx, nil, err
 	}
 	newctx := context.WithValue(ctx, flowManagerKey, fm)
@@ -396,7 +403,12 @@
 	if newctx, err = r.setNewStreamManager(newctx); err != nil {
 		return ctx, err
 	}
-	if newctx, _, err = r.setNewFlowManager(newctx); err != nil {
+	if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
+		newctx, _, err = r.setNewClientFlowManager(newctx)
+	} else {
+		newctx, _, err = r.setNewBidiFlowManager(newctx)
+	}
+	if err != nil {
 		return ctx, err
 	}
 	if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
@@ -559,7 +571,7 @@
 
 func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	newctx, m, err := r.setNewFlowManager(ctx)
+	newctx, m, err := r.setNewBidiFlowManager(ctx)
 	if err != nil {
 		return ctx, nil, err
 	}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 9b7f6cd..fc70a76 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -161,6 +161,9 @@
 	if oldman == nil {
 		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
 	}
+	if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
+		t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
+	}
 	newctx, newman, err := r.ExperimentalWithNewFlowManager(ctx)
 	if err != nil || newman == nil || newman == oldman {
 		t.Fatalf("Could not create flow manager: %v", err)
@@ -172,4 +175,7 @@
 	if man != newman || man == oldman {
 		t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
 	}
+	if man.RoutingID() == naming.NullRoutingID {
+		t.Error("Newly created flow.Manager should not have NullRoutingID")
+	}
 }
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 4257409..43f613a 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -43,14 +43,6 @@
 func TestMultipleProxies(t *testing.T) {
 	pctx, shutdown := v23.Init()
 	defer shutdown()
-	p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
-	if err != nil {
-		t.Fatal(err)
-	}
-	p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
-	if err != nil {
-		t.Fatal(err)
-	}
 	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
 	if err != nil {
 		t.Fatal(err)
@@ -62,9 +54,9 @@
 
 	pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
 
-	p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+	p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
 
-	p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+	p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
 
 	if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
 		t.Fatal(err)
@@ -147,7 +139,7 @@
 		ls.Addrs = append(ls.Addrs, addr)
 	}
 	ctx = v23.WithListenSpec(ctx, ls)
-	proxy, err := xproxyd.New(ctx)
+	proxy, _, err := xproxyd.New(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index fc97ceb..d05a561 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -24,41 +24,45 @@
 	proxyEndpoints []naming.Endpoint
 }
 
-func New(ctx *context.T) (*proxy, error) {
+func New(ctx *context.T) (*proxy, *context.T, error) {
+	ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
 	p := &proxy{
-		m: v23.ExperimentalGetFlowManager(ctx),
+		m: mgr,
 	}
 	for _, addr := range v23.GetListenSpec(ctx).Addrs {
 		if addr.Protocol == "v23" {
 			ep, err := v23.NewEndpoint(addr.Address)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 			f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 			// Send a byte telling the acceptor that we are a proxy.
 			if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 			msg, err := readMessage(ctx, f)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 			m, ok := msg.(*message.ProxyResponse)
 			if !ok {
-				return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+				return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
 			}
 			p.mu.Lock()
 			p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
 			p.mu.Unlock()
 		} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 	}
 	go p.listenLoop(ctx)
-	return p, nil
+	return p, ctx, nil
 }
 
 func (p *proxy) ListeningEndpoints() []naming.Endpoint {