Have streams re-use the proxy's type stream (shared type encoder/decoder).
Change-Id: I0371ff302df4a43d0507d8406e0ff7469c718811
diff --git a/src/lib/hex-vom.js b/src/lib/hex-vom.js
index fa73524..8cf580a 100644
--- a/src/lib/hex-vom.js
+++ b/src/lib/hex-vom.js
@@ -15,8 +15,8 @@
encode: encode
};
-function encode(x) {
- return byteUtil.bytes2Hex(vom.encode(x));
+function encode(x, t, typeEnc) {
+ return byteUtil.bytes2Hex(vom.encode(x, t, typeEnc));
}
function decode(x) {
diff --git a/src/proxy/stream.js b/src/proxy/stream.js
index 7949074..0c39ddc 100644
--- a/src/proxy/stream.js
+++ b/src/proxy/stream.js
@@ -11,7 +11,6 @@
var Duplex = require('stream').Duplex;
var vlog = require('../lib/vlog');
var inherits = require('inherits');
-var fill = require('../vdl/canonicalize').fill;
var reduce = require('../vdl/canonicalize').reduce;
var unwrap = require('../vdl/type-util').unwrap;
var Blessings = require('../security/blessings');
@@ -84,7 +83,7 @@
results: results,
err: err || null,
traceResponse: traceResponse
- }, null, this._typeEncoder))
+ }))
};
Duplex.prototype.write.call(this, object);
};
@@ -169,10 +168,9 @@
if (chunk instanceof Blessings) {
chunk = chunk.convertToJsBlessings();
}
- var canonChunk = fill(chunk, this.writeType);
var object = {
id: this.flowId,
- data: hexVom.encode(canonChunk),
+ data: hexVom.encode(chunk, this.writeType, this._typeEncoder),
type: Outgoing.STREAM_VALUE
};
return Duplex.prototype.write.call(this, object, encoding, cb);
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index 6c4155c..b00bc96 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -64,6 +64,8 @@
this._outstandingRequestForId = {};
this._controller = controller;
this._blessingsManager = blessingsManager;
+ this._typeEncoder = proxy.typeEncoder;
+ this._typeDecoder = proxy.typeDecoder;
proxy.addIncomingHandler(Incoming.INVOKE_REQUEST, this);
proxy.addIncomingHandler(Incoming.LOOKUP_REQUEST, this);
@@ -319,7 +321,7 @@
}
// Glob takes no streaming input and has GlobReply as output.
stream = new Stream(messageId, self._proxy.senderPromise, false,
- null, naming.GlobReply.prototype._type);
+ null, naming.GlobReply.prototype._type, self._typeEncoder);
self._streamMap[messageId] = stream;
self._contextMap[messageId] = ctx;
self._outstandingRequestForId[messageId] = 0;
@@ -381,9 +383,9 @@
var readType = (methodSig.inStream ? methodSig.inStream.type : null);
var writeType = (methodSig.outStream ? methodSig.outStream.type : null);
stream = new Stream(messageId, self._proxy.senderPromise, false,
- readType, writeType);
+ readType, writeType, self._typeEncoder);
self._streamMap[messageId] = stream;
- var rpc = new StreamHandler(options.ctx, stream);
+ var rpc = new StreamHandler(options.ctx, stream, self._typeDecoder);
self._proxy.addIncomingStreamHandler(messageId, rpc);
options.stream = stream;
}