wspr: Treat WSPR as an RPC service. Part 1 of many.
This series of CLs creates an internal RPC service for JavaScript apps to use.
In this CL we only create an RPC method for Serve (JavaScript creating a server
through wspr). The goal is to reduce the number of paths between wspr and
JavaScript, which makes it easier to implement cross-cutting features such as
deadlines, cancellation, and vtrace.
MultiPart: 2/2
Change-Id: I080074bb91178cb83c7fb8419236f9367911325d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index e5513f3..97414a2 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,6 +3,8 @@
package app
import (
+ "bytes"
+ "encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -14,11 +16,13 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
+ "v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vlog"
+ "v.io/core/veyron2/vom"
"v.io/core/veyron2/vtrace"
"v.io/wspr/veyron/services/wsprd/ipc/server"
"v.io/wspr/veyron/services/wsprd/lib"
@@ -178,7 +182,10 @@
w.Error(err)
return
}
+ c.sendRPCResponse(ctx, w, results)
+}
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
// for now we assume last out argument is always error
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
@@ -196,18 +203,12 @@
}
}
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
methodName := lib.UppercaseFirstCharacter(msg.Method)
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-
- // Convert inArgs from []vdl.AnyRep to []interface{}
- inArgs := make([]interface{}, len(msg.InArgs))
- for i, inArg := range msg.InArgs {
- inArgs[i] = interface{}(inArg)
- }
clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
if err != nil {
- return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+ return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, inArgs, err)
}
return clientCall, nil
@@ -309,7 +310,7 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
w.Error(err)
@@ -321,14 +322,14 @@
w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
return
}
- if len(methSig.InArgs) != len(msg.InArgs) {
+ if len(methSig.InArgs) != len(inArgs) {
w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
return
}
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can Handle a recieve call.
- call, err := c.startCall(ctx, w, msg)
+ call, err := c.startCall(ctx, w, msg, inArgs)
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
@@ -349,13 +350,76 @@
c.Unlock()
}
-// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
- msg, err := c.parseVeyronRequest(data)
+// TODO(mattr): This is a very limited implementation of ServerCall,
+// but currently none of the methods the controller exports require
+// any of this context information.
+type localCall struct {
+ ctx *context.T
+ vrpc *VeyronRPC
+ tags []interface{}
+}
+
+func (l *localCall) Send(interface{}) error { return nil }
+func (l *localCall) Recv(interface{}) error { return nil }
+func (l *localCall) Blessings() security.Blessings { return nil }
+func (l *localCall) Server() ipc.Server { return nil }
+func (l *localCall) Context() *context.T { return l.ctx }
+func (l *localCall) Timestamp() (t time.Time) { return }
+func (l *localCall) Method() string { return l.vrpc.Method }
+func (l *localCall) MethodTags() []interface{} { return l.tags }
+func (l *localCall) Name() string { return l.vrpc.Name }
+func (l *localCall) Suffix() string { return "" }
+func (l *localCall) RemoteDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) LocalPrincipal() security.Principal { return nil }
+func (l *localCall) LocalBlessings() security.Blessings { return nil }
+func (l *localCall) RemoteBlessings() security.Blessings { return nil }
+func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
+func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
+
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+ invoker, err := ipc.ReflectInvoker(ControllerServer(c))
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
+ argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ for _, argptr := range argptrs {
+ if err := decoder.Decode(argptr); err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ }
+ results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ c.sendRPCResponse(ctx, w, results)
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
+ binbytes, err := hex.DecodeString(data)
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ decoder, err := vom.NewDecoder(bytes.NewReader(binbytes))
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+
+ var msg VeyronRPC
+ if err := decoder.Decode(&msg); err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
var cctx *context.T
var cancel context.CancelFunc
@@ -369,6 +433,20 @@
cctx, cancel = context.WithTimeout(ctx, lib.JSToGoDuration(msg.Timeout))
}
+ // If this message is for an internal service, do a short-circuit dispatch here.
+ if msg.Name == "controller" {
+ c.handleInternalCall(ctx, &msg, decoder, w)
+ return
+ }
+
+ inArgs := make([]interface{}, msg.NumInArgs)
+ for i := range inArgs {
+ if err := decoder.Decode(&inArgs[i]); err != nil {
+ w.Error(err)
+ return
+ }
+ }
+
request := &outstandingRequest{
cancel: cancel,
}
@@ -382,7 +460,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
c.Unlock()
}
@@ -434,36 +512,18 @@
server.Stop()
}
-func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
- // Create a server for the pipe, if it does not exist already.
- server, err := c.maybeCreateServer(serveRequest.ServerId)
- if err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- }
-
- vlog.VI(2).Infof("serving under name: %q", serveRequest.Name)
-
- if err := server.Serve(serveRequest.Name); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
- }
- // Send true to indicate the serve has succeeded.
- if err := w.Send(lib.ResponseFinal, true); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
- }
-}
-
// HandleServeRequest takes a request to serve a server, creates a server,
// registers the provided services and sends true if everything succeeded.
-func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
- // Decode the serve request which includes VDL, registered services and name
- var serveRequest serveRequest
- if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
+func (c *Controller) Serve(ctx ipc.ServerContext, name string, serverId uint32) error {
+ server, err := c.maybeCreateServer(serverId)
+ if err != nil {
+ return verror.Convert(verror.Internal, nil, err)
}
- c.serve(serveRequest, w)
+ vlog.VI(2).Infof("serving under name: %q", name)
+ if err := server.Serve(name); err != nil {
+ return verror.Convert(verror.Internal, nil, err)
+ }
+ return nil
}
// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was