Share type stream for streaming messages.

Change-Id: Ia5e6ea58fdd02173fdb6698c6afb69b3f22c1fb0
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index f456192..5eb77d1 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -166,7 +166,7 @@
 			if blessings, ok := item.(security.Blessings); ok {
 				item = principal.ConvertBlessingsToHandle(blessings, c.blessingsCache.GetOrAddHandle(blessings))
 			}
-			vomItem, err := lib.HexVomEncode(item)
+			vomItem, err := lib.HexVomEncode(item, c.typeEncoder)
 			if err != nil {
 				w.Error(verror.New(marshallingError, ctx, item, err))
 				continue
@@ -284,7 +284,7 @@
 	id := c.lastGeneratedId
 	c.lastGeneratedId += 2
 	c.flowMap[id] = s
-	os := newStream(c.blessingsCache)
+	os := newStream(c.blessingsCache, c.typeDecoder)
 	os.init(stream)
 	c.outstandingRequests[id] = &outstandingRequest{
 		stream: os,
@@ -430,10 +430,11 @@
 // but currently none of the methods the controller exports require
 // any of this context information.
 type localCall struct {
-	ctx  *context.T
-	vrpc *RpcRequest
-	tags []*vdl.Value
-	w    lib.ClientWriter
+	ctx         *context.T
+	vrpc        *RpcRequest
+	tags        []*vdl.Value
+	w           lib.ClientWriter
+	typeEncoder *vom.TypeEncoder
 }
 
 var (
@@ -442,7 +443,7 @@
 )
 
 func (l *localCall) Send(item interface{}) error {
-	vomItem, err := lib.HexVomEncode(item)
+	vomItem, err := lib.HexVomEncode(item, l.typeEncoder)
 	if err != nil {
 		err = verror.New(marshallingError, l.ctx, item, err)
 		l.w.Error(err)
@@ -483,7 +484,7 @@
 			return
 		}
 	}
-	results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w}, msg.Method, argptrs)
+	results, err := invoker.Invoke(ctx, &localCall{ctx, msg, tags, w, c.typeEncoder}, msg.Method, argptrs)
 	if err != nil {
 		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
 		return
@@ -579,7 +580,7 @@
 		// to put the outstanding stream in the map before we make the async call so that
 		// the future send know which queue to write to, even if the client call isn't
 		// actually ready yet.
-		request.stream = newStream(c.blessingsCache)
+		request.stream = newStream(c.blessingsCache, c.typeDecoder)
 	}
 	c.Lock()
 	c.outstandingRequests[id] = request
@@ -730,16 +731,6 @@
 	server.HandleServerResponse(id, data)
 }
 
-// parseVeyronRequest parses a json rpc request into a RpcRequest object.
-func (c *Controller) parseVeyronRequest(data string) (*RpcRequest, error) {
-	var msg RpcRequest
-	if err := lib.HexVomDecode(data, &msg); err != nil {
-		return nil, err
-	}
-	vlog.VI(2).Infof("RpcRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
-	return &msg, nil
-}
-
 // getSignature uses the signature manager to get and cache the signature of a remote server.
 func (c *Controller) getSignature(ctx *context.T, name string) ([]signature.Interface, error) {
 	return c.signatureManager.Signature(ctx, name)
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index 8ee138e..537b8fc 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -204,13 +204,13 @@
 	}
 	var stream *outstandingStream
 	if len(testCase.streamingInputs) > 0 {
-		stream = newStream(nil)
+		stream = newStream(nil, nil)
 		controller.outstandingRequests[0] = &outstandingRequest{
 			stream: stream,
 		}
 		go func() {
 			for _, value := range testCase.streamingInputs {
-				controller.SendOnStream(0, lib.HexVomEncodeOrDie(value), &writer)
+				controller.SendOnStream(0, lib.HexVomEncodeOrDie(value, nil), &writer)
 			}
 			controller.CloseStream(0)
 		}()
@@ -291,19 +291,19 @@
 		numOutArgs:         1,
 		expectedStream: []lib.Response{
 			lib.Response{
-				Message: lib.HexVomEncodeOrDie(int32(1)),
+				Message: lib.HexVomEncodeOrDie(int32(1), nil),
 				Type:    lib.ResponseStream,
 			},
 			lib.Response{
-				Message: lib.HexVomEncodeOrDie(int32(3)),
+				Message: lib.HexVomEncodeOrDie(int32(3), nil),
 				Type:    lib.ResponseStream,
 			},
 			lib.Response{
-				Message: lib.HexVomEncodeOrDie(int32(6)),
+				Message: lib.HexVomEncodeOrDie(int32(6), nil),
 				Type:    lib.ResponseStream,
 			},
 			lib.Response{
-				Message: lib.HexVomEncodeOrDie(int32(10)),
+				Message: lib.HexVomEncodeOrDie(int32(10), nil),
 				Type:    lib.ResponseStream,
 			},
 			lib.Response{
@@ -435,7 +435,7 @@
 
 	vomClientStream := []string{}
 	for _, m := range testCase.clientStream {
-		vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m))
+		vomClientStream = append(vomClientStream, lib.HexVomEncodeOrDie(m, nil))
 	}
 	mock := &mockJSServer{
 		t:                    t,
diff --git a/services/wspr/internal/app/granter.go b/services/wspr/internal/app/granter.go
index bd9d4a9..659c906 100644
--- a/services/wspr/internal/app/granter.go
+++ b/services/wspr/internal/app/granter.go
@@ -29,7 +29,7 @@
 		GranterHandle: g.granterHandle,
 		Call:          server.ConvertSecurityCall(g.c, ctx, call, true),
 	}
-	encoded, err := lib.HexVomEncode(request)
+	encoded, err := lib.HexVomEncode(request, nil)
 	if err != nil {
 		return security.Blessings{}, err
 	}
@@ -64,7 +64,7 @@
 func (g *granterStream) Send(item interface{}) error {
 	dataString := item.(string)
 	var gr *GranterResponse
-	if err := lib.HexVomDecode(dataString, &gr); err != nil {
+	if err := lib.HexVomDecode(dataString, &gr, nil); err != nil {
 		return fmt.Errorf("error decoding granter response: %v", err)
 	}
 	g.c <- gr
diff --git a/services/wspr/internal/app/mock_jsServer_test.go b/services/wspr/internal/app/mock_jsServer_test.go
index 1dc2b0f..663efea 100644
--- a/services/wspr/internal/app/mock_jsServer_test.go
+++ b/services/wspr/internal/app/mock_jsServer_test.go
@@ -90,7 +90,7 @@
 
 	return lib.HexVomEncodeOrDie(server.LookupReply{
 		Err: err,
-	})
+	}, nil)
 }
 
 func (m *mockJSServer) Error(err error) {
@@ -137,7 +137,7 @@
 		Handle:        0,
 		Signature:     m.serviceSignature,
 		HasAuthorizer: m.hasAuthorizer,
-	})
+	}, nil)
 	m.controller.HandleLookupResponse(m.flowCount, lookupReply)
 	return nil
 }
@@ -163,7 +163,7 @@
 	}
 
 	var msg server.AuthRequest
-	if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+	if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
 		m.controller.HandleAuthResponse(m.flowCount, internalErr(fmt.Sprintf("error decoding %v:", err)))
 		return nil
 	}
@@ -207,7 +207,7 @@
 
 	authReply := lib.HexVomEncodeOrDie(server.AuthReply{
 		Err: m.authError,
-	})
+	}, nil)
 
 	m.controller.HandleAuthResponse(m.flowCount, authReply)
 	return nil
@@ -224,7 +224,7 @@
 	}
 
 	var msg server.ServerRpcRequest
-	if err := lib.HexVomDecode(v.(string), &msg); err != nil {
+	if err := lib.HexVomDecode(v.(string), &msg, nil); err != nil {
 		m.controller.HandleServerResponse(m.flowCount, internalErr(err))
 		return nil
 	}
@@ -295,7 +295,7 @@
 	defer m.sender.Done()
 	m.controllerReady.RLock()
 	for _, v := range m.serverStream {
-		m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v), m)
+		m.controller.SendOnStream(m.rpcFlow, lib.HexVomEncodeOrDie(v, nil), m)
 	}
 	m.controllerReady.RUnlock()
 }
@@ -323,7 +323,7 @@
 	}
 
 	m.controllerReady.RLock()
-	m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply))
+	m.controller.HandleServerResponse(m.rpcFlow, lib.HexVomEncodeOrDie(reply, nil))
 	m.controllerReady.RUnlock()
 	return nil
 }
diff --git a/services/wspr/internal/app/stream.go b/services/wspr/internal/app/stream.go
index 74852c7..74ca481 100644
--- a/services/wspr/internal/app/stream.go
+++ b/services/wspr/internal/app/stream.go
@@ -8,6 +8,7 @@
 	"fmt"
 
 	"v.io/v23/rpc"
+	"v.io/v23/vom"
 	"v.io/x/ref/services/wspr/internal/lib"
 	"v.io/x/ref/services/wspr/internal/principal"
 )
@@ -41,11 +42,12 @@
 	// true if the stream has been closed.
 	closed bool
 
+	typeDecoder *vom.TypeDecoder
 	// Used to translate from JsBlesssings to Blessings
 	blessingsCache *principal.JSBlessingsHandles
 }
 
-func newStream(cache *principal.JSBlessingsHandles) *outstandingStream {
+func newStream(cache *principal.JSBlessingsHandles, typeDecoder *vom.TypeDecoder) *outstandingStream {
 	os := &outstandingStream{
 		initChan: make(chan *initConfig, 1),
 		// We allow queueing up to 100 messages before init is called.
@@ -53,6 +55,7 @@
 		messages:       make(chan *message, 100),
 		done:           make(chan bool),
 		blessingsCache: cache,
+		typeDecoder:    typeDecoder,
 	}
 	go os.loop()
 	return os
@@ -81,7 +84,7 @@
 	config := <-os.initChan
 	for msg := range os.messages {
 		var item interface{}
-		if err := lib.HexVomDecode(msg.data, &item); err != nil {
+		if err := lib.HexVomDecode(msg.data, &item, os.typeDecoder); err != nil {
 			msg.writer.Error(fmt.Errorf("failed to decode stream arg from %v: %v", msg.data, err))
 			break
 		}
diff --git a/services/wspr/internal/lib/hex_vom.go b/services/wspr/internal/lib/hex_vom.go
index 70d3333..2dcbb82 100644
--- a/services/wspr/internal/lib/hex_vom.go
+++ b/services/wspr/internal/lib/hex_vom.go
@@ -14,29 +14,40 @@
 	"v.io/v23/vom"
 )
 
-func HexVomEncode(v interface{}) (string, error) {
+func HexVomEncode(v interface{}, te *vom.TypeEncoder) (string, error) {
 	var buf bytes.Buffer
-	encoder := vom.NewEncoder(&buf)
+	var encoder *vom.Encoder
+	if te != nil {
+		encoder = vom.NewEncoderWithTypeEncoder(&buf, te)
+	} else {
+
+		encoder = vom.NewEncoder(&buf)
+	}
 	if err := encoder.Encode(v); err != nil {
 		return "", err
 	}
 	return hex.EncodeToString(buf.Bytes()), nil
 }
 
-func HexVomEncodeOrDie(v interface{}) string {
-	s, err := HexVomEncode(v)
+func HexVomEncodeOrDie(v interface{}, te *vom.TypeEncoder) string {
+	s, err := HexVomEncode(v, te)
 	if err != nil {
 		panic(err)
 	}
 	return s
 }
 
-func HexVomDecode(data string, v interface{}) error {
+func HexVomDecode(data string, v interface{}, td *vom.TypeDecoder) error {
 	binbytes, err := hex.DecodeString(data)
 	if err != nil {
 		return fmt.Errorf("Error decoding hex string %q: %v", data, err)
 	}
-	decoder := vom.NewDecoder(bytes.NewReader(binbytes))
+	var decoder *vom.Decoder
+	if td != nil {
+		decoder = vom.NewDecoderWithTypeDecoder(bytes.NewReader(binbytes), td)
+	} else {
+		decoder = vom.NewDecoder(bytes.NewReader(binbytes))
+	}
 	return decoder.Decode(v)
 }
 
diff --git a/services/wspr/internal/rpc/server/dispatcher.go b/services/wspr/internal/rpc/server/dispatcher.go
index fa76d75..cb1c457 100644
--- a/services/wspr/internal/rpc/server/dispatcher.go
+++ b/services/wspr/internal/rpc/server/dispatcher.go
@@ -128,7 +128,7 @@
 	}
 
 	var lookupReply LookupReply
-	if err := lib.HexVomDecode(data, &lookupReply); err != nil {
+	if err := lib.HexVomDecode(data, &lookupReply, nil); err != nil {
 		err2 := verror.Convert(verror.ErrInternal, nil, err)
 		lookupReply = LookupReply{Err: err2}
 		vlog.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
diff --git a/services/wspr/internal/rpc/server/dispatcher_test.go b/services/wspr/internal/rpc/server/dispatcher_test.go
index df69179..0e6615b 100644
--- a/services/wspr/internal/rpc/server/dispatcher_test.go
+++ b/services/wspr/internal/rpc/server/dispatcher_test.go
@@ -94,7 +94,7 @@
 			HasAuthorizer: false,
 			Signature:     expectedSig,
 		}
-		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
 	}()
 
 	invoker, auth, err := d.Lookup("a/b")
@@ -143,7 +143,7 @@
 			HasAuthorizer: true,
 			Signature:     expectedSig,
 		}
-		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
 	}()
 
 	invoker, auth, err := d.Lookup("a/b")
@@ -187,7 +187,7 @@
 		reply := LookupReply{
 			Err: verror.New(verror.ErrNoExist, nil),
 		}
-		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply))
+		d.handleLookupResponse(0, lib.HexVomEncodeOrDie(reply, nil))
 	}()
 
 	_, _, err := d.Lookup("a/b")
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index f056c91..c709731 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -174,7 +174,7 @@
 			Args:     vdlValArgs,
 			Call:     rpcCall,
 		}
-		vomMessage, err := lib.HexVomEncode(message)
+		vomMessage, err := lib.HexVomEncode(message, nil)
 		if err != nil {
 			return errHandler(err)
 		}
@@ -283,7 +283,7 @@
 			Args:     []*vdl.Value{vdl.ValueOf(pattern)},
 			Call:     rpcCall,
 		}
-		vomMessage, err := lib.HexVomEncode(message)
+		vomMessage, err := lib.HexVomEncode(message, nil)
 		if err != nil {
 			return errHandler(err)
 		}
@@ -320,7 +320,7 @@
 			item = principal.ConvertBlessingsToHandle(blessings, blessingsCache.GetOrAddBlessingsHandle(blessings))
 
 		}
-		vomItem, err := lib.HexVomEncode(item)
+		vomItem, err := lib.HexVomEncode(item, nil)
 		if err != nil {
 			w.Error(verror.Convert(verror.ErrInternal, nil, err))
 			return
@@ -558,7 +558,7 @@
 	}
 	vlog.VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
 
-	vomMessage, err := lib.HexVomEncode(message)
+	vomMessage, err := lib.HexVomEncode(message, nil)
 	if err != nil {
 		replyChan <- verror.Convert(verror.ErrInternal, nil, err)
 	} else if err := flow.Writer.Send(lib.ResponseAuthRequest, vomMessage); err != nil {
@@ -648,7 +648,7 @@
 
 	// Decode the result and send it through the channel
 	var reply lib.ServerRpcReply
-	if err := lib.HexVomDecode(data, &reply); err != nil {
+	if err := lib.HexVomDecode(data, &reply, nil); err != nil {
 		reply.Err = err
 	}
 
@@ -690,7 +690,7 @@
 	}
 	// Decode the result and send it through the channel
 	var reply AuthReply
-	if err := lib.HexVomDecode(data, &reply); err != nil {
+	if err := lib.HexVomDecode(data, &reply, nil); err != nil {
 		err = verror.Convert(verror.ErrInternal, nil, err)
 		reply = AuthReply{Err: err}
 	}
@@ -720,7 +720,7 @@
 	}
 
 	var reply CaveatValidationResponse
-	if err := lib.HexVomDecode(data, &reply); err != nil {
+	if err := lib.HexVomDecode(data, &reply, nil); err != nil {
 		vlog.Errorf("failed to decode validation response %q: error %v", data, err)
 		ch <- []error{}
 		return