ref/runtime/internal/rpc: Add a small test showing basic functionality of
xclient and xserver.
Change-Id: I5cf3036aa0f397e79876d3d07bd13dba4c92f925
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 3ce9bcc..903348f 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -37,17 +37,24 @@
func (r *Runtime) GetListenSpec(ctx *context.T) rpc.ListenSpec {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- return rpc.ListenSpec{}
+ ls, _ := ctx.Value(listenSpecKey).(rpc.ListenSpec)
+ return ls
}
func (r *Runtime) WithListenSpec(ctx *context.T, ls rpc.ListenSpec) *context.T {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ return context.WithValue(ctx, listenSpecKey, ls)
return ctx
}
+func SetFlowManager(ctx *context.T, manager flow.Manager) *context.T {
+ return context.WithValue(ctx, flowManagerKey, manager)
+}
+
func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
+ fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
+ return fm
}
func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index 8cb7fec..464ab11 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -23,6 +23,8 @@
principalKey
loggerKey
backgroundKey
+ listenSpecKey
+ flowManagerKey
)
type Runtime struct {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
new file mode 100644
index 0000000..4517ae0
--- /dev/null
+++ b/runtime/internal/rpc/x_test.go
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/manager"
+)
+
+type testService struct{}
+
+func (t *testService) Echo(ctx *context.T, call rpc.ServerCall, arg string) (string, error) {
+ return "response:" + arg, nil
+}
+
+func TestXClientServer(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
+ Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
+ })
+ _, err := NewServer(ctx, "server", &testService{}, nil, nil, "")
+ if err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ client, err := NewXClient(ctx)
+ if err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ var result string
+ if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+ t.Fatal(verror.DebugString(err))
+ }
+ if want := "response:hello"; result != want {
+ t.Errorf("got %q wanted %q", result, want)
+ }
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 1f5dcb5..1cd90dd 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -46,7 +46,7 @@
var _ rpc.Client = (*xclient)(nil)
-func InternalNewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &xclient{
flowMgr: v23.ExperimentalGetFlowManager(ctx),
ns: v23.GetNamespace(ctx),
@@ -287,7 +287,7 @@
for _, r := range responses {
if r != nil {
numResponses++
- if verror.ErrorID(r.serverErr.Err) == message.ErrWrongProtocol.ID {
+ if r.serverErr != nil && verror.ErrorID(r.serverErr.Err) == message.ErrWrongProtocol.ID {
return nil, verror.NoRetry, false, r.serverErr.Err
}
}
@@ -463,7 +463,9 @@
subErr.Name = "remote=" + fc.flow.Conn().RemoteEndpoint().String()
// TODO(toddw): cancel context instead?
if _, cerr := fc.flow.WriteMsgAndClose(); cerr != nil && err == nil {
- return verror.New(verror.ErrInternal, fc.ctx, subErr)
+ // TODO(mattr): The context is often already canceled here, in
+ // which case we'll get an error. Not clear what to do.
+ //return verror.New(verror.ErrInternal, fc.ctx, subErr)
}
if err == nil {
return nil
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 6b98b09..8660a2a 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -74,7 +74,7 @@
stats *rpcStats // stats for this server.
}
-func InternalNewXServer(ctx *context.T, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
ctx, cancel := context.WithRootCancel(ctx)
flowMgr := v23.ExperimentalGetFlowManager(ctx)
ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
@@ -119,6 +119,14 @@
stats.NewStringFunc(blessingsStatsName, func() string {
return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
})
+ if err = s.listen(ctx, v23.GetListenSpec(ctx)); err != nil {
+ s.Stop()
+ return nil, err
+ }
+ if err = s.serve(name, object, authorizer); err != nil {
+ s.Stop()
+ return nil, err
+ }
return s, nil
}
@@ -212,7 +220,6 @@
func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error {
s.Lock()
defer s.Unlock()
-
var lastErr error
for _, addr := range listenSpec.Addrs {
if len(addr.Address) > 0 {
@@ -281,7 +288,7 @@
}
}
-func (s *server) serve(name string, obj interface{}, authorizer security.Authorizer) error {
+func (s *xserver) serve(name string, obj interface{}, authorizer security.Authorizer) error {
if obj == nil {
return verror.New(verror.ErrBadArg, s.ctx, "nil object")
}
@@ -293,7 +300,7 @@
s.Lock()
s.isLeaf = true
s.Unlock()
- return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
+ return s.serveDispatcher(name, &leafDispatcher{invoker, authorizer})
}
func (s *xserver) serveDispatcher(name string, disp rpc.Dispatcher) error {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 37a7d0a..482736c 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -415,7 +415,7 @@
client, err = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
deps = append(deps, fm, sm)
case fm != nil:
- client, err = irpc.InternalNewXClient(ctx, otherOpts...)
+ client, err = irpc.NewXClient(ctx, otherOpts...)
deps = append(deps, fm)
case sm != nil:
client, err = irpc.InternalNewClient(sm, ns, otherOpts...)