veyron/services/wspr/wsprd: Transcode stream messages

Add transcoding for the stream messages coming from the javascript
client to the go server.

Change-Id: If839aa26e6b9793fad211a77af43bd1204eccc2e
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 6fc0764..f0533ea 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -328,6 +328,11 @@
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 }
 
+type outstandingStream struct {
+	stream sender
+	inType vom.Type
+}
+
 type websocketPipe struct {
 	// Protects outstandingStreams and outstandingServerRequests.
 	sync.Mutex
@@ -339,7 +344,7 @@
 	lastGeneratedId int64
 
 	// Streams for the outstanding requests.
-	outstandingStreams map[int64]sender
+	outstandingStreams map[int64]outstandingStream
 
 	// Maps flowids to the server that owns them.
 	flowMap map[int64]*server
@@ -363,7 +368,7 @@
 	id := wsp.lastGeneratedId
 	wsp.lastGeneratedId += 2
 	wsp.flowMap[id] = server
-	wsp.outstandingStreams[id] = stream
+	wsp.outstandingStreams[id] = outstandingStream{stream: stream}
 	return &flow{id: id, writer: wsp.writerCreator(id)}
 }
 
@@ -387,7 +392,7 @@
 	wsp.Lock()
 	defer wsp.Unlock()
 	for _, stream := range wsp.outstandingStreams {
-		if call, ok := stream.(ipc.Call); ok {
+		if call, ok := stream.stream.(ipc.Call); ok {
 			call.Cancel()
 		}
 	}
@@ -400,7 +405,7 @@
 func (wsp *websocketPipe) setup() {
 	wsp.ctx.logger.Info("identity is ", wsp.ctx.rt.Identity())
 	wsp.signatureManager = newSignatureManager()
-	wsp.outstandingStreams = make(map[int64]sender)
+	wsp.outstandingStreams = make(map[int64]outstandingStream)
 	wsp.flowMap = make(map[int64]*server)
 	wsp.servers = make(map[uint64]*server)
 
@@ -472,7 +477,7 @@
 func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w clientWriter) {
 	wsp.Lock()
 	defer wsp.Unlock()
-	stream := wsp.outstandingStreams[id]
+	stream := wsp.outstandingStreams[id].stream
 	if stream == nil {
 		w.sendError(fmt.Errorf("unknown stream"))
 	}
@@ -483,10 +488,16 @@
 
 // sendOnStream writes data on id's stream.  Returns an error if the send failed.
 func (wsp *websocketPipe) sendOnStream(id int64, data string, w clientWriter) {
-	decoder := json.NewDecoder(bytes.NewBufferString(data))
-	var payload interface{}
-	if err := decoder.Decode(&payload); err != nil {
-		w.sendError(fmt.Errorf("can't unmarshal JSONMessage: %v", err))
+	wsp.Lock()
+	typ := wsp.outstandingStreams[id].inType
+	wsp.Unlock()
+	if typ == nil {
+		vlog.Errorf("no inType for stream %d (%q)", id, data)
+		return
+	}
+	payload, err := vom.JSONToObject(data, typ)
+	if err != nil {
+		vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
 		return
 	}
 	wsp.sendParsedMessageOnStream(id, payload, w)
@@ -515,7 +526,7 @@
 
 // handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
 func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
-	veyronMsg, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
+	veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
 	if err != nil {
 		w.sendError(verror.Internalf("can't parse Veyron Request: %v", err))
 		return
@@ -531,7 +542,10 @@
 	var signal chan ipc.Stream
 	if veyronMsg.IsStreaming {
 		signal = make(chan ipc.Stream)
-		wsp.outstandingStreams[id] = startQueueingStream(signal)
+		wsp.outstandingStreams[id] = outstandingStream{
+			stream: startQueueingStream(signal),
+			inType: inStreamType,
+		}
 	}
 	go wsp.sendVeyronRequest(id, veyronMsg, w, signal)
 }
@@ -539,7 +553,7 @@
 func (wsp *websocketPipe) closeStream(id int64) {
 	wsp.Lock()
 	defer wsp.Unlock()
-	stream := wsp.outstandingStreams[id]
+	stream := wsp.outstandingStreams[id].stream
 	if stream == nil {
 		wsp.ctx.logger.VI(0).Infof("close called on non-existent call: %v", id)
 		return
@@ -687,34 +701,34 @@
 }
 
 // parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, error) {
+func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) {
 	var tempMsg veyronTempRPC
 	decoder := json.NewDecoder(r)
 	if err := decoder.Decode(&tempMsg); err != nil {
-		return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+		return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
 	}
 
 	client, err := wsp.ctx.newClient(tempMsg.PrivateId)
 	if err != nil {
-		return nil, verror.Internalf("error creating client: %v", err)
+		return nil, nil, verror.Internalf("error creating client: %v", err)
 	}
 
 	// Fetch and adapt signature from the SignatureManager
 	ctx := wsp.ctx.rt.TODOContext()
 	sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client)
 	if err != nil {
-		return nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
+		return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
 	}
 
 	methName := uppercaseFirstCharacter(tempMsg.Method)
 	methSig, ok := sig.Methods[methName]
 	if !ok {
-		return nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
+		return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
 	}
 
 	var msg veyronRPC
 	if len(methSig.InArgs) != len(tempMsg.InArgs) {
-		return nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
+		return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
 	}
 	msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
 	td := wiretype_build.TypeDefs(sig.TypeDefs)
@@ -728,7 +742,7 @@
 
 		val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
 		if err != nil {
-			return nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
+			return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
 		}
 		msg.InArgs[i] = val
 	}
@@ -739,8 +753,13 @@
 	msg.NumOutArgs = tempMsg.NumOutArgs
 	msg.IsStreaming = tempMsg.IsStreaming
 
+	inStreamType := vom_wiretype.Type{
+		ID:   methSig.InStream,
+		Defs: &td,
+	}
+
 	wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming)
-	return &msg, nil
+	return &msg, inStreamType, nil
 }
 
 type signatureRequest struct {
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index 171b39a..fe2e9c9 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -15,6 +15,8 @@
 	"veyron2/vdl"
 	"veyron2/verror"
 	"veyron2/vlog"
+	"veyron2/vom"
+	vom_wiretype "veyron2/vom/wiretype"
 	"veyron2/wiretype"
 
 	"veyron/runtimes/google/ipc/stream/proxy"
@@ -264,12 +266,13 @@
 }
 
 type goServerTestCase struct {
-	method          string
-	inArgs          []interface{}
-	numOutArgs      int32
-	streamingInputs []string
-	expectedStream  []response
-	expectedError   error
+	method             string
+	inArgs             []interface{}
+	numOutArgs         int32
+	streamingInputs    []string
+	streamingInputType vom.Type
+	expectedStream     []response
+	expectedError      error
 }
 
 func runGoServerTestCase(t *testing.T, test goServerTestCase) {
@@ -292,7 +295,10 @@
 	var signal chan ipc.Stream
 	if len(test.streamingInputs) > 0 {
 		signal = make(chan ipc.Stream, 1)
-		wsp.outstandingStreams[0] = startQueueingStream(signal)
+		wsp.outstandingStreams[0] = outstandingStream{
+			stream: startQueueingStream(signal),
+			inType: test.streamingInputType,
+		}
 		go func() {
 			for _, value := range test.streamingInputs {
 				wsp.sendOnStream(0, value, &writer)
@@ -337,10 +343,11 @@
 
 func TestCallingGoWithStreaming(t *testing.T) {
 	runGoServerTestCase(t, goServerTestCase{
-		method:          "StreamingAdd",
-		inArgs:          []interface{}{},
-		streamingInputs: []string{"1", "2", "3", "4"},
-		numOutArgs:      2,
+		method:             "StreamingAdd",
+		inArgs:             []interface{}{},
+		streamingInputs:    []string{"1", "2", "3", "4"},
+		streamingInputType: vom_wiretype.Type{ID: 36},
+		numOutArgs:         2,
 		expectedStream: []response{
 			response{
 				Message: 1.0,