WSPR VOM2 + Signature
All tests pass.
MultiPart: 3/4
Change-Id: I34647d2df9e876de084d20eaf09ebb14cf3bd1f3
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 3322afb..44c58ff 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,8 +3,6 @@
package app
import (
- "bytes"
- "encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -19,17 +17,13 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
- "veyron.io/veyron/veyron2/vom"
- vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
- "veyron.io/veyron/veyron2/vom2"
- wiretype_build "veyron.io/veyron/veyron2/wiretype/build"
"veyron.io/wspr/veyron/services/wsprd/ipc/server"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/namespace"
"veyron.io/wspr/veyron/services/wsprd/principal"
- "veyron.io/wspr/veyron/services/wsprd/signature"
)
// pkgPath is the prefix os errors in this package.
@@ -39,7 +33,6 @@
var (
marshallingError = verror2.Register(pkgPath+".marshallingError", verror2.NoRetry, "{1} {2} marshalling error {_}")
noResults = verror2.Register(pkgPath+".noResults", verror2.NoRetry, "{1} {2} no results from call {_}")
- signatureError = verror2.Register(pkgPath+".signatureError", verror2.NoRetry, "{1} {2} signature error {_}")
badCaveatType = verror2.Register(pkgPath+".badCaveatType", verror2.NoRetry, "{1} {2} bad caveat type {_}")
unknownBlessings = verror2.Register(pkgPath+".unknownBlessings", verror2.NoRetry, "{1} {2} unknown public id {_}")
invalidBlessingsHandle = verror2.Register(pkgPath+".invalidBlessingsHandle", verror2.NoRetry, "{1} {2} invalid blessings handle {_}")
@@ -57,22 +50,13 @@
retryTimeout = flag.Int("retry-timeout", 2, "Duration in seconds to retry starting an RPC call. 0 means never retry.")
}
-// Temporary holder of RPC so that we can store the unprocessed args.
-type veyronTempRPC struct {
- Name string
- Method string
- InArgs []json.RawMessage
- NumOutArgs int32
- IsStreaming bool
- Timeout int64
-}
-
-type veyronRPC struct {
+type VeyronRPC struct {
Name string
Method string
InArgs []interface{}
NumOutArgs int32
IsStreaming bool
+ Timeout int64
}
type serveRequest struct {
@@ -182,7 +166,7 @@
}
// finishCall waits for the call to finish and write out the response to w.
-func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
+func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPC) {
if msg.IsStreaming {
for {
var item interface{}
@@ -193,24 +177,15 @@
w.Error(err) // Send streaming error as is
return
}
-
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ vomItem, err := lib.VomEncode(item)
if err != nil {
- w.Error(verror2.Make(marshallingError, ctx, item))
+ w.Error(verror2.Make(marshallingError, ctx, item, err))
continue
}
-
- if err := encoder.Encode(item); err != nil {
- w.Error(verror2.Make(marshallingError, ctx, item))
- continue
- }
-
- if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseStream, vomItem); err != nil {
w.Error(verror2.Make(marshallingError, ctx, item))
}
}
-
if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
w.Error(verror2.Make(marshallingError, ctx, "ResponseStreamClose"))
}
@@ -227,36 +202,29 @@
w.Error(err)
return
}
+
// for now we assume last out argument is always error
if len(results) < 1 {
w.Error(verror2.Make(noResults, ctx))
return
}
-
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
w.Error(err)
return
}
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ vomResults, err := lib.VomEncode(results[:len(results)-1])
if err != nil {
w.Error(err)
return
}
-
- if err := encoder.Encode(results[:len(results)-1]); err != nil {
- w.Error(err)
- return
- }
-
- if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseFinal, vomResults); err != nil {
w.Error(verror2.Convert(marshallingError, ctx, err))
}
}
-func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
if c.client == nil {
return nil, verror2.Make(verror2.BadArg, ctx, "app.Controller.client")
}
@@ -281,7 +249,7 @@
c.lastGeneratedId += 2
c.flowMap[id] = s
os := newStream()
- os.init(stream, vom_wiretype.Type{ID: 1})
+ os.init(stream)
c.outstandingRequests[id] = &outstandingRequest{
stream: os,
}
@@ -364,67 +332,36 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx context.T, id int64, tempMsg *veyronTempRPC, w lib.ClientWriter, stream *outstandingStream) {
- // Fetch and adapt signature from the SignatureManager
- retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
- sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
+func (c *Controller) sendVeyronRequest(ctx context.T, id int64, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+ sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
- w.Error(verror2.Make(signatureError, ctx, tempMsg.Name, err))
+ w.Error(err)
return
}
- methName := lib.UppercaseFirstCharacter(tempMsg.Method)
- methSig, ok := sig.Methods[methName]
+ methName := lib.UppercaseFirstCharacter(msg.Method)
+ methSig, ok := signature.FirstMethod(sig, methName)
if !ok {
- w.Error(fmt.Errorf("method not found in signature: %v (full sig: %v)", methName, sig))
+ w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
return
}
-
- var msg veyronRPC
- if len(methSig.InArgs) != len(tempMsg.InArgs) {
- w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, tempMsg))
+ if len(methSig.InArgs) != len(msg.InArgs) {
+ w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
return
}
- msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
- td := wiretype_build.TypeDefs(sig.TypeDefs)
-
- for i := 0; i < len(tempMsg.InArgs); i++ {
- argTypeId := methSig.InArgs[i].Type
- argType := vom_wiretype.Type{
- ID: argTypeId,
- Defs: &td,
- }
-
- val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
- if err != nil {
- w.Error(fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err))
- return
- }
- msg.InArgs[i] = val
- }
-
- msg.Name = tempMsg.Name
- msg.Method = tempMsg.Method
- msg.NumOutArgs = tempMsg.NumOutArgs
- msg.IsStreaming = tempMsg.IsStreaming
-
- inStreamType := vom_wiretype.Type{
- ID: methSig.InStream,
- Defs: &td,
- }
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can Handle a recieve call.
- call, err := c.startCall(ctx, w, &msg)
+ call, err := c.startCall(ctx, w, msg)
if err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
}
if stream != nil {
- stream.init(call, inStreamType)
+ stream.init(call)
}
- c.finishCall(ctx, w, call, &msg)
+ c.finishCall(ctx, w, call, msg)
c.Lock()
if request, ok := c.outstandingRequests[id]; ok {
delete(c.outstandingRequests, id)
@@ -437,7 +374,7 @@
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
func (c *Controller) HandleVeyronRequest(ctx context.T, id int64, data string, w lib.ClientWriter) {
- veyronTempMsg, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
+ msg, err := c.parseVeyronRequest(data)
if err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
@@ -449,16 +386,16 @@
// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
// However as a rollout strategy we must, otherwise there is a circular
// dependency between the WSPR change and the JS change that will follow.
- if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+ if msg.Timeout == lib.JSIPCNoTimeout || msg.Timeout == 0 {
cctx, cancel = ctx.WithCancel()
} else {
- cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
+ cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(msg.Timeout))
}
request := &outstandingRequest{
cancel: cancel,
}
- if veyronTempMsg.IsStreaming {
+ if msg.IsStreaming {
// If this rpc is streaming, we would expect that the client would try to send
// on this stream. Since the initial handshake is done asynchronously, we have
// to put the outstanding stream in the map before we make the async call so that
@@ -468,7 +405,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
c.Unlock()
}
@@ -543,7 +480,7 @@
// HandleServeRequest takes a request to serve a server, creates a server,
// registers the provided services and sends true if everything succeeded.
func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
- // Decode the serve request which includes IDL, registered services and name
+ // Decode the serve request which includes VDL, registered services and name
var serveRequest serveRequest
if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
@@ -673,30 +610,23 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronTempRPC, error) {
- var tempMsg veyronTempRPC
- decoder := json.NewDecoder(r)
- if err := decoder.Decode(&tempMsg); err != nil {
- return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+// parseVeyronRequest decodes a VOM-encoded rpc request into a VeyronRPC object.
+func (c *Controller) parseVeyronRequest(data string) (*VeyronRPC, error) {
+ var msg VeyronRPC
+ if err := lib.VomDecode(data, &msg); err != nil {
+ return nil, err
}
- c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", tempMsg.Name, tempMsg.Method, tempMsg.IsStreaming)
- return &tempMsg, nil
+ c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ return &msg, nil
}
type signatureRequest struct {
Name string
}
-func (c *Controller) getSignature(ctx context.T, name string) (signature.JSONServiceSignature, error) {
- // Fetch and adapt signature from the SignatureManager
+func (c *Controller) getSignature(ctx context.T, name string) ([]signature.Interface, error) {
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
- sig, err := c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
- if err != nil {
- return nil, verror2.Convert(verror2.Internal, ctx, err)
- }
-
- return signature.NewJSONServiceSignature(*sig), nil
+ return c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
}
// HandleSignatureRequest uses signature manager to get and cache signature of a remote server
@@ -709,26 +639,18 @@
}
c.logger.VI(2).Infof("requesting Signature for %q", request.Name)
- jsSig, err := c.getSignature(ctx, request.Name)
- if err != nil {
- w.Error(verror2.Convert(verror2.Internal, ctx, err))
- return
- }
-
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ sig, err := c.getSignature(ctx, request.Name)
if err != nil {
w.Error(err)
return
}
-
- if err := encoder.Encode(jsSig); err != nil {
+ vomSig, err := lib.VomEncode(sig)
+ if err != nil {
w.Error(err)
return
}
-
// Send the signature back
- if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseFinal, vomSig); err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
}