TBR: js/core: Re-use the type stream for streaming messages

Change-Id: I532b624b00e5834d14527baa884d48fa44a37007
diff --git a/src/lib/hex-vom.js b/src/lib/hex-vom.js
index fa73524..8cf580a 100644
--- a/src/lib/hex-vom.js
+++ b/src/lib/hex-vom.js
@@ -15,8 +15,8 @@
   encode: encode
 };
 
-function encode(x) {
-  return byteUtil.bytes2Hex(vom.encode(x));
+function encode(x, t, typeEnc) {
+  return byteUtil.bytes2Hex(vom.encode(x, t, typeEnc));
 }
 
 function decode(x) {
diff --git a/src/naming/namespace.js b/src/naming/namespace.js
index 89d32c8..7467c83 100644
--- a/src/naming/namespace.js
+++ b/src/naming/namespace.js
@@ -95,7 +95,7 @@
  * @param {string} server Server object address
  * @param {number} ttl Expiry time for the mount in milliseconds. ttl of zero
  * implies never expire.
- * @param {boolean} Optional replaceMount Whether the previous mount should
+ * @param {boolean} [replaceMount] Whether the previous mount should
  * be replaced by the new server object address. False by default.
  * @param {function} cb(error) Optional callback
  * @return {Promise} A promise to be resolved when mount is complete or rejected
diff --git a/src/proxy/stream-handler.js b/src/proxy/stream-handler.js
index fe6137e..a4a04b6 100644
--- a/src/proxy/stream-handler.js
+++ b/src/proxy/stream-handler.js
@@ -22,12 +22,13 @@
  * @param {Stream} Stream instance
  * @constructor
  */
-function Handler(ctx, stream) {
+function Handler(ctx, stream, typeDecoder) {
   this._ctx = ctx;
   this._stream = stream;
   this._controller = ctx.value(SharedContextKeys.RUNTIME)._controller;
   this._pendingBlessings = [];
   this._tasks = new TaskSequence();
+  this._typeDecoder = typeDecoder;
 }
 
 Handler.prototype.handleResponse = function(type, data) {
@@ -60,7 +61,7 @@
     return Promise.resolve();
   }
   var handler = this;
-  return vom.decode(data).then(function(data) {
+  return vom.decode(data, false, this._typeDecoder).then(function(data) {
     if (data instanceof BlessingsId) {
       var runtime = runtimeFromContext(handler._ctx);
       runtime.blessingsManager.blessingsFromId(data)
diff --git a/src/proxy/stream.js b/src/proxy/stream.js
index e218fab..0c39ddc 100644
--- a/src/proxy/stream.js
+++ b/src/proxy/stream.js
@@ -11,7 +11,6 @@
 var Duplex = require('stream').Duplex;
 var vlog = require('../lib/vlog');
 var inherits = require('inherits');
-var fill = require('../vdl/canonicalize').fill;
 var reduce = require('../vdl/canonicalize').reduce;
 var unwrap = require('../vdl/type-util').unwrap;
 var Blessings = require('../security/blessings');
@@ -43,7 +42,8 @@
  * @inner
  * @memberof module:vanadium.rpc
  */
-var Stream = function(flowId, webSocketPromise, isClient, readType, writeType) {
+var Stream = function(flowId, webSocketPromise, isClient, readType, writeType,
+                      typeEncoder) {
   Duplex.call(this, { objectMode: true });
   this.flowId = flowId;
   this.isClient = isClient;
@@ -51,6 +51,7 @@
   this.writeType = writeType;
   this.webSocketPromise = webSocketPromise;
   this.onmessage = null;
+  this._typeEncoder = typeEncoder;
 
   // The buffer of messages that will be passed to push
   // when the internal buffer has room.
@@ -167,10 +168,9 @@
   if (chunk instanceof Blessings) {
     chunk = chunk.convertToJsBlessings();
   }
-  var canonChunk = fill(chunk, this.writeType);
   var object = {
     id: this.flowId,
-    data: hexVom.encode(canonChunk),
+    data: hexVom.encode(chunk, this.writeType, this._typeEncoder),
     type: Outgoing.STREAM_VALUE
   };
   return Duplex.prototype.write.call(this, object, encoding, cb);
diff --git a/src/rpc/client.js b/src/rpc/client.js
index 34511df..840b3ae 100644
--- a/src/rpc/client.js
+++ b/src/rpc/client.js
@@ -170,7 +170,7 @@
     // Clients read data of type outStreamingType and write data of type
     // inStreamingType.
     def.stream = new Stream(this._id, streamingDeferred.promise, true,
-      this._outStreamingType, this._inStreamingType);
+      this._outStreamingType, this._inStreamingType, this._typeEncoder);
     def.promise.stream = def.stream;
   }
 
@@ -263,7 +263,7 @@
       return;
   }
   var rpc = this;
-  return vom.decode(data).then(function(data) {
+  return vom.decode(data, false, this._typeDecoder).then(function(data) {
     rpc._def.stream._queueRead(data);
   }).catch(function(e) {
     rpc.handleError(
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index 3d0f91d..511e8e1 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -65,6 +65,8 @@
   this._outstandingRequestForId = {};
   this._controller = controller;
   this._blessingsManager = blessingsManager;
+  this._typeEncoder = proxy.typeEncoder;
+  this._typeDecoder = proxy.typeDecoder;
 
   proxy.addIncomingHandler(Incoming.INVOKE_REQUEST, this);
   proxy.addIncomingHandler(Incoming.LOOKUP_REQUEST, this);
@@ -276,9 +278,9 @@
     var readType = (methodSig.inStream ? methodSig.inStream.type : null);
     var writeType = (methodSig.outStream ? methodSig.outStream.type : null);
     var stream = new Stream(messageId, this._proxy.senderPromise, false,
-                        readType, writeType);
+                        readType, writeType, this._typeEncoder);
     this._streamMap[messageId] = stream;
-    var rpc = new StreamHandler(ctx, stream);
+    var rpc = new StreamHandler(ctx, stream, this._typeDecoder);
     this._proxy.addIncomingStreamHandler(messageId, rpc);
   } else {
     this._proxy.addIncomingStreamHandler(messageId,
@@ -389,7 +391,6 @@
         stream: self._streamMap[messageId],
       };
 
-
       // Invoke the method;
       self.invokeMethod(invoker, options, function(err, results) {
         if (err) {