wspr: Treat WSPR as an RPC service. Part 1 of many.
This series of CLs creates an internal RPC service for javascript apps to use.
In this CL we only create an RPC method for Serve (javascript creating a server
through wspr). I am doing this to reduce the number of paths between wspr and
javascript to make it easier to implement cross cutting features like
deadlines, cancellation, and vtrace.
MultiPart: 2/2
Change-Id: I080074bb91178cb83c7fb8419236f9367911325d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index e5513f3..97414a2 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,6 +3,8 @@
package app
import (
+ "bytes"
+ "encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -14,11 +16,13 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
+ "v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vlog"
+ "v.io/core/veyron2/vom"
"v.io/core/veyron2/vtrace"
"v.io/wspr/veyron/services/wsprd/ipc/server"
"v.io/wspr/veyron/services/wsprd/lib"
@@ -178,7 +182,10 @@
w.Error(err)
return
}
+ c.sendRPCResponse(ctx, w, results)
+}
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
// for now we assume last out argument is always error
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
@@ -196,18 +203,12 @@
}
}
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
methodName := lib.UppercaseFirstCharacter(msg.Method)
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-
- // Convert inArgs from []vdl.AnyRep to []interface{}
- inArgs := make([]interface{}, len(msg.InArgs))
- for i, inArg := range msg.InArgs {
- inArgs[i] = interface{}(inArg)
- }
clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
if err != nil {
- return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+ return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, inArgs, err)
}
return clientCall, nil
@@ -309,7 +310,7 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
w.Error(err)
@@ -321,14 +322,14 @@
w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
return
}
- if len(methSig.InArgs) != len(msg.InArgs) {
+ if len(methSig.InArgs) != len(inArgs) {
w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
return
}
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can Handle a recieve call.
- call, err := c.startCall(ctx, w, msg)
+ call, err := c.startCall(ctx, w, msg, inArgs)
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
@@ -349,13 +350,76 @@
c.Unlock()
}
-// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
-func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
- msg, err := c.parseVeyronRequest(data)
+// TODO(mattr): This is a very limited implementation of ServerCall,
+// but currently none of the methods the controller exports require
+// any of this context information.
+type localCall struct {
+ ctx *context.T
+ vrpc *VeyronRPC
+ tags []interface{}
+}
+
+func (l *localCall) Send(interface{}) error { return nil }
+func (l *localCall) Recv(interface{}) error { return nil }
+func (l *localCall) Blessings() security.Blessings { return nil }
+func (l *localCall) Server() ipc.Server { return nil }
+func (l *localCall) Context() *context.T { return l.ctx }
+func (l *localCall) Timestamp() (t time.Time) { return }
+func (l *localCall) Method() string { return l.vrpc.Method }
+func (l *localCall) MethodTags() []interface{} { return l.tags }
+func (l *localCall) Name() string { return l.vrpc.Name }
+func (l *localCall) Suffix() string { return "" }
+func (l *localCall) RemoteDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) LocalPrincipal() security.Principal { return nil }
+func (l *localCall) LocalBlessings() security.Blessings { return nil }
+func (l *localCall) RemoteBlessings() security.Blessings { return nil }
+func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
+func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
+
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+ invoker, err := ipc.ReflectInvoker(ControllerServer(c))
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
+ argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ for _, argptr := range argptrs {
+ if err := decoder.Decode(argptr); err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ }
+ results, err := invoker.Invoke(msg.Method, &localCall{ctx, msg, tags}, argptrs)
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ c.sendRPCResponse(ctx, w, results)
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
+ binbytes, err := hex.DecodeString(data)
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ decoder, err := vom.NewDecoder(bytes.NewReader(binbytes))
+ if err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+
+ var msg VeyronRPC
+ if err := decoder.Decode(&msg); err != nil {
+ w.Error(verror.Convert(verror.Internal, ctx, err))
+ return
+ }
+ vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
var cctx *context.T
var cancel context.CancelFunc
@@ -369,6 +433,20 @@
cctx, cancel = context.WithTimeout(ctx, lib.JSToGoDuration(msg.Timeout))
}
+ // If this message is for an internal service, do a short-circuit dispatch here.
+ if msg.Name == "controller" {
+ c.handleInternalCall(ctx, &msg, decoder, w)
+ return
+ }
+
+ inArgs := make([]interface{}, msg.NumInArgs)
+ for i := range inArgs {
+ if err := decoder.Decode(&inArgs[i]); err != nil {
+ w.Error(err)
+ return
+ }
+ }
+
request := &outstandingRequest{
cancel: cancel,
}
@@ -382,7 +460,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
c.Unlock()
}
@@ -434,36 +512,18 @@
server.Stop()
}
-func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
- // Create a server for the pipe, if it does not exist already.
- server, err := c.maybeCreateServer(serveRequest.ServerId)
- if err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- }
-
- vlog.VI(2).Infof("serving under name: %q", serveRequest.Name)
-
- if err := server.Serve(serveRequest.Name); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
- }
- // Send true to indicate the serve has succeeded.
- if err := w.Send(lib.ResponseFinal, true); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
- }
-}
-
// HandleServeRequest takes a request to serve a server, creates a server,
// registers the provided services and sends true if everything succeeded.
-func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
- // Decode the serve request which includes VDL, registered services and name
- var serveRequest serveRequest
- if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
- w.Error(verror.Convert(verror.Internal, nil, err))
- return
+func (c *Controller) Serve(ctx ipc.ServerContext, name string, serverId uint32) error {
+ server, err := c.maybeCreateServer(serverId)
+ if err != nil {
+ return verror.Convert(verror.Internal, nil, err)
}
- c.serve(serveRequest, w)
+ vlog.VI(2).Infof("serving under name: %q", name)
+ if err := server.Serve(name); err != nil {
+ return verror.Convert(verror.Internal, nil, err)
+ }
+ return nil
}
// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
diff --git a/services/wsprd/app/app.vdl b/services/wsprd/app/app.vdl
index 610933f..f9629cb 100644
--- a/services/wsprd/app/app.vdl
+++ b/services/wsprd/app/app.vdl
@@ -9,7 +9,7 @@
type VeyronRPC struct {
Name string
Method string
- InArgs []any
+ NumInArgs int32
NumOutArgs int32
IsStreaming bool
Timeout int64
diff --git a/services/wsprd/app/app.vdl.go b/services/wsprd/app/app.vdl.go
index 0c37f97..edbfab7 100644
--- a/services/wsprd/app/app.vdl.go
+++ b/services/wsprd/app/app.vdl.go
@@ -16,7 +16,7 @@
type VeyronRPC struct {
Name string
Method string
- InArgs []vdl.AnyRep
+ NumInArgs int32
NumOutArgs int32
IsStreaming bool
Timeout int64
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 0182fef..6ae9d6d 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -1,6 +1,8 @@
package app
import (
+ "bytes"
+ "encoding/hex"
"fmt"
"reflect"
"testing"
@@ -14,6 +16,7 @@
"v.io/core/veyron2/vdl"
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
+ "v.io/core/veyron2/vom"
"v.io/wspr/veyron/services/wsprd/ipc/server"
"v.io/wspr/veyron/services/wsprd/lib"
"v.io/wspr/veyron/services/wsprd/lib/testwriter"
@@ -184,7 +187,7 @@
type goServerTestCase struct {
method string
- inArgs []vdl.AnyRep
+ inArgs []interface{}
numOutArgs int32
streamingInputs []interface{}
expectedStream []lib.Response
@@ -230,11 +233,11 @@
request := VeyronRPC{
Name: "/" + endpoint.String(),
Method: test.method,
- InArgs: test.inArgs,
+ NumInArgs: int32(len(test.inArgs)),
NumOutArgs: test.numOutArgs,
IsStreaming: stream != nil,
}
- controller.sendVeyronRequest(ctx, 0, &request, &writer, stream)
+ controller.sendVeyronRequest(ctx, 0, &request, test.inArgs, &writer, stream)
if err := testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError); err != nil {
t.Error(err)
@@ -244,7 +247,7 @@
func TestCallingGoServer(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Add",
- inArgs: []vdl.AnyRep{2, 3},
+ inArgs: []interface{}{2, 3},
numOutArgs: 1,
expectedStream: []lib.Response{
lib.Response{
@@ -258,7 +261,7 @@
func TestCallingGoServerWithError(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Divide",
- inArgs: []vdl.AnyRep{1, 0},
+ inArgs: []interface{}{1, 0},
numOutArgs: 1,
expectedError: verror.New(verror.BadArg, nil, "div 0"),
})
@@ -305,6 +308,23 @@
proxyServer *proxy.Proxy
}
+func makeRequest(rpc VeyronRPC, args ...interface{}) (string, error) {
+ var buf bytes.Buffer
+ encoder, err := vom.NewBinaryEncoder(&buf)
+ if err != nil {
+ return "", err
+ }
+ if err := encoder.Encode(rpc); err != nil {
+ return "", err
+ }
+ for _, arg := range args {
+ if err := encoder.Encode(arg); err != nil {
+ return "", err
+ }
+ }
+ return hex.EncodeToString(buf.Bytes()), nil
+}
+
func serveServer(ctx *context.T) (*runningTest, error) {
mounttableServer, endpoint, err := startMountTableServer(ctx)
if err != nil {
@@ -332,9 +352,14 @@
veyron2.GetNamespace(controller.Context()).SetRoots("/" + endpoint.String())
- controller.serve(serveRequest{
- Name: "adder",
- }, &writer)
+ req, err := makeRequest(VeyronRPC{
+ Name: "controller",
+ Method: "Serve",
+ NumInArgs: 2,
+ NumOutArgs: 1,
+ Timeout: 20000000000,
+ }, "adder", 0)
+ controller.HandleVeyronRequest(ctx, 0, req, &writer)
return &runningTest{
controller, &writer, mounttableServer, proxyServer,
diff --git a/services/wsprd/app/controller.vdl b/services/wsprd/app/controller.vdl
new file mode 100644
index 0000000..8dcb760
--- /dev/null
+++ b/services/wsprd/app/controller.vdl
@@ -0,0 +1,5 @@
+package app
+
+type Controller interface {
+ Serve(name string, serverId uint32) error
+}
diff --git a/services/wsprd/app/controller.vdl.go b/services/wsprd/app/controller.vdl.go
new file mode 100644
index 0000000..73607db
--- /dev/null
+++ b/services/wsprd/app/controller.vdl.go
@@ -0,0 +1,131 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: controller.vdl
+
+package app
+
+import (
+ // VDL system imports
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
+)
+
+// ControllerClientMethods is the client interface
+// containing Controller methods.
+type ControllerClientMethods interface {
+ Serve(ctx *context.T, name string, serverId uint32, opts ...ipc.CallOpt) error
+}
+
+// ControllerClientStub adds universal methods to ControllerClientMethods.
+type ControllerClientStub interface {
+ ControllerClientMethods
+ ipc.UniversalServiceMethods
+}
+
+// ControllerClient returns a client stub for Controller.
+func ControllerClient(name string, opts ...ipc.BindOpt) ControllerClientStub {
+ var client ipc.Client
+ for _, opt := range opts {
+ if clientOpt, ok := opt.(ipc.Client); ok {
+ client = clientOpt
+ }
+ }
+ return implControllerClientStub{name, client}
+}
+
+type implControllerClientStub struct {
+ name string
+ client ipc.Client
+}
+
+func (c implControllerClientStub) c(ctx *context.T) ipc.Client {
+ if c.client != nil {
+ return c.client
+ }
+ return veyron2.GetClient(ctx)
+}
+
+func (c implControllerClientStub) Serve(ctx *context.T, i0 string, i1 uint32, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Serve", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+// ControllerServerMethods is the interface a server writer
+// implements for Controller.
+type ControllerServerMethods interface {
+ Serve(ctx ipc.ServerContext, name string, serverId uint32) error
+}
+
+// ControllerServerStubMethods is the server interface containing
+// Controller methods, as expected by ipc.Server.
+// There is no difference between this interface and ControllerServerMethods
+// since there are no streaming methods.
+type ControllerServerStubMethods ControllerServerMethods
+
+// ControllerServerStub adds universal methods to ControllerServerStubMethods.
+type ControllerServerStub interface {
+ ControllerServerStubMethods
+ // Describe the Controller interfaces.
+ Describe__() []ipc.InterfaceDesc
+}
+
+// ControllerServer returns a server stub for Controller.
+// It converts an implementation of ControllerServerMethods into
+// an object that may be used by ipc.Server.
+func ControllerServer(impl ControllerServerMethods) ControllerServerStub {
+ stub := implControllerServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := ipc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := ipc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implControllerServerStub struct {
+ impl ControllerServerMethods
+ gs *ipc.GlobState
+}
+
+func (s implControllerServerStub) Serve(ctx ipc.ServerContext, i0 string, i1 uint32) error {
+ return s.impl.Serve(ctx, i0, i1)
+}
+
+func (s implControllerServerStub) Globber() *ipc.GlobState {
+ return s.gs
+}
+
+func (s implControllerServerStub) Describe__() []ipc.InterfaceDesc {
+ return []ipc.InterfaceDesc{ControllerDesc}
+}
+
+// ControllerDesc describes the Controller interface.
+var ControllerDesc ipc.InterfaceDesc = descController
+
+// descController hides the desc to keep godoc clean.
+var descController = ipc.InterfaceDesc{
+ Name: "Controller",
+ PkgPath: "v.io/wspr/veyron/services/wsprd/app",
+ Methods: []ipc.MethodDesc{
+ {
+ Name: "Serve",
+ InArgs: []ipc.ArgDesc{
+ {"name", ``}, // string
+ {"serverId", ``}, // uint32
+ },
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // error
+ },
+ },
+ },
+}
diff --git a/services/wsprd/app/messaging.go b/services/wsprd/app/messaging.go
index dccca87..a8a84b3 100644
--- a/services/wsprd/app/messaging.go
+++ b/services/wsprd/app/messaging.go
@@ -108,8 +108,6 @@
c.SendOnStream(msg.Id, msg.Data, w)
case StreamCloseMessage:
c.CloseStream(msg.Id)
- case ServeMessage:
- go c.HandleServeRequest(msg.Data, w)
case StopServerMessage:
go c.HandleStopRequest(msg.Data, w)
case AddName:
diff --git a/services/wsprd/browspr/browspr_test.go b/services/wsprd/browspr/browspr_test.go
index 76ae9e2..c0a1600 100644
--- a/services/wsprd/browspr/browspr_test.go
+++ b/services/wsprd/browspr/browspr_test.go
@@ -1,6 +1,8 @@
package browspr
import (
+ "bytes"
+ "encoding/hex"
"encoding/json"
"reflect"
"strings"
@@ -12,7 +14,7 @@
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
- "v.io/core/veyron2/vdl"
+ "v.io/core/veyron2/vom"
"v.io/core/veyron/lib/testutil"
_ "v.io/core/veyron/profiles"
@@ -177,16 +179,24 @@
rpc := app.VeyronRPC{
Name: mockServerName,
Method: "BasicCall",
- InArgs: []vdl.AnyRep{"InputValue"},
+ NumInArgs: 1,
NumOutArgs: 1,
IsStreaming: false,
Timeout: (1 << 31) - 1,
}
- vomRPC, err := lib.VomEncode(rpc)
+ var buf bytes.Buffer
+ encoder, err := vom.NewBinaryEncoder(&buf)
if err != nil {
t.Fatalf("Failed to vom encode rpc message: %v", err)
}
+ if err := encoder.Encode(rpc); err != nil {
+ t.Fatalf("Failed to vom encode rpc message: %v", err)
+ }
+ if err := encoder.Encode("InputValue"); err != nil {
+ t.Fatalf("Failed to vom encode rpc message: %v", err)
+ }
+ vomRPC := hex.EncodeToString(buf.Bytes())
msg, err := json.Marshal(app.Message{
Id: 1,