ref/runtime/rpc: Implement the real constructor functinos for xserver.
Change-Id: I993b286e0826ee0b1a4e3ca27d4361b133673ecd
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 4517ae0..7284c05 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -11,6 +11,7 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
+ "v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/runtime/internal/flow/manager"
@@ -45,3 +46,33 @@
t.Errorf("got %q wanted %q", result, want)
}
}
+
+type testDispatcher struct{}
+
+func (t *testDispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
+ return &testService{}, nil, nil
+}
+
+func TestXClientDispatchingServer(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
+ Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
+ })
+ _, err := NewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
+ if err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ client, err := NewXClient(ctx)
+ if err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ var result string
+ if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ if want := "response:hello"; result != want {
+ t.Errorf("got %q wanted %q", result, want)
+ }
+}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 8660a2a..d466475 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -75,6 +75,22 @@
}
func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+ if object == nil {
+ return nil, verror.New(verror.ErrBadArg, ctx, "nil object")
+ }
+ invoker, err := objectToInvoker(object)
+ if err != nil {
+ return nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
+ }
+ d := &leafDispatcher{invoker, authorizer}
+ opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
+ return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+}
+
+func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+ if dispatcher == nil {
+ return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
+ }
ctx, cancel := context.WithRootCancel(ctx)
flowMgr := v23.ExperimentalGetFlowManager(ctx)
ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
@@ -92,6 +108,7 @@
stats: newRPCStats(statsPrefix),
settingsPublisher: settingsPublisher,
settingsName: settingsName,
+ disp: dispatcher,
}
ipNets, err := ipNetworks()
if err != nil {
@@ -123,9 +140,12 @@
s.Stop()
return nil, err
}
- if err = s.serve(name, object, authorizer); err != nil {
- s.Stop()
- return nil, err
+ if len(name) > 0 {
+ for _, ep := range s.chosenEndpoints {
+ s.publisher.AddServer(ep.String())
+ }
+ s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
}
return s, nil
}
@@ -288,38 +308,6 @@
}
}
-func (s *xserver) serve(name string, obj interface{}, authorizer security.Authorizer) error {
- if obj == nil {
- return verror.New(verror.ErrBadArg, s.ctx, "nil object")
- }
- invoker, err := objectToInvoker(obj)
- if err != nil {
- return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
- }
- // TODO(mattr): Does this really need to be locked?
- s.Lock()
- s.isLeaf = true
- s.Unlock()
- return s.serveDispatcher(name, &leafDispatcher{invoker, authorizer})
-}
-
-func (s *xserver) serveDispatcher(name string, disp rpc.Dispatcher) error {
- if disp == nil {
- return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
- }
- s.Lock()
- defer s.Unlock()
- vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
- s.disp = disp
- if len(name) > 0 {
- for _, ep := range s.chosenEndpoints {
- s.publisher.AddServer(ep.String())
- }
- s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
- }
- return nil
-}
-
func (s *xserver) AddName(name string) error {
defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if len(name) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 482736c..07a83af 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -552,12 +552,50 @@
return newctx, m, nil
}
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, *pubsub.Publisher, string, []rpc.ServerOpt, error) {
+ newctx, _, err := r.ExperimentalWithNewFlowManager(ctx)
+ if err != nil {
+ return ctx, nil, "", nil, err
+ }
+ otherOpts := append([]rpc.ServerOpt{}, opts...)
+ if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
+ otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
+ Dispatcher: reservedDispatcher,
+ })
+ }
+ id, _ := ctx.Value(initKey).(*initData)
+ if id.protocols != nil {
+ otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
+ }
+ return newctx, id.settingsPublisher, id.settingsName, otherOpts, nil
+}
+
func (r *Runtime) XWithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
+ // TODO(mattr): Deal with shutdown deps.
+ newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if err != nil {
+ return ctx, nil, err
+ }
+ s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
+ if err != nil {
+ // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ return newctx, s, err
}
func (r *Runtime) XWithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
+ // TODO(mattr): Deal with shutdown deps.
+ newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if err != nil {
+ return ctx, nil, err
+ }
+ s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
+ if err != nil {
+ // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ return newctx, s, err
}