ref/runtime/rpc: Implement the real constructor functinos for xserver.

Change-Id: I993b286e0826ee0b1a4e3ca27d4361b133673ecd
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 4517ae0..7284c05 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -11,6 +11,7 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
+	"v.io/v23/security"
 	"v.io/v23/verror"
 	"v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/runtime/internal/flow/manager"
@@ -45,3 +46,33 @@
 		t.Errorf("got %q wanted %q", result, want)
 	}
 }
+
+type testDispatcher struct{}
+
+func (t *testDispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
+	return &testService{}, nil, nil
+}
+
+func TestXClientDispatchingServer(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
+		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
+	})
+	_, err := NewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
+	if err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	client, err := NewXClient(ctx)
+	if err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	var result string
+	if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	if want := "response:hello"; result != want {
+		t.Errorf("got %q wanted %q", result, want)
+	}
+}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 8660a2a..d466475 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -75,6 +75,22 @@
 }
 
 func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+	if object == nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, "nil object")
+	}
+	invoker, err := objectToInvoker(object)
+	if err != nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
+	}
+	d := &leafDispatcher{invoker, authorizer}
+	opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
+	return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+}
+
+func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+	if dispatcher == nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
+	}
 	ctx, cancel := context.WithRootCancel(ctx)
 	flowMgr := v23.ExperimentalGetFlowManager(ctx)
 	ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
@@ -92,6 +108,7 @@
 		stats:             newRPCStats(statsPrefix),
 		settingsPublisher: settingsPublisher,
 		settingsName:      settingsName,
+		disp:              dispatcher,
 	}
 	ipNets, err := ipNetworks()
 	if err != nil {
@@ -123,9 +140,12 @@
 		s.Stop()
 		return nil, err
 	}
-	if err = s.serve(name, object, authorizer); err != nil {
-		s.Stop()
-		return nil, err
+	if len(name) > 0 {
+		for _, ep := range s.chosenEndpoints {
+			s.publisher.AddServer(ep.String())
+		}
+		s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+		vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
 	}
 	return s, nil
 }
@@ -288,38 +308,6 @@
 	}
 }
 
-func (s *xserver) serve(name string, obj interface{}, authorizer security.Authorizer) error {
-	if obj == nil {
-		return verror.New(verror.ErrBadArg, s.ctx, "nil object")
-	}
-	invoker, err := objectToInvoker(obj)
-	if err != nil {
-		return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
-	}
-	// TODO(mattr): Does this really need to be locked?
-	s.Lock()
-	s.isLeaf = true
-	s.Unlock()
-	return s.serveDispatcher(name, &leafDispatcher{invoker, authorizer})
-}
-
-func (s *xserver) serveDispatcher(name string, disp rpc.Dispatcher) error {
-	if disp == nil {
-		return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
-	}
-	s.Lock()
-	defer s.Unlock()
-	vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
-	s.disp = disp
-	if len(name) > 0 {
-		for _, ep := range s.chosenEndpoints {
-			s.publisher.AddServer(ep.String())
-		}
-		s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
-	}
-	return nil
-}
-
 func (s *xserver) AddName(name string) error {
 	defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	if len(name) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 482736c..07a83af 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -552,12 +552,50 @@
 	return newctx, m, nil
 }
 
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, *pubsub.Publisher, string, []rpc.ServerOpt, error) {
+	newctx, _, err := r.ExperimentalWithNewFlowManager(ctx)
+	if err != nil {
+		return ctx, nil, "", nil, err
+	}
+	otherOpts := append([]rpc.ServerOpt{}, opts...)
+	if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
+		otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
+			Dispatcher: reservedDispatcher,
+		})
+	}
+	id, _ := ctx.Value(initKey).(*initData)
+	if id.protocols != nil {
+		otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
+	}
+	return newctx, id.settingsPublisher, id.settingsName, otherOpts, nil
+}
+
 func (r *Runtime) XWithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	panic("unimplemented")
+	// TODO(mattr): Deal with shutdown deps.
+	newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+	if err != nil {
+		return ctx, nil, err
+	}
+	s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
+	if err != nil {
+		// TODO(mattr): Stop the flow manager.
+		return ctx, nil, err
+	}
+	return newctx, s, err
 }
 
 func (r *Runtime) XWithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	panic("unimplemented")
+	// TODO(mattr): Deal with shutdown deps.
+	newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+	if err != nil {
+		return ctx, nil, err
+	}
+	s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
+	if err != nil {
+		// TODO(mattr): Stop the flow manager.
+		return ctx, nil, err
+	}
+	return newctx, s, err
 }