WSPR VOM2 + Signature

All tests pass.

MultiPart: 3/4
Change-Id: I34647d2df9e876de084d20eaf09ebb14cf3bd1f3
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 3322afb..44c58ff 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,8 +3,6 @@
 package app
 
 import (
-	"bytes"
-	"encoding/hex"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -19,17 +17,13 @@
 	"veyron.io/veyron/veyron2/options"
 	"veyron.io/veyron/veyron2/rt"
 	"veyron.io/veyron/veyron2/security"
+	"veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
 	"veyron.io/veyron/veyron2/verror2"
 	"veyron.io/veyron/veyron2/vlog"
-	"veyron.io/veyron/veyron2/vom"
-	vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
-	"veyron.io/veyron/veyron2/vom2"
-	wiretype_build "veyron.io/veyron/veyron2/wiretype/build"
 	"veyron.io/wspr/veyron/services/wsprd/ipc/server"
 	"veyron.io/wspr/veyron/services/wsprd/lib"
 	"veyron.io/wspr/veyron/services/wsprd/namespace"
 	"veyron.io/wspr/veyron/services/wsprd/principal"
-	"veyron.io/wspr/veyron/services/wsprd/signature"
 )
 
 // pkgPath is the prefix os errors in this package.
@@ -39,7 +33,6 @@
 var (
 	marshallingError       = verror2.Register(pkgPath+".marshallingError", verror2.NoRetry, "{1} {2} marshalling error {_}")
 	noResults              = verror2.Register(pkgPath+".noResults", verror2.NoRetry, "{1} {2} no results from call {_}")
-	signatureError         = verror2.Register(pkgPath+".signatureError", verror2.NoRetry, "{1} {2} signature error {_}")
 	badCaveatType          = verror2.Register(pkgPath+".badCaveatType", verror2.NoRetry, "{1} {2} bad caveat type {_}")
 	unknownBlessings       = verror2.Register(pkgPath+".unknownBlessings", verror2.NoRetry, "{1} {2} unknown public id {_}")
 	invalidBlessingsHandle = verror2.Register(pkgPath+".invalidBlessingsHandle", verror2.NoRetry, "{1} {2} invalid blessings handle {_}")
@@ -57,22 +50,13 @@
 	retryTimeout = flag.Int("retry-timeout", 2, "Duration in seconds to retry starting an RPC call. 0 means never retry.")
 }
 
-// Temporary holder of RPC so that we can store the unprocessed args.
-type veyronTempRPC struct {
-	Name        string
-	Method      string
-	InArgs      []json.RawMessage
-	NumOutArgs  int32
-	IsStreaming bool
-	Timeout     int64
-}
-
-type veyronRPC struct {
+type VeyronRPC struct {
 	Name        string
 	Method      string
 	InArgs      []interface{}
 	NumOutArgs  int32
 	IsStreaming bool
+	Timeout     int64
 }
 
 type serveRequest struct {
@@ -182,7 +166,7 @@
 }
 
 // finishCall waits for the call to finish and write out the response to w.
-func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
+func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPC) {
 	if msg.IsStreaming {
 		for {
 			var item interface{}
@@ -193,24 +177,15 @@
 				w.Error(err) // Send streaming error as is
 				return
 			}
-
-			var buf bytes.Buffer
-			encoder, err := vom2.NewBinaryEncoder(&buf)
+			vomItem, err := lib.VomEncode(item)
 			if err != nil {
-				w.Error(verror2.Make(marshallingError, ctx, item))
+				w.Error(verror2.Make(marshallingError, ctx, item, err))
 				continue
 			}
-
-			if err := encoder.Encode(item); err != nil {
-				w.Error(verror2.Make(marshallingError, ctx, item))
-				continue
-			}
-
-			if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
+			if err := w.Send(lib.ResponseStream, vomItem); err != nil {
 				w.Error(verror2.Make(marshallingError, ctx, item))
 			}
 		}
-
 		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
 			w.Error(verror2.Make(marshallingError, ctx, "ResponseStreamClose"))
 		}
@@ -227,36 +202,29 @@
 		w.Error(err)
 		return
 	}
+
 	// for now we assume last out argument is always error
 	if len(results) < 1 {
 		w.Error(verror2.Make(noResults, ctx))
 		return
 	}
-
 	if err, ok := results[len(results)-1].(error); ok {
 		// return the call Application error as is
 		w.Error(err)
 		return
 	}
 
-	var buf bytes.Buffer
-	encoder, err := vom2.NewBinaryEncoder(&buf)
+	vomResults, err := lib.VomEncode(results[:len(results)-1])
 	if err != nil {
 		w.Error(err)
 		return
 	}
-
-	if err := encoder.Encode(results[:len(results)-1]); err != nil {
-		w.Error(err)
-		return
-	}
-
-	if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+	if err := w.Send(lib.ResponseFinal, vomResults); err != nil {
 		w.Error(verror2.Convert(marshallingError, ctx, err))
 	}
 }
 
-func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
 	if c.client == nil {
 		return nil, verror2.Make(verror2.BadArg, ctx, "app.Controller.client")
 	}
@@ -281,7 +249,7 @@
 	c.lastGeneratedId += 2
 	c.flowMap[id] = s
 	os := newStream()
-	os.init(stream, vom_wiretype.Type{ID: 1})
+	os.init(stream)
 	c.outstandingRequests[id] = &outstandingRequest{
 		stream: os,
 	}
@@ -364,67 +332,36 @@
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
 // the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx context.T, id int64, tempMsg *veyronTempRPC, w lib.ClientWriter, stream *outstandingStream) {
-	// Fetch and adapt signature from the SignatureManager
-	retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-	sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
+func (c *Controller) sendVeyronRequest(ctx context.T, id int64, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+	sig, err := c.getSignature(ctx, msg.Name)
 	if err != nil {
-		w.Error(verror2.Make(signatureError, ctx, tempMsg.Name, err))
+		w.Error(err)
 		return
 	}
-	methName := lib.UppercaseFirstCharacter(tempMsg.Method)
-	methSig, ok := sig.Methods[methName]
+	methName := lib.UppercaseFirstCharacter(msg.Method)
+	methSig, ok := signature.FirstMethod(sig, methName)
 	if !ok {
-		w.Error(fmt.Errorf("method not found in signature: %v (full sig: %v)", methName, sig))
+		w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
 		return
 	}
-
-	var msg veyronRPC
-	if len(methSig.InArgs) != len(tempMsg.InArgs) {
-		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, tempMsg))
+	if len(methSig.InArgs) != len(msg.InArgs) {
+		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
 		return
 	}
-	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
-	td := wiretype_build.TypeDefs(sig.TypeDefs)
-
-	for i := 0; i < len(tempMsg.InArgs); i++ {
-		argTypeId := methSig.InArgs[i].Type
-		argType := vom_wiretype.Type{
-			ID:   argTypeId,
-			Defs: &td,
-		}
-
-		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
-		if err != nil {
-			w.Error(fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err))
-			return
-		}
-		msg.InArgs[i] = val
-	}
-
-	msg.Name = tempMsg.Name
-	msg.Method = tempMsg.Method
-	msg.NumOutArgs = tempMsg.NumOutArgs
-	msg.IsStreaming = tempMsg.IsStreaming
-
-	inStreamType := vom_wiretype.Type{
-		ID:   methSig.InStream,
-		Defs: &td,
-	}
 
 	// We have to make the start call synchronous so we can make sure that we populate
 	// the call map before we can Handle a recieve call.
-	call, err := c.startCall(ctx, w, &msg)
+	call, err := c.startCall(ctx, w, msg)
 	if err != nil {
 		w.Error(verror2.Convert(verror2.Internal, ctx, err))
 		return
 	}
 
 	if stream != nil {
-		stream.init(call, inStreamType)
+		stream.init(call)
 	}
 
-	c.finishCall(ctx, w, call, &msg)
+	c.finishCall(ctx, w, call, msg)
 	c.Lock()
 	if request, ok := c.outstandingRequests[id]; ok {
 		delete(c.outstandingRequests, id)
@@ -437,7 +374,7 @@
 
 // HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
 func (c *Controller) HandleVeyronRequest(ctx context.T, id int64, data string, w lib.ClientWriter) {
-	veyronTempMsg, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
+	msg, err := c.parseVeyronRequest(data)
 	if err != nil {
 		w.Error(verror2.Convert(verror2.Internal, ctx, err))
 		return
@@ -449,16 +386,16 @@
 	// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
 	// However as a rollout strategy we must, otherwise there is a circular
 	// dependency between the WSPR change and the JS change that will follow.
-	if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+	if msg.Timeout == lib.JSIPCNoTimeout || msg.Timeout == 0 {
 		cctx, cancel = ctx.WithCancel()
 	} else {
-		cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
+		cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(msg.Timeout))
 	}
 
 	request := &outstandingRequest{
 		cancel: cancel,
 	}
-	if veyronTempMsg.IsStreaming {
+	if msg.IsStreaming {
 		// If this rpc is streaming, we would expect that the client would try to send
 		// on this stream.  Since the initial handshake is done asynchronously, we have
 		// to put the outstanding stream in the map before we make the async call so that
@@ -468,7 +405,7 @@
 	}
 	c.Lock()
 	c.outstandingRequests[id] = request
-	go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, request.stream)
+	go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
 	c.Unlock()
 }
 
@@ -543,7 +480,7 @@
 // HandleServeRequest takes a request to serve a server, creates a server,
 // registers the provided services and sends true if everything succeeded.
 func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
-	// Decode the serve request which includes IDL, registered services and name
+	// Decode the serve request which includes VDL, registered services and name
 	var serveRequest serveRequest
 	if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
 		w.Error(verror2.Convert(verror2.Internal, nil, err))
@@ -673,30 +610,23 @@
 	server.HandleServerResponse(id, data)
 }
 
-// parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronTempRPC, error) {
-	var tempMsg veyronTempRPC
-	decoder := json.NewDecoder(r)
-	if err := decoder.Decode(&tempMsg); err != nil {
-		return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+// parseVeyronRequest decodes a VOM-encoded rpc request into a VeyronRPC object.
+func (c *Controller) parseVeyronRequest(data string) (*VeyronRPC, error) {
+	var msg VeyronRPC
+	if err := lib.VomDecode(data, &msg); err != nil {
+		return nil, err
 	}
-	c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", tempMsg.Name, tempMsg.Method, tempMsg.IsStreaming)
-	return &tempMsg, nil
+	c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+	return &msg, nil
 }
 
 type signatureRequest struct {
 	Name string
 }
 
-func (c *Controller) getSignature(ctx context.T, name string) (signature.JSONServiceSignature, error) {
-	// Fetch and adapt signature from the SignatureManager
+func (c *Controller) getSignature(ctx context.T, name string) ([]signature.Interface, error) {
 	retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
-	sig, err := c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
-	if err != nil {
-		return nil, verror2.Convert(verror2.Internal, ctx, err)
-	}
-
-	return signature.NewJSONServiceSignature(*sig), nil
+	return c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
 }
 
 // HandleSignatureRequest uses signature manager to get and cache signature of a remote server
@@ -709,26 +639,18 @@
 	}
 
 	c.logger.VI(2).Infof("requesting Signature for %q", request.Name)
-	jsSig, err := c.getSignature(ctx, request.Name)
-	if err != nil {
-		w.Error(verror2.Convert(verror2.Internal, ctx, err))
-		return
-	}
-
-	var buf bytes.Buffer
-	encoder, err := vom2.NewBinaryEncoder(&buf)
+	sig, err := c.getSignature(ctx, request.Name)
 	if err != nil {
 		w.Error(err)
 		return
 	}
-
-	if err := encoder.Encode(jsSig); err != nil {
+	vomSig, err := lib.VomEncode(sig)
+	if err != nil {
 		w.Error(err)
 		return
 	}
-
 	// Send the signature back
-	if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+	if err := w.Send(lib.ResponseFinal, vomSig); err != nil {
 		w.Error(verror2.Convert(verror2.Internal, ctx, err))
 		return
 	}