wspr: Treat WSPR as an RPC service.  Part 1 of many.

This series of CLs creates an internal RPC service for javascript apps to use.
In this CL we only create an RPC method for Serve (javascript creating a server
through wspr).  I am doing this to reduce the number of paths between wspr and
javascript to make it easier to implement cross cutting features like
deadlines, cancellation, and vtrace.

MultiPart: 2/2
Change-Id: I080074bb91178cb83c7fb8419236f9367911325d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index e5513f3..97414a2 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,6 +3,8 @@
 package app
 
 import (
+	"bytes"
+	"encoding/hex"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -14,11 +16,13 @@
 	"v.io/core/veyron2"
 	"v.io/core/veyron2/context"
 	"v.io/core/veyron2/ipc"
+	"v.io/core/veyron2/naming"
 	"v.io/core/veyron2/options"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/vdl/vdlroot/src/signature"
 	"v.io/core/veyron2/verror"
 	"v.io/core/veyron2/vlog"
+	"v.io/core/veyron2/vom"
 	"v.io/core/veyron2/vtrace"
 	"v.io/wspr/veyron/services/wsprd/ipc/server"
 	"v.io/wspr/veyron/services/wsprd/lib"
@@ -178,7 +182,10 @@
 		w.Error(err)
 		return
 	}
+	c.sendRPCResponse(ctx, w, results)
+}
 
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
 	// for now we assume last out argument is always error
 	if err, ok := results[len(results)-1].(error); ok {
 		// return the call Application error as is
@@ -196,18 +203,12 @@
 	}
 }
 
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
 	methodName := lib.UppercaseFirstCharacter(msg.Method)
 	retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-
-	// Convert inArgs from []vdl.AnyRep to []interface{}
-	inArgs := make([]interface{}, len(msg.InArgs))
-	for i, inArg := range msg.InArgs {
-		inArgs[i] = interface{}(inArg)
-	}
 	clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
 	if err != nil {
-		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, inArgs, err)
 	}
 
 	return clientCall, nil
@@ -309,7 +310,7 @@
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
 // the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
 	sig, err := c.getSignature(ctx, msg.Name)
 	if err != nil {
 		w.Error(err)
@@ -321,14 +322,14 @@
 		w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
 		return
 	}
-	if len(methSig.InArgs) != len(msg.InArgs) {
+	if len(methSig.InArgs) != len(inArgs) {
 		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
 		return
 	}
 
 	// We have to make the start call synchronous so we can make sure that we populate
 	// the call map before we can Handle a recieve call.
-	call, err := c.startCall(ctx, w, msg)
+	call, err := c.startCall(ctx, w, msg, inArgs)
 	if err != nil {
 		w.Error(verror.Convert(verror.Internal, ctx, err))
 		return
@@ -349,13 +350,76 @@
 	c.Unlock()
 }
 
-// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
-	msg, err := c.parseVeyronRequest(data)
+// TODO(mattr): This is a very limited implementation of ServerCall,
+// but currently none of the methods the controller exports require
+// any of this context information.
+type localCall struct {
+	ctx  *context.T
+	vrpc *VeyronRPC
+	tags []interface{}
+}
+
+func (l *localCall) Send(interface{}) error                          { return nil }
+func (l *localCall) Recv(interface{}) error                          { return nil }
+func (l *localCall) Blessings() security.Blessings                   { return nil }
+func (l *localCall) Server() ipc.Server                              { return nil }
+func (l *localCall) Context() *context.T                             { return l.ctx }
+func (l *localCall) Timestamp() (t time.Time)                        { return }
+func (l *localCall) Method() string                                  { return l.vrpc.Method }
+func (l *localCall) MethodTags() []interface{}                       { return l.tags }
+func (l *localCall) Name() string                                    { return l.vrpc.Name }
+func (l *localCall) Suffix() string                                  { return "" }
+func (l *localCall) RemoteDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) LocalPrincipal() security.Principal              { return nil }
+func (l *localCall) LocalBlessings() security.Blessings              { return nil }
+func (l *localCall) RemoteBlessings() security.Blessings             { return nil }
+func (l *localCall) LocalEndpoint() naming.Endpoint                  { return nil }
+func (l *localCall) RemoteEndpoint() naming.Endpoint                 { return nil }
+
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+	invoker, err := ipc.ReflectInvoker(ControllerServer(c))
 	if err != nil {
 		w.Error(verror.Convert(verror.Internal, ctx, err))
 		return
 	}
+	argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	for _, argptr := range argptrs {
+		if err := decoder.Decode(argptr); err != nil {
+			w.Error(verror.Convert(verror.Internal, ctx, err))
+			return
+		}
+	}
+	results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	c.sendRPCResponse(ctx, w, results)
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
+	binbytes, err := hex.DecodeString(data)
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	decoder, err := vom.NewDecoder(bytes.NewReader(binbytes))
+	if err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+
+	var msg VeyronRPC
+	if err := decoder.Decode(&msg); err != nil {
+		w.Error(verror.Convert(verror.Internal, ctx, err))
+		return
+	}
+	vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
 
 	var cctx *context.T
 	var cancel context.CancelFunc
@@ -369,6 +433,20 @@
 		cctx, cancel = context.WithTimeout(ctx, lib.JSToGoDuration(msg.Timeout))
 	}
 
+	// If this message is for an internal service, do a short-circuit dispatch here.
+	if msg.Name == "controller" {
+		c.handleInternalCall(ctx, &msg, decoder, w)
+		return
+	}
+
+	inArgs := make([]interface{}, msg.NumInArgs)
+	for i := range inArgs {
+		if err := decoder.Decode(&inArgs[i]); err != nil {
+			w.Error(err)
+			return
+		}
+	}
+
 	request := &outstandingRequest{
 		cancel: cancel,
 	}
@@ -382,7 +460,7 @@
 	}
 	c.Lock()
 	c.outstandingRequests[id] = request
-	go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
+	go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
 	c.Unlock()
 }
 
@@ -434,36 +512,18 @@
 	server.Stop()
 }
 
-func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
-	// Create a server for the pipe, if it does not exist already.
-	server, err := c.maybeCreateServer(serveRequest.ServerId)
-	if err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-	}
-
-	vlog.VI(2).Infof("serving under name: %q", serveRequest.Name)
-
-	if err := server.Serve(serveRequest.Name); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
-	}
-	// Send true to indicate the serve has succeeded.
-	if err := w.Send(lib.ResponseFinal, true); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
-	}
-}
-
 // HandleServeRequest takes a request to serve a server, creates a server,
 // registers the provided services and sends true if everything succeeded.
-func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
-	// Decode the serve request which includes VDL, registered services and name
-	var serveRequest serveRequest
-	if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
-		w.Error(verror.Convert(verror.Internal, nil, err))
-		return
+func (c *Controller) Serve(ctx ipc.ServerContext, name string, serverId uint32) error {
+	server, err := c.maybeCreateServer(serverId)
+	if err != nil {
+		return verror.Convert(verror.Internal, nil, err)
 	}
-	c.serve(serveRequest, w)
+	vlog.VI(2).Infof("serving under name: %q", name)
+	if err := server.Serve(name); err != nil {
+		return verror.Convert(verror.Internal, nil, err)
+	}
+	return nil
 }
 
 // HandleLookupResponse handles the result of a Dispatcher.Lookup call that was