TBR: js/core: Re-use the type stream for streaming messages
Change-Id: I532b624b00e5834d14527baa884d48fa44a37007
diff --git a/src/lib/hex-vom.js b/src/lib/hex-vom.js
index fa73524..8cf580a 100644
--- a/src/lib/hex-vom.js
+++ b/src/lib/hex-vom.js
@@ -15,8 +15,8 @@
encode: encode
};
-function encode(x) {
- return byteUtil.bytes2Hex(vom.encode(x));
+function encode(x, t, typeEnc) {
+ return byteUtil.bytes2Hex(vom.encode(x, t, typeEnc));
}
function decode(x) {
diff --git a/src/naming/namespace.js b/src/naming/namespace.js
index 89d32c8..7467c83 100644
--- a/src/naming/namespace.js
+++ b/src/naming/namespace.js
@@ -95,7 +95,7 @@
* @param {string} server Server object address
* @param {number} ttl Expiry time for the mount in milliseconds. ttl of zero
* implies never expire.
- * @param {boolean} Optional replaceMount Whether the previous mount should
+ * @param {boolean} [replaceMount] Whether the previous mount should
* be replaced by the new server object address. False by default.
* @param {function} cb(error) Optional callback
* @return {Promise} A promise to be resolved when mount is complete or rejected
diff --git a/src/proxy/stream-handler.js b/src/proxy/stream-handler.js
index fe6137e..a4a04b6 100644
--- a/src/proxy/stream-handler.js
+++ b/src/proxy/stream-handler.js
@@ -22,12 +22,13 @@
* @param {Stream} Stream instance
* @constructor
*/
-function Handler(ctx, stream) {
+function Handler(ctx, stream, typeDecoder) {
this._ctx = ctx;
this._stream = stream;
this._controller = ctx.value(SharedContextKeys.RUNTIME)._controller;
this._pendingBlessings = [];
this._tasks = new TaskSequence();
+ this._typeDecoder = typeDecoder;
}
Handler.prototype.handleResponse = function(type, data) {
@@ -60,7 +61,7 @@
return Promise.resolve();
}
var handler = this;
- return vom.decode(data).then(function(data) {
+ return vom.decode(data, false, this._typeDecoder).then(function(data) {
if (data instanceof BlessingsId) {
var runtime = runtimeFromContext(handler._ctx);
runtime.blessingsManager.blessingsFromId(data)
diff --git a/src/proxy/stream.js b/src/proxy/stream.js
index e218fab..0c39ddc 100644
--- a/src/proxy/stream.js
+++ b/src/proxy/stream.js
@@ -11,7 +11,6 @@
var Duplex = require('stream').Duplex;
var vlog = require('../lib/vlog');
var inherits = require('inherits');
-var fill = require('../vdl/canonicalize').fill;
var reduce = require('../vdl/canonicalize').reduce;
var unwrap = require('../vdl/type-util').unwrap;
var Blessings = require('../security/blessings');
@@ -43,7 +42,8 @@
* @inner
* @memberof module:vanadium.rpc
*/
-var Stream = function(flowId, webSocketPromise, isClient, readType, writeType) {
+var Stream = function(flowId, webSocketPromise, isClient, readType, writeType,
+ typeEncoder) {
Duplex.call(this, { objectMode: true });
this.flowId = flowId;
this.isClient = isClient;
@@ -51,6 +51,7 @@
this.writeType = writeType;
this.webSocketPromise = webSocketPromise;
this.onmessage = null;
+ this._typeEncoder = typeEncoder;
// The buffer of messages that will be passed to push
// when the internal buffer has room.
@@ -167,10 +168,9 @@
if (chunk instanceof Blessings) {
chunk = chunk.convertToJsBlessings();
}
- var canonChunk = fill(chunk, this.writeType);
var object = {
id: this.flowId,
- data: hexVom.encode(canonChunk),
+ data: hexVom.encode(chunk, this.writeType, this._typeEncoder),
type: Outgoing.STREAM_VALUE
};
return Duplex.prototype.write.call(this, object, encoding, cb);
diff --git a/src/rpc/client.js b/src/rpc/client.js
index 34511df..840b3ae 100644
--- a/src/rpc/client.js
+++ b/src/rpc/client.js
@@ -170,7 +170,7 @@
// Clients read data of type outStreamingType and write data of type
// inStreamingType.
def.stream = new Stream(this._id, streamingDeferred.promise, true,
- this._outStreamingType, this._inStreamingType);
+ this._outStreamingType, this._inStreamingType, this._typeEncoder);
def.promise.stream = def.stream;
}
@@ -263,7 +263,7 @@
return;
}
var rpc = this;
- return vom.decode(data).then(function(data) {
+ return vom.decode(data, false, this._typeDecoder).then(function(data) {
rpc._def.stream._queueRead(data);
}).catch(function(e) {
rpc.handleError(
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index 3d0f91d..511e8e1 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -65,6 +65,8 @@
this._outstandingRequestForId = {};
this._controller = controller;
this._blessingsManager = blessingsManager;
+ this._typeEncoder = proxy.typeEncoder;
+ this._typeDecoder = proxy.typeDecoder;
proxy.addIncomingHandler(Incoming.INVOKE_REQUEST, this);
proxy.addIncomingHandler(Incoming.LOOKUP_REQUEST, this);
@@ -276,9 +278,9 @@
var readType = (methodSig.inStream ? methodSig.inStream.type : null);
var writeType = (methodSig.outStream ? methodSig.outStream.type : null);
var stream = new Stream(messageId, this._proxy.senderPromise, false,
- readType, writeType);
+ readType, writeType, this._typeEncoder);
this._streamMap[messageId] = stream;
- var rpc = new StreamHandler(ctx, stream);
+ var rpc = new StreamHandler(ctx, stream, this._typeDecoder);
this._proxy.addIncomingStreamHandler(messageId, rpc);
} else {
this._proxy.addIncomingStreamHandler(messageId,
@@ -389,7 +391,6 @@
stream: self._streamMap[messageId],
};
-
// Invoke the method;
self.invokeMethod(invoker, options, function(err, results) {
if (err) {