veyron/services/wspr/wsprd: Transcode stream messages
Add transcoding for the stream messages coming from the javascript
client to the go server.
Change-Id: If839aa26e6b9793fad211a77af43bd1204eccc2e
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 6fc0764..f0533ea 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -328,6 +328,11 @@
w.Header().Set("Access-Control-Allow-Origin", "*")
}
+type outstandingStream struct {
+ stream sender
+ inType vom.Type
+}
+
type websocketPipe struct {
// Protects outstandingStreams and outstandingServerRequests.
sync.Mutex
@@ -339,7 +344,7 @@
lastGeneratedId int64
// Streams for the outstanding requests.
- outstandingStreams map[int64]sender
+ outstandingStreams map[int64]outstandingStream
// Maps flowids to the server that owns them.
flowMap map[int64]*server
@@ -363,7 +368,7 @@
id := wsp.lastGeneratedId
wsp.lastGeneratedId += 2
wsp.flowMap[id] = server
- wsp.outstandingStreams[id] = stream
+ wsp.outstandingStreams[id] = outstandingStream{stream: stream}
return &flow{id: id, writer: wsp.writerCreator(id)}
}
@@ -387,7 +392,7 @@
wsp.Lock()
defer wsp.Unlock()
for _, stream := range wsp.outstandingStreams {
- if call, ok := stream.(ipc.Call); ok {
+ if call, ok := stream.stream.(ipc.Call); ok {
call.Cancel()
}
}
@@ -400,7 +405,7 @@
func (wsp *websocketPipe) setup() {
wsp.ctx.logger.Info("identity is ", wsp.ctx.rt.Identity())
wsp.signatureManager = newSignatureManager()
- wsp.outstandingStreams = make(map[int64]sender)
+ wsp.outstandingStreams = make(map[int64]outstandingStream)
wsp.flowMap = make(map[int64]*server)
wsp.servers = make(map[uint64]*server)
@@ -472,7 +477,7 @@
func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w clientWriter) {
wsp.Lock()
defer wsp.Unlock()
- stream := wsp.outstandingStreams[id]
+ stream := wsp.outstandingStreams[id].stream
if stream == nil {
w.sendError(fmt.Errorf("unknown stream"))
}
@@ -483,10 +488,16 @@
// sendOnStream writes data on id's stream. Returns an error if the send failed.
func (wsp *websocketPipe) sendOnStream(id int64, data string, w clientWriter) {
- decoder := json.NewDecoder(bytes.NewBufferString(data))
- var payload interface{}
- if err := decoder.Decode(&payload); err != nil {
- w.sendError(fmt.Errorf("can't unmarshal JSONMessage: %v", err))
+ wsp.Lock()
+ typ := wsp.outstandingStreams[id].inType
+ wsp.Unlock()
+ if typ == nil {
+ vlog.Errorf("no inType for stream %d (%q)", id, data)
+ return
+ }
+ payload, err := vom.JSONToObject(data, typ)
+ if err != nil {
+ vlog.Errorf("error while converting json to InStreamType (%s): %v", data, err)
return
}
wsp.sendParsedMessageOnStream(id, payload, w)
@@ -515,7 +526,7 @@
// handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
- veyronMsg, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
+ veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
if err != nil {
w.sendError(verror.Internalf("can't parse Veyron Request: %v", err))
return
@@ -531,7 +542,10 @@
var signal chan ipc.Stream
if veyronMsg.IsStreaming {
signal = make(chan ipc.Stream)
- wsp.outstandingStreams[id] = startQueueingStream(signal)
+ wsp.outstandingStreams[id] = outstandingStream{
+ stream: startQueueingStream(signal),
+ inType: inStreamType,
+ }
}
go wsp.sendVeyronRequest(id, veyronMsg, w, signal)
}
@@ -539,7 +553,7 @@
func (wsp *websocketPipe) closeStream(id int64) {
wsp.Lock()
defer wsp.Unlock()
- stream := wsp.outstandingStreams[id]
+ stream := wsp.outstandingStreams[id].stream
if stream == nil {
wsp.ctx.logger.VI(0).Infof("close called on non-existent call: %v", id)
return
@@ -687,34 +701,34 @@
}
// parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, error) {
+func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, vom.Type, error) {
var tempMsg veyronTempRPC
decoder := json.NewDecoder(r)
if err := decoder.Decode(&tempMsg); err != nil {
- return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+ return nil, nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
}
client, err := wsp.ctx.newClient(tempMsg.PrivateId)
if err != nil {
- return nil, verror.Internalf("error creating client: %v", err)
+ return nil, nil, verror.Internalf("error creating client: %v", err)
}
// Fetch and adapt signature from the SignatureManager
ctx := wsp.ctx.rt.TODOContext()
sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client)
if err != nil {
- return nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
+ return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
}
methName := uppercaseFirstCharacter(tempMsg.Method)
methSig, ok := sig.Methods[methName]
if !ok {
- return nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
+ return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
}
var msg veyronRPC
if len(methSig.InArgs) != len(tempMsg.InArgs) {
- return nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
+ return nil, nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
}
msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
td := wiretype_build.TypeDefs(sig.TypeDefs)
@@ -728,7 +742,7 @@
val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
if err != nil {
- return nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
+ return nil, nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
}
msg.InArgs[i] = val
}
@@ -739,8 +753,13 @@
msg.NumOutArgs = tempMsg.NumOutArgs
msg.IsStreaming = tempMsg.IsStreaming
+ inStreamType := vom_wiretype.Type{
+ ID: methSig.InStream,
+ Defs: &td,
+ }
+
wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming)
- return &msg, nil
+ return &msg, inStreamType, nil
}
type signatureRequest struct {
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index 171b39a..fe2e9c9 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -15,6 +15,8 @@
"veyron2/vdl"
"veyron2/verror"
"veyron2/vlog"
+ "veyron2/vom"
+ vom_wiretype "veyron2/vom/wiretype"
"veyron2/wiretype"
"veyron/runtimes/google/ipc/stream/proxy"
@@ -264,12 +266,13 @@
}
type goServerTestCase struct {
- method string
- inArgs []interface{}
- numOutArgs int32
- streamingInputs []string
- expectedStream []response
- expectedError error
+ method string
+ inArgs []interface{}
+ numOutArgs int32
+ streamingInputs []string
+ streamingInputType vom.Type
+ expectedStream []response
+ expectedError error
}
func runGoServerTestCase(t *testing.T, test goServerTestCase) {
@@ -292,7 +295,10 @@
var signal chan ipc.Stream
if len(test.streamingInputs) > 0 {
signal = make(chan ipc.Stream, 1)
- wsp.outstandingStreams[0] = startQueueingStream(signal)
+ wsp.outstandingStreams[0] = outstandingStream{
+ stream: startQueueingStream(signal),
+ inType: test.streamingInputType,
+ }
go func() {
for _, value := range test.streamingInputs {
wsp.sendOnStream(0, value, &writer)
@@ -337,10 +343,11 @@
func TestCallingGoWithStreaming(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
- method: "StreamingAdd",
- inArgs: []interface{}{},
- streamingInputs: []string{"1", "2", "3", "4"},
- numOutArgs: 2,
+ method: "StreamingAdd",
+ inArgs: []interface{}{},
+ streamingInputs: []string{"1", "2", "3", "4"},
+ streamingInputType: vom_wiretype.Type{ID: 36},
+ numOutArgs: 2,
expectedStream: []response{
response{
Message: 1.0,