ref/runtime/interal/rpc: Implement enough to run the benchmarks with the
new client and server.
This forced me to at least temporarily make the namespace and flow manager
args to NewXClient explicit.
Change-Id: Ib49ccf3b7fb714e1572868b2963ffb56494ce8d9
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 3b2f121..cf2cb9b 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -305,7 +305,9 @@
b.mu.Lock()
if err != nil {
if err != io.EOF {
- ctx.Errorf("Blessings flow closed: %v", err)
+ // TODO(mattr): In practice this is very spammy,
+ // figure out how to log it more effectively.
+ ctx.VI(3).Infof("Blessings flow closed: %v", err)
}
b.closed = true
b.mu.Unlock()
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index 7b624fc..c904a7d 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -16,8 +16,11 @@
"v.io/v23/naming"
"v.io/x/ref/lib/security/securityflag"
_ "v.io/x/ref/runtime/factories/static"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
+ fmanager "v.io/x/ref/runtime/internal/flow/manager"
"v.io/x/ref/runtime/internal/rpc/benchmark/internal"
"v.io/x/ref/runtime/internal/rpc/stream/manager"
+ "v.io/x/ref/runtime/internal/rt"
"v.io/x/ref/test"
"v.io/x/ref/test/benchmark"
"v.io/x/ref/test/testutil"
@@ -49,19 +52,28 @@
principal := testutil.NewPrincipal("test")
nctx, _ := v23.WithPrincipal(ctx, principal)
+ b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
-
- b.StartTimer()
- _, err := client.Dial(nctx, serverEP)
- if err != nil {
- ctx.Fatalf("Dial failed: %v", err)
+ if rt.TransitionState >= rt.XServers {
+ m := fmanager.New(nctx, naming.FixedRoutingID(0xc))
+ b.StartTimer()
+ _, err := m.Dial(nctx, serverEP, flowtest.BlessingsForPeer)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ // TODO(mattr): close m.
+ } else {
+ client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
+ b.StartTimer()
+ _, err := client.Dial(nctx, serverEP)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ client.Shutdown()
}
-
- b.StopTimer()
-
- client.Shutdown()
}
}
diff --git a/runtime/internal/rpc/transitionclient.go b/runtime/internal/rpc/transitionclient.go
index 2a68dcf..0c41746 100644
--- a/runtime/internal/rpc/transitionclient.go
+++ b/runtime/internal/rpc/transitionclient.go
@@ -6,6 +6,7 @@
import (
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/namespace"
"v.io/v23/rpc"
@@ -19,10 +20,10 @@
var _ = rpc.Client((*transitionClient)(nil))
-func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, flowMgr flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
var err error
ret := &transitionClient{}
- if ret.xc, err = NewXClient(ctx, opts...); err != nil {
+ if ret.xc, err = NewXClient(ctx, flowMgr, ns, opts...); err != nil {
return nil, err
}
if ret.c, err = InternalNewClient(streamMgr, ns, opts...); err != nil {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 7284c05..e36c3f9 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -34,7 +34,7 @@
if err != nil {
t.Fatal(verror.DebugString(err))
}
- client, err := NewXClient(ctx)
+ client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
if err != nil {
t.Fatal(verror.DebugString(err))
}
@@ -64,7 +64,7 @@
if err != nil {
t.Fatal(verror.DebugString(err))
}
- client, err := NewXClient(ctx)
+ client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
if err != nil {
t.Fatal(verror.DebugString(err))
}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 1cd90dd..afde0e4 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -46,10 +46,10 @@
var _ rpc.Client = (*xclient)(nil)
-func NewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &xclient{
- flowMgr: v23.ExperimentalGetFlowManager(ctx),
- ns: v23.GetNamespace(ctx),
+ flowMgr: fm,
+ ns: ns,
}
ipNets, err := ipNetworks()
if err != nil {
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 3afb638..1bdf4a4 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -151,7 +151,11 @@
}
func (s *xserver) Status() rpc.ServerStatus {
- return rpc.ServerStatus{}
+ ret := rpc.ServerStatus{}
+ for _, e := range s.chosenEndpoints {
+ ret.Endpoints = append(ret.Endpoints, e)
+ }
+ return ret
}
func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 5952c8f..f3452b0 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -44,21 +44,21 @@
)
const (
- none = iota
- xclients
- xservers
+ None = iota
+ XClients
+ XServers
)
-var transitionState = none
+var TransitionState = None
func init() {
switch ts := os.Getenv("V23_RPC_TRANSITION_STATE"); ts {
case "xclients":
- transitionState = xclients
+ TransitionState = XClients
case "xservers":
- transitionState = xservers
+ TransitionState = XServers
case "":
- transitionState = none
+ TransitionState = None
default:
panic("Unknown transition state: " + ts)
}
@@ -432,8 +432,8 @@
var err error
deps := []interface{}{vtraceDependency{}}
- if fm != nil && transitionState >= xclients {
- client, err = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
+ if fm != nil && TransitionState >= XClients {
+ client, err = irpc.NewTransitionClient(ctx, sm, fm, ns, otherOpts...)
deps = append(deps, fm, sm)
} else {
client, err = irpc.InternalNewClient(sm, ns, otherOpts...)
@@ -591,7 +591,7 @@
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- if transitionState >= xservers {
+ if TransitionState >= XServers {
// TODO(mattr): Deal with shutdown deps.
newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
@@ -621,7 +621,7 @@
func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- if transitionState >= xservers {
+ if TransitionState >= XServers {
// TODO(mattr): Deal with shutdown deps.
newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {