ref/runtime/internal/rpc: Add a small test showing basic functionality of
xclient and xserver.

Change-Id: I5cf3036aa0f397e79876d3d07bd13dba4c92f925
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 3ce9bcc..903348f 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -37,17 +37,24 @@
 
 func (r *Runtime) GetListenSpec(ctx *context.T) rpc.ListenSpec {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	return rpc.ListenSpec{}
+	ls, _ := ctx.Value(listenSpecKey).(rpc.ListenSpec)
+	return ls
 }
 
 func (r *Runtime) WithListenSpec(ctx *context.T, ls rpc.ListenSpec) *context.T {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	return context.WithValue(ctx, listenSpecKey, ls)
 	return ctx
 }
 
+func SetFlowManager(ctx *context.T, manager flow.Manager) *context.T {
+	return context.WithValue(ctx, flowManagerKey, manager)
+}
+
 func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	panic("unimplemented")
+	fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
+	return fm
 }
 
 func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index 8cb7fec..464ab11 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -23,6 +23,8 @@
 	principalKey
 	loggerKey
 	backgroundKey
+	listenSpecKey
+	flowManagerKey
 )
 
 type Runtime struct {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
new file mode 100644
index 0000000..4517ae0
--- /dev/null
+++ b/runtime/internal/rpc/x_test.go
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+	"v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/manager"
+)
+
+type testService struct{}
+
+func (t *testService) Echo(ctx *context.T, call rpc.ServerCall, arg string) (string, error) {
+	return "response:" + arg, nil
+}
+
+func TestXClientServer(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
+		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
+	})
+	_, err := NewServer(ctx, "server", &testService{}, nil, nil, "")
+	if err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	client, err := NewXClient(ctx)
+	if err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	var result string
+	if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+		t.Fatal(verror.DebugString(err))
+	}
+	if want := "response:hello"; result != want {
+		t.Errorf("got %q wanted %q", result, want)
+	}
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 1f5dcb5..1cd90dd 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -46,7 +46,7 @@
 
 var _ rpc.Client = (*xclient)(nil)
 
-func InternalNewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
 	c := &xclient{
 		flowMgr: v23.ExperimentalGetFlowManager(ctx),
 		ns:      v23.GetNamespace(ctx),
@@ -287,7 +287,7 @@
 		for _, r := range responses {
 			if r != nil {
 				numResponses++
-				if verror.ErrorID(r.serverErr.Err) == message.ErrWrongProtocol.ID {
+				if r.serverErr != nil && verror.ErrorID(r.serverErr.Err) == message.ErrWrongProtocol.ID {
 					return nil, verror.NoRetry, false, r.serverErr.Err
 				}
 			}
@@ -463,7 +463,9 @@
 	subErr.Name = "remote=" + fc.flow.Conn().RemoteEndpoint().String()
 	// TODO(toddw): cancel context instead?
 	if _, cerr := fc.flow.WriteMsgAndClose(); cerr != nil && err == nil {
-		return verror.New(verror.ErrInternal, fc.ctx, subErr)
+		// TODO(mattr): The context is often already canceled here, in
+		// which case we'll get an error.  Not clear what to do.
+		//return verror.New(verror.ErrInternal, fc.ctx, subErr)
 	}
 	if err == nil {
 		return nil
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 6b98b09..8660a2a 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -74,7 +74,7 @@
 	stats *rpcStats // stats for this server.
 }
 
-func InternalNewXServer(ctx *context.T, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
 	ctx, cancel := context.WithRootCancel(ctx)
 	flowMgr := v23.ExperimentalGetFlowManager(ctx)
 	ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
@@ -119,6 +119,14 @@
 	stats.NewStringFunc(blessingsStatsName, func() string {
 		return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
 	})
+	if err = s.listen(ctx, v23.GetListenSpec(ctx)); err != nil {
+		s.Stop()
+		return nil, err
+	}
+	if err = s.serve(name, object, authorizer); err != nil {
+		s.Stop()
+		return nil, err
+	}
 	return s, nil
 }
 
@@ -212,7 +220,6 @@
 func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error {
 	s.Lock()
 	defer s.Unlock()
-
 	var lastErr error
 	for _, addr := range listenSpec.Addrs {
 		if len(addr.Address) > 0 {
@@ -281,7 +288,7 @@
 	}
 }
 
-func (s *server) serve(name string, obj interface{}, authorizer security.Authorizer) error {
+func (s *xserver) serve(name string, obj interface{}, authorizer security.Authorizer) error {
 	if obj == nil {
 		return verror.New(verror.ErrBadArg, s.ctx, "nil object")
 	}
@@ -293,7 +300,7 @@
 	s.Lock()
 	s.isLeaf = true
 	s.Unlock()
-	return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
+	return s.serveDispatcher(name, &leafDispatcher{invoker, authorizer})
 }
 
 func (s *xserver) serveDispatcher(name string, disp rpc.Dispatcher) error {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 37a7d0a..482736c 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -415,7 +415,7 @@
 		client, err = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
 		deps = append(deps, fm, sm)
 	case fm != nil:
-		client, err = irpc.InternalNewXClient(ctx, otherOpts...)
+		client, err = irpc.NewXClient(ctx, otherOpts...)
 		deps = append(deps, fm)
 	case sm != nil:
 		client, err = irpc.InternalNewClient(sm, ns, otherOpts...)