wspr: Treat WSPR as an RPC service.  Part 1 of many.

This series of CLs creates an internal RPC service for javascript apps to use.
In this CL we only create an RPC method for Serve (javascript creating a server
through wspr).  I am doing this to reduce the number of paths between wspr and
javascript to make it easier to implement cross cutting features like
deadlines, cancellation, and vtrace.

MultiPart: 2/2
Change-Id: I080074bb91178cb83c7fb8419236f9367911325d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index e5513f3..97414a2 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,6 +3,8 @@
 package app
 
 import (
+	"bytes"
+	"encoding/hex"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -14,11 +16,13 @@
 	"v.io/core/veyron2"
 	"v.io/core/veyron2/context"
 	"v.io/core/veyron2/ipc"
+	"v.io/core/veyron2/naming"
 	"v.io/core/veyron2/options"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/vdl/vdlroot/src/signature"
 	"v.io/core/veyron2/verror"
 	"v.io/core/veyron2/vlog"
+	"v.io/core/veyron2/vom"
 	"v.io/core/veyron2/vtrace"
 	"v.io/wspr/veyron/services/wsprd/ipc/server"
 	"v.io/wspr/veyron/services/wsprd/lib"
@@ -178,7 +182,10 @@
 		w.Error(err)
 		return
 	}
+	c.sendRPCResponse(ctx, w, results)
+}
 
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
 	// for now we assume last out argument is always error
 	if err, ok := results[len(results)-1].(error); ok {
 		// return the call Application error as is
@@ -196,18 +203,12 @@
 	}
 }
 
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
 	methodName := lib.UppercaseFirstCharacter(msg.Method)
 	retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-
-	// Convert inArgs from []vdl.AnyRep to []interface{}
-	inArgs := make([]interface{}, len(msg.InArgs))
-	for i, inArg := range msg.InArgs {
-		inArgs[i] = interface{}(inArg)
-	}
 	clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
 	if err != nil {
-		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, inArgs, err)
 	}
 
 	return clientCall, nil
@@ -309,7 +310,7 @@
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
 // the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
 	sig, err := c.getSignature(ctx, msg.Name)
 	if err != nil {
 		w.Error(err)
@@ -321,14 +322,14 @@
 		w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
 		return
 	}
-	if len(methSig.InArgs) != len(msg.InArgs) {
+	if len(methSig.InArgs) != len(inArgs) {
 		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
 		return
 	}
 
 	// We have to make the start call synchronous so we can make sure that we populate
 	// the call map before we can Handle a recieve call.
-	call, err := c.startCall(ctx, w, msg)
+	call, err := c.startCall(ctx, w, msg, inArgs)
 	if err != nil {
 		w.Error(verror.Convert(verror.Internal, ctx, err))
 		return
@@ -349,13 +350,76 @@
 	c.Unlock()
 }
 
-// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
-	msg, err := c.parseVeyronRequest(data)
+// TODO(mattr): This is a very limited implementation of ServerCall,
+// but currently none of the methods the controller exports require
+// any of this context information.
+type localCall struct {
+	ctx  *context.T
+	vrpc *VeyronRPC
+	tags []interface{}
+}
+
+func (l *localCall) Send(interface{}) error                          { return nil }
+func (l *localCall) Recv(interface{}) error                          { return nil }
+func (l *localCall) Blessings() security.Blessings                   { return nil }
+func (l *localCall) Server() ipc.Server                              { return nil }
+func (l *localCall) Context() *context.T                             { return l.ctx }
+func (l *localCall) Timestamp() (t time.Time)                        { return }
+func (l *localCall) Method() string                                  { return l.vrpc.Method }
+func (l *localCall) MethodTags() []interface{}                       { return l.tags }
+func (l *localCall) Name() string                                    { return l.vrpc.Name }
+func (l *localCall) Suffix() string                                  { return "" }
+func (l *localCall) RemoteDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) LocalPrincipal() security.Principal              { return nil }
+func (l *localCall) LocalBlessings() security.Blessings              { return nil }
+func (l *localCall) RemoteBlessings() security.Blessings             { return nil }
+func (l *localCall) LocalEndpoint() naming.Endpoint                  { return nil }
+func (l *localCall) RemoteEndpoint() naming.Endpoint                 { return nil }
+
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+	invoker, err := ipc.ReflectInvoker(ControllerServer(c))
 	if err != nil {
 		w.Error(verror.Convert(verror.Internal, ctx, err))
 		return
 	}
+	argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	for _, argptr := range argptrs {
+		if err := decoder.Decode(argptr); err != nil {
+			w.Error(verror.Convert(verror.Internal, ctx, err))
+			return
+		}
+	}
+	results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	c.sendRPCResponse(ctx, w, results)
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
+	binbytes, err := hex.DecodeString(data)
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	decoder, err := vom.NewDecoder(bytes.NewReader(binbytes))
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+
+	var msg VeyronRPC
+	if err := decoder.Decode(&msg); err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
 
 	var cctx *context.T
 	var cancel context.CancelFunc
@@ -369,6 +433,20 @@
 		cctx, cancel = context.WithTimeout(ctx, lib.JSToGoDuration(msg.Timeout))
 	}
 
+	// If this message is for an internal service, do a short-circuit dispatch here.
+	if msg.Name == "controller" {
+		c.handleInternalCall(ctx, &msg, decoder, w)
+		return
+	}
+
+	inArgs := make([]interface{}, msg.NumInArgs)
+	for i := range inArgs {
+		if err := decoder.Decode(&inArgs[i]); err != nil {
+			w.Error(err)
+			return
+		}
+	}
+
 	request := &outstandingRequest{
 		cancel: cancel,
 	}
@@ -382,7 +460,7 @@
 	}
 	c.Lock()
 	c.outstandingRequests[id] = request
-	go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
+	go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
 	c.Unlock()
 }
 
@@ -434,36 +512,18 @@
 	server.Stop()
 }
 
-func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
-	// Create a server for the pipe, if it does not exist already.
-	server, err := c.maybeCreateServer(serveRequest.ServerId)
-	if err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-	}
-
-	vlog.VI(2).Infof("serving under name: %q", serveRequest.Name)
-
-	if err := server.Serve(serveRequest.Name); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
-	}
-	// Send true to indicate the serve has succeeded.
-	if err := w.Send(lib.ResponseFinal, true); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
-	}
-}
-
 // HandleServeRequest takes a request to serve a server, creates a server,
 // registers the provided services and sends true if everything succeeded.
-func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
-	// Decode the serve request which includes VDL, registered services and name
-	var serveRequest serveRequest
-	if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
+func (c *Controller) Serve(ctx ipc.ServerContext, name string, serverId uint32) error {
+	server, err := c.maybeCreateServer(serverId)
+	if err != nil {
+		return verror.Convert(verror.Internal, nil, err)
 	}
-	c.serve(serveRequest, w)
+	vlog.VI(2).Infof("serving under name: %q", name)
+	if err := server.Serve(name); err != nil {
+		return verror.Convert(verror.Internal, nil, err)
+	}
+	return nil
 }
 
 // HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
diff --git a/services/wsprd/app/app.vdl b/services/wsprd/app/app.vdl
index 610933f..f9629cb 100644
--- a/services/wsprd/app/app.vdl
+++ b/services/wsprd/app/app.vdl
@@ -9,7 +9,7 @@
 type VeyronRPC struct {
   Name        string
   Method      string
-  InArgs      []any
+  NumInArgs   int32
   NumOutArgs  int32
   IsStreaming bool
   Timeout     int64
diff --git a/services/wsprd/app/app.vdl.go b/services/wsprd/app/app.vdl.go
index 0c37f97..edbfab7 100644
--- a/services/wsprd/app/app.vdl.go
+++ b/services/wsprd/app/app.vdl.go
@@ -16,7 +16,7 @@
 type VeyronRPC struct {
 	Name        string
 	Method      string
-	InArgs      []vdl.AnyRep
+	NumInArgs   int32
 	NumOutArgs  int32
 	IsStreaming bool
 	Timeout     int64
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 0182fef..6ae9d6d 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -1,6 +1,8 @@
 package app
 
 import (
+	"bytes"
+	"encoding/hex"
 	"fmt"
 	"reflect"
 	"testing"
@@ -14,6 +16,7 @@
 	"v.io/core/veyron2/vdl"
 	"v.io/core/veyron2/vdl/vdlroot/src/signature"
 	"v.io/core/veyron2/verror"
+	"v.io/core/veyron2/vom"
 	"v.io/wspr/veyron/services/wsprd/ipc/server"
 	"v.io/wspr/veyron/services/wsprd/lib"
 	"v.io/wspr/veyron/services/wsprd/lib/testwriter"
@@ -184,7 +187,7 @@
 
 type goServerTestCase struct {
 	method          string
-	inArgs          []vdl.AnyRep
+	inArgs          []interface{}
 	numOutArgs      int32
 	streamingInputs []interface{}
 	expectedStream  []lib.Response
@@ -230,11 +233,11 @@
 	request := VeyronRPC{
 		Name:        "/" + endpoint.String(),
 		Method:      test.method,
-		InArgs:      test.inArgs,
+		NumInArgs:   int32(len(test.inArgs)),
 		NumOutArgs:  test.numOutArgs,
 		IsStreaming: stream != nil,
 	}
-	controller.sendVeyronRequest(ctx, 0, &request, &writer, stream)
+	controller.sendVeyronRequest(ctx, 0, &request, test.inArgs, &writer, stream)
 
 	if err := testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError); err != nil {
 		t.Error(err)
@@ -244,7 +247,7 @@
 func TestCallingGoServer(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
 		method:     "Add",
-		inArgs:     []vdl.AnyRep{2, 3},
+		inArgs:     []interface{}{2, 3},
 		numOutArgs: 1,
 		expectedStream: []lib.Response{
 			lib.Response{
@@ -258,7 +261,7 @@
 func TestCallingGoServerWithError(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
 		method:        "Divide",
-		inArgs:        []vdl.AnyRep{1, 0},
+		inArgs:        []interface{}{1, 0},
 		numOutArgs:    1,
 		expectedError: verror.New(verror.BadArg, nil, "div 0"),
 	})
@@ -305,6 +308,23 @@
 	proxyServer      *proxy.Proxy
 }
 
+func makeRequest(rpc VeyronRPC, args ...interface{}) (string, error) {
+	var buf bytes.Buffer
+	encoder, err := vom.NewBinaryEncoder(&buf)
+	if err != nil {
+		return "", err
+	}
+	if err := encoder.Encode(rpc); err != nil {
+		return "", err
+	}
+	for _, arg := range args {
+		if err := encoder.Encode(arg); err != nil {
+			return "", err
+		}
+	}
+	return hex.EncodeToString(buf.Bytes()), nil
+}
+
 func serveServer(ctx *context.T) (*runningTest, error) {
 	mounttableServer, endpoint, err := startMountTableServer(ctx)
 	if err != nil {
@@ -332,9 +352,14 @@
 
 	veyron2.GetNamespace(controller.Context()).SetRoots("/" + endpoint.String())
 
-	controller.serve(serveRequest{
-		Name: "adder",
-	}, &writer)
+	req, err := makeRequest(VeyronRPC{
+		Name:       "controller",
+		Method:     "Serve",
+		NumInArgs:  2,
+		NumOutArgs: 1,
+		Timeout:    20000000000,
+	}, "adder", 0)
+	controller.HandleVeyronRequest(ctx, 0, req, &writer)
 
 	return &runningTest{
 		controller, &writer, mounttableServer, proxyServer,
diff --git a/services/wsprd/app/controller.vdl b/services/wsprd/app/controller.vdl
new file mode 100644
index 0000000..8dcb760
--- /dev/null
+++ b/services/wsprd/app/controller.vdl
@@ -0,0 +1,5 @@
+package app
+
+type Controller interface {
+     Serve(name string, serverId uint32) error
+}
diff --git a/services/wsprd/app/controller.vdl.go b/services/wsprd/app/controller.vdl.go
new file mode 100644
index 0000000..73607db
--- /dev/null
+++ b/services/wsprd/app/controller.vdl.go
@@ -0,0 +1,131 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: controller.vdl
+
+package app
+
+import (
+	// VDL system imports
+	"v.io/core/veyron2"
+	"v.io/core/veyron2/context"
+	"v.io/core/veyron2/ipc"
+)
+
+// ControllerClientMethods is the client interface
+// containing Controller methods.
+type ControllerClientMethods interface {
+	Serve(ctx *context.T, name string, serverId uint32, opts ...ipc.CallOpt) error
+}
+
+// ControllerClientStub adds universal methods to ControllerClientMethods.
+type ControllerClientStub interface {
+	ControllerClientMethods
+	ipc.UniversalServiceMethods
+}
+
+// ControllerClient returns a client stub for Controller.
+func ControllerClient(name string, opts ...ipc.BindOpt) ControllerClientStub {
+	var client ipc.Client
+	for _, opt := range opts {
+		if clientOpt, ok := opt.(ipc.Client); ok {
+			client = clientOpt
+		}
+	}
+	return implControllerClientStub{name, client}
+}
+
+type implControllerClientStub struct {
+	name   string
+	client ipc.Client
+}
+
+func (c implControllerClientStub) c(ctx *context.T) ipc.Client {
+	if c.client != nil {
+		return c.client
+	}
+	return veyron2.GetClient(ctx)
+}
+
+func (c implControllerClientStub) Serve(ctx *context.T, i0 string, i1 uint32, opts ...ipc.CallOpt) (err error) {
+	var call ipc.Call
+	if call, err = c.c(ctx).StartCall(ctx, c.name, "Serve", []interface{}{i0, i1}, opts...); err != nil {
+		return
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		err = ierr
+	}
+	return
+}
+
+// ControllerServerMethods is the interface a server writer
+// implements for Controller.
+type ControllerServerMethods interface {
+	Serve(ctx ipc.ServerContext, name string, serverId uint32) error
+}
+
+// ControllerServerStubMethods is the server interface containing
+// Controller methods, as expected by ipc.Server.
+// There is no difference between this interface and ControllerServerMethods
+// since there are no streaming methods.
+type ControllerServerStubMethods ControllerServerMethods
+
+// ControllerServerStub adds universal methods to ControllerServerStubMethods.
+type ControllerServerStub interface {
+	ControllerServerStubMethods
+	// Describe the Controller interfaces.
+	Describe__() []ipc.InterfaceDesc
+}
+
+// ControllerServer returns a server stub for Controller.
+// It converts an implementation of ControllerServerMethods into
+// an object that may be used by ipc.Server.
+func ControllerServer(impl ControllerServerMethods) ControllerServerStub {
+	stub := implControllerServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := ipc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := ipc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implControllerServerStub struct {
+	impl ControllerServerMethods
+	gs   *ipc.GlobState
+}
+
+func (s implControllerServerStub) Serve(ctx ipc.ServerContext, i0 string, i1 uint32) error {
+	return s.impl.Serve(ctx, i0, i1)
+}
+
+func (s implControllerServerStub) Globber() *ipc.GlobState {
+	return s.gs
+}
+
+func (s implControllerServerStub) Describe__() []ipc.InterfaceDesc {
+	return []ipc.InterfaceDesc{ControllerDesc}
+}
+
+// ControllerDesc describes the Controller interface.
+var ControllerDesc ipc.InterfaceDesc = descController
+
+// descController hides the desc to keep godoc clean.
+var descController = ipc.InterfaceDesc{
+	Name:    "Controller",
+	PkgPath: "v.io/wspr/veyron/services/wsprd/app",
+	Methods: []ipc.MethodDesc{
+		{
+			Name: "Serve",
+			InArgs: []ipc.ArgDesc{
+				{"name", ``},     // string
+				{"serverId", ``}, // uint32
+			},
+			OutArgs: []ipc.ArgDesc{
+				{"", ``}, // error
+			},
+		},
+	},
+}
diff --git a/services/wsprd/app/messaging.go b/services/wsprd/app/messaging.go
index dccca87..a8a84b3 100644
--- a/services/wsprd/app/messaging.go
+++ b/services/wsprd/app/messaging.go
@@ -108,8 +108,6 @@
 		c.SendOnStream(msg.Id, msg.Data, w)
 	case StreamCloseMessage:
 		c.CloseStream(msg.Id)
-	case ServeMessage:
-		go c.HandleServeRequest(msg.Data, w)
 	case StopServerMessage:
 		go c.HandleStopRequest(msg.Data, w)
 	case AddName:
diff --git a/services/wsprd/browspr/browspr_test.go b/services/wsprd/browspr/browspr_test.go
index 76ae9e2..c0a1600 100644
--- a/services/wsprd/browspr/browspr_test.go
+++ b/services/wsprd/browspr/browspr_test.go
@@ -1,6 +1,8 @@
 package browspr
 
 import (
+	"bytes"
+	"encoding/hex"
 	"encoding/json"
 	"reflect"
 	"strings"
@@ -12,7 +14,7 @@
 	"v.io/core/veyron2/ipc"
 	"v.io/core/veyron2/naming"
 	"v.io/core/veyron2/options"
-	"v.io/core/veyron2/vdl"
+	"v.io/core/veyron2/vom"
 
 	"v.io/core/veyron/lib/testutil"
 	_ "v.io/core/veyron/profiles"
@@ -177,16 +179,24 @@
 	rpc := app.VeyronRPC{
 		Name:        mockServerName,
 		Method:      "BasicCall",
-		InArgs:      []vdl.AnyRep{"InputValue"},
+		NumInArgs:   1,
 		NumOutArgs:  1,
 		IsStreaming: false,
 		Timeout:     (1 << 31) - 1,
 	}
 
-	vomRPC, err := lib.VomEncode(rpc)
+	var buf bytes.Buffer
+	encoder, err := vom.NewBinaryEncoder(&buf)
 	if err != nil {
 		t.Fatalf("Failed to vom encode rpc message: %v", err)
 	}
+	if err := encoder.Encode(rpc); err != nil {
+		t.Fatalf("Failed to vom encode rpc message: %v", err)
+	}
+	if err := encoder.Encode("InputValue"); err != nil {
+		t.Fatalf("Failed to vom encode rpc message: %v", err)
+	}
+	vomRPC := hex.EncodeToString(buf.Bytes())
 
 	msg, err := json.Marshal(app.Message{
 		Id:   1,