flow/manager: Add client-only flow.Manager with NullRoutingID

A flow.Manager constructed with naming.NullRoutingID doesn't send client
blessings and can't accept connections.

Change-Id: I681da1476fa0c239278541af205802ab448156bb
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 76a5206..a7991c7 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -159,6 +159,12 @@
 // RemoteEndpoint returns the remote vanadium Endpoint
 func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
 
+// LocalBlessings returns the local blessings.
+func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings }
+
+// RemoteBlessings returns the remote blessings.
+func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings }
+
 // CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
 func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
 
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cdb81a7..92f8348 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -183,6 +183,9 @@
 // is identified by the provided rid. nil is returned if there is no such Conn.
 // FindWithRoutingID will return an error iff the cache is closed.
 func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
+	if rid == naming.NullRoutingID {
+		return nil, nil
+	}
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 6ec43fb..d39b1ad 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -266,7 +266,11 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
-	return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+	var fh conn.FlowHandler
+	if m.rid != naming.NullRoutingID {
+		fh = &flowHandler{q: m.q, closed: m.closed}
+	}
+	return m.internalDial(ctx, remote, fn, fh)
 }
 
 func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 41c470f..62403da 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -15,6 +15,7 @@
 	"v.io/v23/naming"
 
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
 )
@@ -82,6 +83,29 @@
 	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
 }
 
+func TestNullClientBlessings(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	am := New(ctx, naming.FixedRoutingID(0x5555))
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+	dm := New(ctx, naming.NullRoutingID)
+	_, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
+	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
+		t.Errorf("got %v, want zero-value blessings", rBlessings)
+	}
+	dm = New(ctx, naming.FixedRoutingID(0x1111))
+	_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+	// Ensure that the remote blessings of the underlying conn of the accepted flow are
+	// non-zero if we did specify a RoutingID.
+	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
+		t.Errorf("got %v, want non-zero blessings", rBlessings)
+	}
+}
+
 func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
 	eps := am.ListeningEndpoints()
 	if len(eps) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index f3452b0..c4f358e 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -190,7 +190,13 @@
 	}
 
 	// Add the flow.Manager to the context.
-	ctx, _, err = r.ExperimentalWithNewFlowManager(ctx)
+	// This initial Flow Manager can only be used as a client.
+	ctx, _, err = r.setNewClientFlowManager(ctx)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	// Add the Client to the context.
+	ctx, _, err = r.WithNewClient(ctx)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -321,21 +327,22 @@
 	return sm, nil
 }
 
-func newFlowManager(ctx *context.T) (flow.Manager, error) {
-	rid, err := naming.NewRoutingID()
-	if err != nil {
-		return nil, err
-	}
-	return manager.New(ctx, rid), nil
+func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	return r.setNewFlowManager(ctx, naming.NullRoutingID)
 }
 
-func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
-	fm, err := newFlowManager(ctx)
+func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+	rid, err := naming.NewRoutingID()
 	if err != nil {
 		return nil, nil, err
 	}
+	return r.setNewFlowManager(ctx, rid)
+}
+
+func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
+	fm := manager.New(ctx, rid)
 	// TODO(mattr): How can we close a flow manager.
-	if err = r.addChild(ctx, fm, func() {}); err != nil {
+	if err := r.addChild(ctx, fm, func() {}); err != nil {
 		return ctx, nil, err
 	}
 	newctx := context.WithValue(ctx, flowManagerKey, fm)
@@ -396,7 +403,12 @@
 	if newctx, err = r.setNewStreamManager(newctx); err != nil {
 		return ctx, err
 	}
-	if newctx, _, err = r.setNewFlowManager(newctx); err != nil {
+	if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
+		newctx, _, err = r.setNewClientFlowManager(newctx)
+	} else {
+		newctx, _, err = r.setNewBidiFlowManager(newctx)
+	}
+	if err != nil {
 		return ctx, err
 	}
 	if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
@@ -559,7 +571,7 @@
 
 func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	newctx, m, err := r.setNewFlowManager(ctx)
+	newctx, m, err := r.setNewBidiFlowManager(ctx)
 	if err != nil {
 		return ctx, nil, err
 	}
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 9b7f6cd..fc70a76 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -161,6 +161,9 @@
 	if oldman == nil {
 		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
 	}
+	if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
+		t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
+	}
 	newctx, newman, err := r.ExperimentalWithNewFlowManager(ctx)
 	if err != nil || newman == nil || newman == oldman {
 		t.Fatalf("Could not create flow manager: %v", err)
@@ -172,4 +175,7 @@
 	if man != newman || man == oldman {
 		t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
 	}
+	if man.RoutingID() == naming.NullRoutingID {
+		t.Error("Newly created flow.Manager should not have NullRoutingID")
+	}
 }