wsprd: Vom encode the args/results of ipc messages to JS.
We can't yet VOM-encode the JS parts of the IPC layer, because there is
nothing we can decode a vom2 stream into that can then be re-encoded in vom1.
Change-Id: Ifb01d088eeaad8ceab4cd3570b991d44ad599419
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 9acd92b..71452f4 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -4,6 +4,7 @@
import (
"bytes"
+ "encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -22,6 +23,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
+ "veyron.io/veyron/veyron2/vom2"
wiretype_build "veyron.io/veyron/veyron2/wiretype/build"
"veyron.io/wspr/veyron/services/wsprd/ipc/server"
"veyron.io/wspr/veyron/services/wsprd/lib"
@@ -188,7 +190,20 @@
w.Error(err) // Send streaming error as is
return
}
- if err := w.Send(lib.ResponseStream, item); err != nil {
+
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ w.Error(verror2.Make(marshallingError, ctx, item))
+ continue
+ }
+
+ if err := encoder.Encode(item); err != nil {
+ w.Error(verror2.Make(marshallingError, ctx, item))
+ continue
+ }
+
+ if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
w.Error(verror2.Make(marshallingError, ctx, item))
}
}
@@ -221,7 +236,19 @@
return
}
- if err := w.Send(lib.ResponseFinal, results[0:len(results)-1]); err != nil {
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ w.Error(err)
+ return
+ }
+
+ if err := encoder.Encode(results[:len(results)-1]); err != nil {
+ w.Error(err)
+ return
+ }
+
+ if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
w.Error(verror2.Convert(marshallingError, ctx, err))
}
}
@@ -686,8 +713,20 @@
return
}
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ w.Error(err)
+ return
+ }
+
+ if err := encoder.Encode(jsSig); err != nil {
+ w.Error(err)
+ return
+ }
+
// Send the signature back
- if err := w.Send(lib.ResponseFinal, jsSig); err != nil {
+ if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 61368b2..8c91e31 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -1,7 +1,9 @@
package app
import (
+ "bytes"
"encoding/base64"
+ "encoding/hex"
"encoding/json"
"fmt"
"reflect"
@@ -17,6 +19,7 @@
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vom"
vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
+ "veyron.io/veyron/veyron2/vom2"
"veyron.io/veyron/veyron2/wiretype"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/lib/testwriter"
@@ -267,6 +270,19 @@
}
}
+func vomEncode(i interface{}) string {
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ panic(err)
+ }
+
+ if err := encoder.Encode(i); err != nil {
+ panic(err)
+ }
+ return hex.EncodeToString(buf.Bytes())
+
+}
func TestCallingGoServer(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Add",
@@ -274,7 +290,7 @@
numOutArgs: 2,
expectedStream: []testwriter.Response{
testwriter.Response{
- Message: []interface{}{5.0},
+ Message: vomEncode([]interface{}{int32(5)}),
Type: lib.ResponseFinal,
},
},
@@ -299,19 +315,19 @@
numOutArgs: 2,
expectedStream: []testwriter.Response{
testwriter.Response{
- Message: 1.0,
+ Message: vomEncode(int32(1)),
Type: lib.ResponseStream,
},
testwriter.Response{
- Message: 3.0,
+ Message: vomEncode(int32(3)),
Type: lib.ResponseStream,
},
testwriter.Response{
- Message: 6.0,
+ Message: vomEncode(int32(6)),
Type: lib.ResponseStream,
},
testwriter.Response{
- Message: 10.0,
+ Message: vomEncode(int32(10)),
Type: lib.ResponseStream,
},
testwriter.Response{
@@ -319,7 +335,7 @@
Type: lib.ResponseStreamClose,
},
testwriter.Response{
- Message: []interface{}{10.0},
+ Message: vomEncode([]interface{}{int32(10)}),
Type: lib.ResponseFinal,
},
},
@@ -726,7 +742,7 @@
"ServerId": 0.0,
"Method": lib.LowercaseFirstCharacter(test.method),
"Handle": 0.0,
- "Args": test.inArgs,
+ "Args": vomEncode(test.inArgs),
"Context": map[string]interface{}{
"Name": "adder",
"Suffix": "adder",
@@ -745,7 +761,7 @@
t.Errorf("didn't receive expected message: %v", err)
}
for _, msg := range test.clientStream {
- expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStream, Message: msg})
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStream, Message: vomEncode(msg)})
if err := call.Send(msg); err != nil {
t.Errorf("unexpected error while sending %v: %v", msg, err)
}
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index 70bbfdc..c9c5d07 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -3,6 +3,8 @@
package server
import (
+ "bytes"
+ "encoding/hex"
"encoding/json"
"sync"
"time"
@@ -16,6 +18,7 @@
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom2"
)
type Flow struct {
@@ -28,7 +31,7 @@
ServerId uint64
Handle int64
Method string
- Args []interface{}
+ Args string
Context serverRPCRequestContext
}
@@ -157,23 +160,36 @@
RemoteBlessingStrings: call.RemoteBlessings().ForContext(call),
}
- // Send a invocation request to JavaScript
- message := serverRPCRequest{
- ServerId: s.id,
- Handle: handle,
- Method: lib.LowercaseFirstCharacter(methodName),
- Args: args,
- Context: context,
- }
-
- if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
- // Error in marshaling, pass the error through the channel immediately
+ errHandler := func(err error) <-chan *serverRPCReply {
if ch := s.popServerRequest(flow.ID); ch != nil {
stdErr := verror2.Convert(verror2.Internal, call, err).(verror2.Standard)
ch <- &serverRPCReply{nil, &stdErr}
s.helper.CleanupFlow(flow.ID)
}
return replyChan
+
+ }
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ return errHandler(err)
+ }
+
+ if err := encoder.Encode(args); err != nil {
+ return errHandler(err)
+ }
+
+ // Send a invocation request to JavaScript
+ message := serverRPCRequest{
+ ServerId: s.id,
+ Handle: handle,
+ Method: lib.LowercaseFirstCharacter(methodName),
+ Args: hex.EncodeToString(buf.Bytes()),
+ Context: context,
+ }
+
+ if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
+ return errHandler(err)
}
s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
@@ -205,7 +221,19 @@
func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
var item interface{}
for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
- if err := w.Send(lib.ResponseStream, item); err != nil {
+ var buf bytes.Buffer
+ encoder, err := vom2.NewBinaryEncoder(&buf)
+ if err != nil {
+ w.Error(verror2.Convert(verror2.Internal, nil, err))
+ return
+ }
+
+ if err := encoder.Encode(item); err != nil {
+ w.Error(verror2.Convert(verror2.Internal, nil, err))
+ return
+ }
+
+ if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
return
}