Share type stream for streaming messages.
Change-Id: Ia5e6ea58fdd02173fdb6698c6afb69b3f22c1fb0
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index f456192..5eb77d1 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -166,7 +166,7 @@
if blessings, ok := item.(security.Blessings); ok {
item = principal.ConvertBlessingsToHandle(blessings, c.blessingsCache.GetOrAddHandle(blessings))
}
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, c.typeEncoder)
if err != nil {
w.Error(verror.New(marshallingError, ctx, item, err))
continue
@@ -284,7 +284,7 @@
id := c.lastGeneratedId
c.lastGeneratedId += 2
c.flowMap[id] = s
- os := newStream(c.blessingsCache)
+ os := newStream(c.blessingsCache, c.typeDecoder)
os.init(stream)
c.outstandingRequests[id] = &outstandingRequest{
stream: os,
@@ -430,10 +430,11 @@
// but currently none of the methods the controller exports require
// any of this context information.
type localCall struct {
- ctx *context.T
- vrpc *RpcRequest
- tags []*vdl.Value
- w lib.ClientWriter
+ ctx *context.T
+ vrpc *RpcRequest
+ tags []*vdl.Value
+ w lib.ClientWriter
+ typeEncoder *vom.TypeEncoder
}
var (
@@ -442,7 +443,7 @@
)
func (l *localCall) Send(item interface{}) error {
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, l.typeEncoder)
if err != nil {
err = verror.New(marshallingError, l.ctx, item, err)
l.w.Error(err)
@@ -483,7 +484,7 @@
return
}
}
- results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
+ results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w, c.typeEncoder}, msg.Method, argptrs)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
@@ -579,7 +580,7 @@
// to put the outstanding stream in the map before we make the async call so that
// the future send know which queue to write to, even if the client call isn't
// actually ready yet.
- request.stream = newStream(c.blessingsCache)
+ request.stream = newStream(c.blessingsCache, c.typeDecoder)
}
c.Lock()
c.outstandingRequests[id] = request
@@ -730,16 +731,6 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a RpcRequest object.
-func (c *Controller) parseVeyronRequest(data string) (*RpcRequest, error) {
- var msg RpcRequest
- if err := lib.HexVomDecode(data, &msg); err != nil {
- return nil, err
- }
- vlog.VI(2).Infof("RpcRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
- return &msg, nil
-}
-
// getSignature uses the signature manager to get and cache the signature of a remote server.
func (c *Controller) getSignature(ctx *context.T, name string) ([]signature.Interface, error) {
return c.signatureManager.Signature(ctx, name)
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index 8ee138e..537b8fc 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -204,13 +204,13 @@
}
var stream *outstandingStream
if len(testCase.streamingInputs) > 0 {
- stream = newStream(nil)
+ stream = newStream(nil, nil)
controller.outstandingRequests[0] = &outstandingRequest{
stream: stream,
}
go func() {
for _, value := range testCase.streamingInputs {
- controller.SendOnStream(0, lib.HexVomEncodeOrDie(value), &writer)
+ controller.SendOnStream(0, lib.HexVomEncodeOrDie(value, nil), &writer)
}
controller.CloseStream(0)
}()
@@ -291,19 +291,19 @@
numOutArgs: 1,
expectedStream: []lib.Response{
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(1)),
+ Message: lib.HexVomEncodeOrDie(int32(1), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(3)),
+ Message: lib.HexVomEncodeOrDie(int32(3), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(6)),
+ Message: lib.HexVomEncodeOrDie(int32(6), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(10)),
+ Message: lib.HexVomEncodeOrDie(int32(10), nil),
Type: lib.ResponseStream,
},
lib.Response{
@@ -435,7 +435,7 @@
vomClientStream := []string{}
for _, m := range testCase.clientStream {
- vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m))
+ vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m, nil))
}
mock := &mockJSServer{
t: t,
diff --git a/services/wspr/internal/app/granter.go b/services/wspr/internal/app/granter.go
index bd9d4a9..659c906 100644
--- a/services/wspr/internal/app/granter.go
+++ b/services/wspr/internal/app/granter.go
@@ -29,7 +29,7 @@
GranterHandle: g.granterHandle,
Call: server.ConvertSecurityCall(g.c, ctx, call, true),
}
- encoded, err := lib.HexVomEncode(request)
+ encoded, err := lib.HexVomEncode(request, nil)
if err != nil {
return security.Blessings{}, err
}
@@ -64,7 +64,7 @@
func (g *granterStream) Send(item interface{}) error {
dataString := item.(string)
var gr *GranterResponse
- if err := lib.HexVomDecode(dataString, &gr); err != nil {
+ if err := lib.HexVomDecode(dataString, &gr, nil); err != nil {
return fmt.Errorf("error decoding granter response: %v", err)
}
g.c <- gr
diff --git a/services/wspr/internal/app/mock_jsServer_test.go b/services/wspr/internal/app/mock_jsServer_test.go
index 1dc2b0f..663efea 100644
--- a/services/wspr/internal/app/mock_jsServer_test.go
+++ b/services/wspr/internal/app/mock_jsServer_test.go
@@ -90,7 +90,7 @@
return lib.HexVomEncodeOrDie(server.LookupReply{
Err: err,
- })
+ }, nil)
}
func (m *mockJSServer) Error(err error) {
@@ -137,7 +137,7 @@
Handle: 0,
Signature: m.serviceSignature,
HasAuthorizer: m.hasAuthorizer,
- })
+ }, nil)
m.controller.HandleLookupResponse(m.flowCount, lookupReply)
return nil
}
@@ -163,7 +163,7 @@
}
var msg server.AuthRequest
- if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+ if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
m.controller.HandleAuthResponse(m.flowCount, internalErr(fmt.Sprintf("error decoding %v:", err)))
return nil
}
@@ -207,7 +207,7 @@
authReply := lib.HexVomEncodeOrDie(server.AuthReply{
Err: m.authError,
- })
+ }, nil)
m.controller.HandleAuthResponse(m.flowCount, authReply)
return nil
@@ -224,7 +224,7 @@
}
var msg server.ServerRpcRequest
- if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+ if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
m.controller.HandleServerResponse(m.flowCount, internalErr(err))
return nil
}
@@ -295,7 +295,7 @@
defer m.sender.Done()
m.controllerReady.RLock()
for _, v := range m.serverStream {
- m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v), m)
+ m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v, nil), m)
}
m.controllerReady.RUnlock()
}
@@ -323,7 +323,7 @@
}
m.controllerReady.RLock()
- m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply))
+ m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply, nil))
m.controllerReady.RUnlock()
return nil
}
diff --git a/services/wspr/internal/app/stream.go b/services/wspr/internal/app/stream.go
index 74852c7..74ca481 100644
--- a/services/wspr/internal/app/stream.go
+++ b/services/wspr/internal/app/stream.go
@@ -8,6 +8,7 @@
"fmt"
"v.io/v23/rpc"
+ "v.io/v23/vom"
"v.io/x/ref/services/wspr/internal/lib"
"v.io/x/ref/services/wspr/internal/principal"
)
@@ -41,11 +42,12 @@
// true if the stream has been closed.
closed bool
+ typeDecoder *vom.TypeDecoder
// Used to translate from JsBlesssings to Blessings
blessingsCache *principal.JSBlessingsHandles
}
-func newStream(cache *principal.JSBlessingsHandles) *outstandingStream {
+func newStream(cache *principal.JSBlessingsHandles, typeDecoder *vom.TypeDecoder) *outstandingStream {
os := &outstandingStream{
initChan: make(chan *initConfig, 1),
// We allow queueing up to 100 messages before init is called.
@@ -53,6 +55,7 @@
messages: make(chan *message, 100),
done: make(chan bool),
blessingsCache: cache,
+ typeDecoder: typeDecoder,
}
go os.loop()
return os
@@ -81,7 +84,7 @@
config := <-os.initChan
for msg := range os.messages {
var item interface{}
- if err := lib.HexVomDecode(msg.data, &item); err != nil {
+ if err := lib.HexVomDecode(msg.data, &item, os.typeDecoder); err != nil {
msg.writer.Error(fmt.Errorf("failed to decode stream arg from %v: %v", msg.data, err))
break
}
diff --git a/services/wspr/internal/lib/hex_vom.go b/services/wspr/internal/lib/hex_vom.go
index 70d3333..2dcbb82 100644
--- a/services/wspr/internal/lib/hex_vom.go
+++ b/services/wspr/internal/lib/hex_vom.go
@@ -14,29 +14,40 @@
"v.io/v23/vom"
)
-func HexVomEncode(v interface{}) (string, error) {
+func HexVomEncode(v interface{}, te *vom.TypeEncoder) (string, error) {
var buf bytes.Buffer
- encoder := vom.NewEncoder(&buf)
+ var encoder *vom.Encoder
+ if te != nil {
+ encoder = vom.NewEncoderWithTypeEncoder(&buf, te)
+ } else {
+
+ encoder = vom.NewEncoder(&buf)
+ }
if err := encoder.Encode(v); err != nil {
return "", err
}
return hex.EncodeToString(buf.Bytes()), nil
}
-func HexVomEncodeOrDie(v interface{}) string {
- s, err := HexVomEncode(v)
+func HexVomEncodeOrDie(v interface{}, te *vom.TypeEncoder) string {
+ s, err := HexVomEncode(v, te)
if err != nil {
panic(err)
}
return s
}
-func HexVomDecode(data string, v interface{}) error {
+func HexVomDecode(data string, v interface{}, td *vom.TypeDecoder) error {
binbytes, err := hex.DecodeString(data)
if err != nil {
return fmt.Errorf("Error decoding hex string %q: %v", data, err)
}
- decoder := vom.NewDecoder(bytes.NewReader(binbytes))
+ var decoder *vom.Decoder
+ if td != nil {
+ decoder = vom.NewDecoderWithTypeDecoder(bytes.NewReader(binbytes), td)
+ } else {
+ decoder = vom.NewDecoder(bytes.NewReader(binbytes))
+ }
return decoder.Decode(v)
}
diff --git a/services/wspr/internal/rpc/server/dispatcher.go b/services/wspr/internal/rpc/server/dispatcher.go
index fa76d75..cb1c457 100644
--- a/services/wspr/internal/rpc/server/dispatcher.go
+++ b/services/wspr/internal/rpc/server/dispatcher.go
@@ -128,7 +128,7 @@
}
var lookupReply LookupReply
- if err := lib.HexVomDecode(data, &lookupReply); err != nil {
+ if err := lib.HexVomDecode(data, &lookupReply, nil); err != nil {
err2 := verror.Convert(verror.ErrInternal, nil, err)
lookupReply = LookupReply{Err: err2}
vlog.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
diff --git a/services/wspr/internal/rpc/server/dispatcher_test.go b/services/wspr/internal/rpc/server/dispatcher_test.go
index df69179..0e6615b 100644
--- a/services/wspr/internal/rpc/server/dispatcher_test.go
+++ b/services/wspr/internal/rpc/server/dispatcher_test.go
@@ -94,7 +94,7 @@
HasAuthorizer: false,
Signature: expectedSig,
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
invoker, auth, err := d.Lookup("a/b")
@@ -143,7 +143,7 @@
HasAuthorizer: true,
Signature: expectedSig,
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
invoker, auth, err := d.Lookup("a/b")
@@ -187,7 +187,7 @@
reply := LookupReply{
Err: verror.New(verror.ErrNoExist, nil),
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
_, _, err := d.Lookup("a/b")
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index f056c91..c709731 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -174,7 +174,7 @@
Args: vdlValArgs,
Call: rpcCall,
}
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
return errHandler(err)
}
@@ -283,7 +283,7 @@
Args: []*vdl.Value{vdl.ValueOf(pattern)},
Call: rpcCall,
}
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
return errHandler(err)
}
@@ -320,7 +320,7 @@
item = principal.ConvertBlessingsToHandle(blessings, blessingsCache.GetOrAddBlessingsHandle(blessings))
}
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, nil)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, nil, err))
return
@@ -558,7 +558,7 @@
}
vlog.VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
replyChan <- verror.Convert(verror.ErrInternal, nil, err)
} else if err := flow.Writer.Send(lib.ResponseAuthRequest, vomMessage); err != nil {
@@ -648,7 +648,7 @@
// Decode the result and send it through the channel
var reply lib.ServerRpcReply
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
reply.Err = err
}
@@ -690,7 +690,7 @@
}
// Decode the result and send it through the channel
var reply AuthReply
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
err = verror.Convert(verror.ErrInternal, nil, err)
reply = AuthReply{Err: err}
}
@@ -720,7 +720,7 @@
}
var reply CaveatValidationResponse
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
vlog.Errorf("failed to decode validation response %q: error %v", data, err)
ch <- []error{}
return