ref/runtime/interal/rpc: Implement enough to run the benchmarks with the
new client and server.

This forced me to at least temporarily make the namespace and flow manager
args to NewXClient explicit.

Change-Id: Ib49ccf3b7fb714e1572868b2963ffb56494ce8d9
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 3b2f121..cf2cb9b 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -305,7 +305,9 @@
 		b.mu.Lock()
 		if err != nil {
 			if err != io.EOF {
-				ctx.Errorf("Blessings flow closed: %v", err)
+				// TODO(mattr): In practice this is very spammy,
+				// figure out how to log it more effectively.
+				ctx.VI(3).Infof("Blessings flow closed: %v", err)
 			}
 			b.closed = true
 			b.mu.Unlock()
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index 7b624fc..c904a7d 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -16,8 +16,11 @@
 	"v.io/v23/naming"
 	"v.io/x/ref/lib/security/securityflag"
 	_ "v.io/x/ref/runtime/factories/static"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
+	fmanager "v.io/x/ref/runtime/internal/flow/manager"
 	"v.io/x/ref/runtime/internal/rpc/benchmark/internal"
 	"v.io/x/ref/runtime/internal/rpc/stream/manager"
+	"v.io/x/ref/runtime/internal/rt"
 	"v.io/x/ref/test"
 	"v.io/x/ref/test/benchmark"
 	"v.io/x/ref/test/testutil"
@@ -49,19 +52,28 @@
 	principal := testutil.NewPrincipal("test")
 	nctx, _ := v23.WithPrincipal(ctx, principal)
 
+	b.StopTimer()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
-
-		b.StartTimer()
-		_, err := client.Dial(nctx, serverEP)
-		if err != nil {
-			ctx.Fatalf("Dial failed: %v", err)
+		if rt.TransitionState >= rt.XServers {
+			m := fmanager.New(nctx, naming.FixedRoutingID(0xc))
+			b.StartTimer()
+			_, err := m.Dial(nctx, serverEP, flowtest.BlessingsForPeer)
+			if err != nil {
+				ctx.Fatalf("Dial failed: %v", err)
+			}
+			b.StopTimer()
+			// TODO(mattr): close m.
+		} else {
+			client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
+			b.StartTimer()
+			_, err := client.Dial(nctx, serverEP)
+			if err != nil {
+				ctx.Fatalf("Dial failed: %v", err)
+			}
+			b.StopTimer()
+			client.Shutdown()
 		}
-
-		b.StopTimer()
-
-		client.Shutdown()
 	}
 }
 
diff --git a/runtime/internal/rpc/transitionclient.go b/runtime/internal/rpc/transitionclient.go
index 2a68dcf..0c41746 100644
--- a/runtime/internal/rpc/transitionclient.go
+++ b/runtime/internal/rpc/transitionclient.go
@@ -6,6 +6,7 @@
 
 import (
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/flow/message"
 	"v.io/v23/namespace"
 	"v.io/v23/rpc"
@@ -19,10 +20,10 @@
 
 var _ = rpc.Client((*transitionClient)(nil))
 
-func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, flowMgr flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
 	var err error
 	ret := &transitionClient{}
-	if ret.xc, err = NewXClient(ctx, opts...); err != nil {
+	if ret.xc, err = NewXClient(ctx, flowMgr, ns, opts...); err != nil {
 		return nil, err
 	}
 	if ret.c, err = InternalNewClient(streamMgr, ns, opts...); err != nil {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 7284c05..e36c3f9 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -34,7 +34,7 @@
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
-	client, err := NewXClient(ctx)
+	client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
@@ -64,7 +64,7 @@
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
-	client, err := NewXClient(ctx)
+	client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 1cd90dd..afde0e4 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -46,10 +46,10 @@
 
 var _ rpc.Client = (*xclient)(nil)
 
-func NewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
 	c := &xclient{
-		flowMgr: v23.ExperimentalGetFlowManager(ctx),
-		ns:      v23.GetNamespace(ctx),
+		flowMgr: fm,
+		ns:      ns,
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 3afb638..1bdf4a4 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -151,7 +151,11 @@
 }
 
 func (s *xserver) Status() rpc.ServerStatus {
-	return rpc.ServerStatus{}
+	ret := rpc.ServerStatus{}
+	for _, e := range s.chosenEndpoints {
+		ret.Endpoints = append(ret.Endpoints, e)
+	}
+	return ret
 }
 
 func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 5952c8f..f3452b0 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -44,21 +44,21 @@
 )
 
 const (
-	none = iota
-	xclients
-	xservers
+	None = iota
+	XClients
+	XServers
 )
 
-var transitionState = none
+var TransitionState = None
 
 func init() {
 	switch ts := os.Getenv("V23_RPC_TRANSITION_STATE"); ts {
 	case "xclients":
-		transitionState = xclients
+		TransitionState = XClients
 	case "xservers":
-		transitionState = xservers
+		TransitionState = XServers
 	case "":
-		transitionState = none
+		TransitionState = None
 	default:
 		panic("Unknown transition state: " + ts)
 	}
@@ -432,8 +432,8 @@
 	var err error
 	deps := []interface{}{vtraceDependency{}}
 
-	if fm != nil && transitionState >= xclients {
-		client, err = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
+	if fm != nil && TransitionState >= XClients {
+		client, err = irpc.NewTransitionClient(ctx, sm, fm, ns, otherOpts...)
 		deps = append(deps, fm, sm)
 	} else {
 		client, err = irpc.InternalNewClient(sm, ns, otherOpts...)
@@ -591,7 +591,7 @@
 
 func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	if transitionState >= xservers {
+	if TransitionState >= XServers {
 		// TODO(mattr): Deal with shutdown deps.
 		newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {
@@ -621,7 +621,7 @@
 
 func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	if transitionState >= xservers {
+	if TransitionState >= XServers {
 		// TODO(mattr): Deal with shutdown deps.
 		newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {