Merge branch 'decode2' into decode3
Change-Id: Iee5b4bf0476eb89a85202635ee75d4c1287133cc
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 565025b..f26d773 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -175,7 +175,7 @@
if blessings, ok := item.(security.Blessings); ok {
item = principal.ConvertBlessingsToHandle(blessings, c.blessingsHandles.GetOrAddHandle(blessings))
}
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, c.typeEncoder)
if err != nil {
w.Error(verror.New(marshallingError, ctx, item, err))
continue
@@ -294,7 +294,7 @@
id := c.lastGeneratedId
c.lastGeneratedId += 2
c.flowMap[id] = s
- os := newStream(c.blessingsHandles)
+ os := newStream(c.blessingsHandles, c.typeDecoder)
os.init(stream)
c.outstandingRequests[id] = &outstandingRequest{
stream: os,
@@ -445,10 +445,11 @@
// but currently none of the methods the controller exports require
// any of this context information.
type localCall struct {
- ctx *context.T
- vrpc *RpcRequest
- tags []*vdl.Value
- w lib.ClientWriter
+ ctx *context.T
+ vrpc *RpcRequest
+ tags []*vdl.Value
+ w lib.ClientWriter
+ typeEncoder *vom.TypeEncoder
}
var (
@@ -457,7 +458,7 @@
)
func (l *localCall) Send(item interface{}) error {
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, l.typeEncoder)
if err != nil {
err = verror.New(marshallingError, l.ctx, item, err)
l.w.Error(err)
@@ -498,7 +499,7 @@
return
}
}
- results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
+ results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w, c.typeEncoder}, msg.Method, argptrs)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, ctx, err))
return
@@ -594,7 +595,7 @@
// to put the outstanding stream in the map before we make the async call so that
// the future send know which queue to write to, even if the client call isn't
// actually ready yet.
- request.stream = newStream(c.blessingsHandles)
+ request.stream = newStream(c.blessingsHandles, c.typeDecoder)
}
c.Lock()
c.outstandingRequests[id] = request
@@ -745,16 +746,6 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a RpcRequest object.
-func (c *Controller) parseVeyronRequest(data string) (*RpcRequest, error) {
- var msg RpcRequest
- if err := lib.HexVomDecode(data, &msg); err != nil {
- return nil, err
- }
- vlog.VI(2).Infof("RpcRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
- return &msg, nil
-}
-
// getSignature uses the signature manager to get and cache the signature of a remote server.
func (c *Controller) getSignature(ctx *context.T, name string) ([]signature.Interface, error) {
return c.signatureManager.Signature(ctx, name)
@@ -986,3 +977,7 @@
vlog.Errorf("unexpected error sending blessings cache message: %v", err)
}
}
+
+func (c *Controller) TypeEncoder() *vom.TypeEncoder {
+ return c.typeEncoder
+}
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index d7e7333..b71c6ff 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -204,13 +204,13 @@
}
var stream *outstandingStream
if len(testCase.streamingInputs) > 0 {
- stream = newStream(nil)
+ stream = newStream(nil, nil)
controller.outstandingRequests[0] = &outstandingRequest{
stream: stream,
}
go func() {
for _, value := range testCase.streamingInputs {
- controller.SendOnStream(0, lib.HexVomEncodeOrDie(value), &writer)
+ controller.SendOnStream(0, lib.HexVomEncodeOrDie(value, nil), &writer)
}
controller.CloseStream(0)
}()
@@ -291,19 +291,19 @@
numOutArgs: 1,
expectedStream: []lib.Response{
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(1)),
+ Message: lib.HexVomEncodeOrDie(int32(1), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(3)),
+ Message: lib.HexVomEncodeOrDie(int32(3), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(6)),
+ Message: lib.HexVomEncodeOrDie(int32(6), nil),
Type: lib.ResponseStream,
},
lib.Response{
- Message: lib.HexVomEncodeOrDie(int32(10)),
+ Message: lib.HexVomEncodeOrDie(int32(10), nil),
Type: lib.ResponseStream,
},
lib.Response{
@@ -435,7 +435,7 @@
vomClientStream := []string{}
for _, m := range testCase.clientStream {
- vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m))
+ vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m, nil))
}
mock := &mockJSServer{
t: t,
diff --git a/services/wspr/internal/app/granter.go b/services/wspr/internal/app/granter.go
index bd9d4a9..659c906 100644
--- a/services/wspr/internal/app/granter.go
+++ b/services/wspr/internal/app/granter.go
@@ -29,7 +29,7 @@
GranterHandle: g.granterHandle,
Call: server.ConvertSecurityCall(g.c, ctx, call, true),
}
- encoded, err := lib.HexVomEncode(request)
+ encoded, err := lib.HexVomEncode(request, nil)
if err != nil {
return security.Blessings{}, err
}
@@ -64,7 +64,7 @@
func (g *granterStream) Send(item interface{}) error {
dataString := item.(string)
var gr *GranterResponse
- if err := lib.HexVomDecode(dataString, &gr); err != nil {
+ if err := lib.HexVomDecode(dataString, &gr, nil); err != nil {
return fmt.Errorf("error decoding granter response: %v", err)
}
g.c <- gr
diff --git a/services/wspr/internal/app/mock_jsServer_test.go b/services/wspr/internal/app/mock_jsServer_test.go
index b0f89c8..27891d2 100644
--- a/services/wspr/internal/app/mock_jsServer_test.go
+++ b/services/wspr/internal/app/mock_jsServer_test.go
@@ -89,7 +89,7 @@
return lib.HexVomEncodeOrDie(server.LookupReply{
Err: err,
- })
+ }, nil)
}
func (m *mockJSServer) Error(err error) {
@@ -136,7 +136,7 @@
Handle: 0,
Signature: m.serviceSignature,
HasAuthorizer: m.hasAuthorizer,
- })
+ }, nil)
m.controller.HandleLookupResponse(m.flowCount, lookupReply)
return nil
}
@@ -157,7 +157,7 @@
}
var msg server.AuthRequest
- if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+ if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
m.controller.HandleAuthResponse(m.flowCount, internalErr(fmt.Sprintf("error decoding %v:", err)))
return nil
}
@@ -201,7 +201,7 @@
authReply := lib.HexVomEncodeOrDie(server.AuthReply{
Err: m.authError,
- })
+ }, nil)
m.controller.HandleAuthResponse(m.flowCount, authReply)
return nil
@@ -218,7 +218,7 @@
}
var msg server.ServerRpcRequest
- if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+ if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
m.controller.HandleServerResponse(m.flowCount, internalErr(err))
return nil
}
@@ -294,7 +294,7 @@
defer m.sender.Done()
m.controllerReady.RLock()
for _, v := range m.serverStream {
- m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v), m)
+ m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v, nil), m)
}
m.controllerReady.RUnlock()
}
@@ -322,7 +322,7 @@
}
m.controllerReady.RLock()
- m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply))
+ m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply, nil))
m.controllerReady.RUnlock()
return nil
}
diff --git a/services/wspr/internal/app/stream.go b/services/wspr/internal/app/stream.go
index 74852c7..74ca481 100644
--- a/services/wspr/internal/app/stream.go
+++ b/services/wspr/internal/app/stream.go
@@ -8,6 +8,7 @@
"fmt"
"v.io/v23/rpc"
+ "v.io/v23/vom"
"v.io/x/ref/services/wspr/internal/lib"
"v.io/x/ref/services/wspr/internal/principal"
)
@@ -41,11 +42,12 @@
// true if the stream has been closed.
closed bool
+ typeDecoder *vom.TypeDecoder
// Used to translate from JsBlesssings to Blessings
blessingsCache *principal.JSBlessingsHandles
}
-func newStream(cache *principal.JSBlessingsHandles) *outstandingStream {
+func newStream(cache *principal.JSBlessingsHandles, typeDecoder *vom.TypeDecoder) *outstandingStream {
os := &outstandingStream{
initChan: make(chan *initConfig, 1),
// We allow queueing up to 100 messages before init is called.
@@ -53,6 +55,7 @@
messages: make(chan *message, 100),
done: make(chan bool),
blessingsCache: cache,
+ typeDecoder: typeDecoder,
}
go os.loop()
return os
@@ -81,7 +84,7 @@
config := <-os.initChan
for msg := range os.messages {
var item interface{}
- if err := lib.HexVomDecode(msg.data, &item); err != nil {
+ if err := lib.HexVomDecode(msg.data, &item, os.typeDecoder); err != nil {
msg.writer.Error(fmt.Errorf("failed to decode stream arg from %v: %v", msg.data, err))
break
}
diff --git a/services/wspr/internal/lib/binary_util.go b/services/wspr/internal/lib/binary_util.go
index fb4afab..936376f 100644
--- a/services/wspr/internal/lib/binary_util.go
+++ b/services/wspr/internal/lib/binary_util.go
@@ -18,8 +18,13 @@
errUintOverflow = verror.Register(pkgPath+".errUintOverflow", verror.NoRetry, "{1:}{2:} wspr: scalar larger than 8 bytes{:_}")
)
+<<<<<<< HEAD
+// The logic of the two functions here are copied from the vom implementations and
+// should be kept in sync.
+=======
// This code has been copied from the vom package and should be kept up to date
// with it.
+>>>>>>> decode2
// Unsigned integers are the basis for all other primitive values. This is a
// two-state encoding. If the number is less than 128 (0 through 0x7f), its
diff --git a/services/wspr/internal/lib/hex_vom.go b/services/wspr/internal/lib/hex_vom.go
index 24663e9..a7ac6c0 100644
--- a/services/wspr/internal/lib/hex_vom.go
+++ b/services/wspr/internal/lib/hex_vom.go
@@ -12,28 +12,39 @@
"v.io/v23/vom"
)
-func HexVomEncode(v interface{}) (string, error) {
+func HexVomEncode(v interface{}, te *vom.TypeEncoder) (string, error) {
var buf bytes.Buffer
- encoder := vom.NewEncoder(&buf)
+ var encoder *vom.Encoder
+ if te != nil {
+ encoder = vom.NewEncoderWithTypeEncoder(&buf, te)
+ } else {
+
+ encoder = vom.NewEncoder(&buf)
+ }
if err := encoder.Encode(v); err != nil {
return "", err
}
return hex.EncodeToString(buf.Bytes()), nil
}
-func HexVomEncodeOrDie(v interface{}) string {
- s, err := HexVomEncode(v)
+func HexVomEncodeOrDie(v interface{}, te *vom.TypeEncoder) string {
+ s, err := HexVomEncode(v, te)
if err != nil {
panic(err)
}
return s
}
-func HexVomDecode(data string, v interface{}) error {
+func HexVomDecode(data string, v interface{}, td *vom.TypeDecoder) error {
binbytes, err := hex.DecodeString(data)
if err != nil {
return fmt.Errorf("Error decoding hex string %q: %v", data, err)
}
- decoder := vom.NewDecoder(bytes.NewReader(binbytes))
+ var decoder *vom.Decoder
+ if td != nil {
+ decoder = vom.NewDecoderWithTypeDecoder(bytes.NewReader(binbytes), td)
+ } else {
+ decoder = vom.NewDecoder(bytes.NewReader(binbytes))
+ }
return decoder.Decode(v)
}
diff --git a/services/wspr/internal/rpc/server/dispatcher.go b/services/wspr/internal/rpc/server/dispatcher.go
index fa76d75..cb1c457 100644
--- a/services/wspr/internal/rpc/server/dispatcher.go
+++ b/services/wspr/internal/rpc/server/dispatcher.go
@@ -128,7 +128,7 @@
}
var lookupReply LookupReply
- if err := lib.HexVomDecode(data, &lookupReply); err != nil {
+ if err := lib.HexVomDecode(data, &lookupReply, nil); err != nil {
err2 := verror.Convert(verror.ErrInternal, nil, err)
lookupReply = LookupReply{Err: err2}
vlog.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
diff --git a/services/wspr/internal/rpc/server/dispatcher_test.go b/services/wspr/internal/rpc/server/dispatcher_test.go
index df69179..0e6615b 100644
--- a/services/wspr/internal/rpc/server/dispatcher_test.go
+++ b/services/wspr/internal/rpc/server/dispatcher_test.go
@@ -94,7 +94,7 @@
HasAuthorizer: false,
Signature: expectedSig,
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
invoker, auth, err := d.Lookup("a/b")
@@ -143,7 +143,7 @@
HasAuthorizer: true,
Signature: expectedSig,
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
invoker, auth, err := d.Lookup("a/b")
@@ -187,7 +187,7 @@
reply := LookupReply{
Err: verror.New(verror.ErrNoExist, nil),
}
- d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+ d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
}()
_, _, err := d.Lookup("a/b")
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index eb54436..e3cd66e 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -21,6 +21,7 @@
"v.io/v23/vdlroot/signature"
vdltime "v.io/v23/vdlroot/time"
"v.io/v23/verror"
+ "v.io/v23/vom"
"v.io/v23/vtrace"
"v.io/x/lib/vlog"
"v.io/x/ref/services/wspr/internal/lib"
@@ -51,6 +52,8 @@
SendLogMessage(level lib.LogLevel, msg string) error
BlessingsCache() *principal.BlessingsCache
+ TypeEncoder() *vom.TypeEncoder
+
Context() *context.T
}
@@ -177,7 +180,7 @@
Args: vdlValArgs,
Call: rpcCall,
}
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
return errHandler(err)
}
@@ -203,7 +206,7 @@
ch <- &lib.ServerRpcReply{nil, &err, vtrace.Response{}}
}()
- go proxyStream(call, flow.Writer, s.helper)
+ go proxyStream(call, flow.Writer, s.helper, s.helper.TypeEncoder())
return replyChan
}
@@ -286,7 +289,7 @@
Args: []*vdl.Value{vdl.ValueOf(pattern)},
Call: rpcCall,
}
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
return errHandler(err)
}
@@ -316,7 +319,7 @@
}
}
-func proxyStream(stream rpc.Stream, w lib.ClientWriter, blessingsCache HandleStore) {
+func proxyStream(stream rpc.Stream, w lib.ClientWriter, blessingsCache HandleStore, typeEncoder *vom.TypeEncoder) {
var item interface{}
var err error
for err = stream.Recv(&item); err == nil; err = stream.Recv(&item) {
@@ -324,7 +327,7 @@
item = principal.ConvertBlessingsToHandle(blessings, blessingsCache.GetOrAddBlessingsHandle(blessings))
}
- vomItem, err := lib.HexVomEncode(item)
+ vomItem, err := lib.HexVomEncode(item, typeEncoder)
if err != nil {
w.Error(verror.Convert(verror.ErrInternal, nil, err))
return
@@ -565,7 +568,7 @@
}
vlog.VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
- vomMessage, err := lib.HexVomEncode(message)
+ vomMessage, err := lib.HexVomEncode(message, nil)
if err != nil {
replyChan <- verror.Convert(verror.ErrInternal, nil, err)
} else if err := flow.Writer.Send(lib.ResponseAuthRequest, vomMessage); err != nil {
@@ -655,7 +658,7 @@
// Decode the result and send it through the channel
var reply lib.ServerRpcReply
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
reply.Err = err
}
@@ -697,7 +700,7 @@
}
// Decode the result and send it through the channel
var reply AuthReply
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
err = verror.Convert(verror.ErrInternal, nil, err)
reply = AuthReply{Err: err}
}
@@ -727,7 +730,7 @@
}
var reply CaveatValidationResponse
- if err := lib.HexVomDecode(data, &reply); err != nil {
+ if err := lib.HexVomDecode(data, &reply, nil); err != nil {
vlog.Errorf("failed to decode validation response %q: error %v", data, err)
ch <- []error{}
return