Remove one level of encoding.
Change-Id: I73b04f9af9c42875362cf46aec1423879c9290d7
diff --git a/src/proxy/index.js b/src/proxy/index.js
index eca1454..e781c77 100644
--- a/src/proxy/index.js
+++ b/src/proxy/index.js
@@ -15,12 +15,13 @@
var Incoming = MessageType.Incoming;
var Outgoing = MessageType.Outgoing;
var vlog = require('./../lib/vlog');
-var vom = require('../vom');
var byteUtil = require('../vdl/byte-util');
var unwrap = require('../vdl/type-util').unwrap;
var Encoder = require('../vom/encoder');
var Decoder = require('../vom/decoder');
+var RawVomReader = require('../vom/raw-vom-reader');
var ByteStreamMessageReader = require('../vom/byte-stream-message-reader');
+var ByteArrayMessageReader = require('../vom/byte-array-message-reader');
var TaskSequence = require('../lib/task-sequence');
// Cache the service signatures for one hour.
@@ -55,75 +56,92 @@
this.incomingRequestHandlers = {};
this.clientEncoder = new Encoder();
this.clientDecoder = new Decoder(new ByteStreamMessageReader());
+ this.sequence = new TaskSequence();
EE.call(this);
}
inherits(Proxy, EE);
+Proxy.prototype._parseAndHandleMessage = function(message) {
+ var messageId;
+ var reader = new RawVomReader(message);
+ var proxy = this;
+ var isServerOriginatedMessage;
+ var handlerState;
+ return reader.readUint().then(function(id) {
+ messageId = id;
+ // Messages originating from server are even numbers
+ isServerOriginatedMessage = (messageId % 2) === 0;
+ handlerState = proxy.outstandingRequests[messageId];
+
+ // If we don't know about this flow, just drop the message. Unless it
+ // originated from the sever.
+ if (!isServerOriginatedMessage && !handlerState) {
+ return;
+ }
+
+ if (!handlerState) {
+ // This is an server originated message that we are seeing for the
+ // first time. We need to create a handler state so we have the task
+ // sequence for the input data. If a handler gets added later, then
+ // it will attached to this state.
+ handlerState = new HandlerState();
+ proxy.outstandingRequests[message.id] = handlerState;
+ }
+
+ return reader.readUint().then(function(type) {
+ var decoder = new Decoder(new ByteArrayMessageReader(reader));
+ handlerState._tasks.addTask(proxy.processRead.bind(proxy, messageId,
+ type,
+ handlerState.handler,
+ decoder));
+ });
+ }).catch(function(e) {
+ vlog.logger.error(e + ': ' + e.stack);
+ if (!isServerOriginatedMessage && handlerState) {
+ handlerState.handler.handleResponse(Incoming.ERROR_RESPONSE,
+ e);
+ }
+ });
+};
/**
* Handles a message from native vanadium implementation.
* @private
- * @param {Object} messsage The message from the native vanadium code.
+ * @param {string} messsage The hex encoded message from the native
+ * vanadium code.
*/
Proxy.prototype.process = function(message) {
- // Messages originating from server are even numbers
- var isServerOriginatedMessage = (message.id % 2) === 0;
- var handlerState = this.outstandingRequests[message.id];
-
- // If we don't know about this flow, just drop the message. Unless it
- // originated from the sever.
- if (!isServerOriginatedMessage && !handlerState) {
- return;
- }
-
- if (!handlerState) {
- // This is an server originated message that we are seeing for the
- // first time. We need to create a handler state so we have the task
- // sequence for the input data. If a handler gets added later, then
- // it will attached to this state.
- handlerState = new HandlerState();
- this.outstandingRequests[message.id] = handlerState;
- }
- var bytes;
try {
- bytes = byteUtil.hex2Bytes(message.data);
- } catch (e) {
- vlog.logger.error(e);
- if (!isServerOriginatedMessage) {
- handlerState.handler.handleResponse(Incoming.ERROR_RESPONSE,
- message.data);
- }
+ message = byteUtil.hex2Bytes(message);
+ } catch(e) {
+ vlog.logger.warn('Failed to parse ' + message + ' err: ' + e + ': ' +
+ e.stack);
return;
}
-
- var proxy = this;
- handlerState._tasks.addTask(function() {
- return proxy.processRead(message.id, handlerState.handler, bytes);
- });
+ this.sequence.addTask(this._parseAndHandleMessage.bind(this, message));
};
-Proxy.prototype.processRead = function(id, handler, bytes) {
- var proxy = this;
+Proxy.prototype.processRead = function(id, messageType, handler, decoder) {
var isServerOriginatedMessage = (id % 2) === 0;
- return vom.decode(bytes).then(function(payload) {
- payload.message = unwrap(payload.message);
+ var proxy = this;
+ return decoder.decode().then(function(message) {
+ message = unwrap(message);
if (!handler) {
- handler = proxy.incomingRequestHandlers[payload.type];
+ handler = proxy.incomingRequestHandlers[messageType];
if (!handler) {
// TODO(bprosnitz) There is a race condition where we receive
// STREAM_CLOSE before a method is invoked in js and see this warning.
vlog.logger.warn('Dropping message for unknown invoke payload ' +
- payload.type + ' (message id: ' + id + ')');
+ messageType + ' (message id: ' + id + ')');
return;
}
- handler.handleRequest(id, payload.type, payload.message);
+ handler.handleRequest(id, messageType, message);
} else {
- handler.handleResponse(payload.type, payload.message);
+ handler.handleResponse(messageType, message);
}
}).catch(function(e) {
vlog.logger.error(e.stack);
if (!isServerOriginatedMessage) {
- handler.handleResponse(Incoming.ERROR_RESPONSE,
- byteUtil.bytes2Hex(bytes));
+ handler.handleResponse(Incoming.ERROR_RESPONSE, e);
}
});
};
diff --git a/src/proxy/nacl.js b/src/proxy/nacl.js
index 39a344f..5143a93 100644
--- a/src/proxy/nacl.js
+++ b/src/proxy/nacl.js
@@ -13,9 +13,6 @@
var Proxy = require('./index');
var TaskSequence = require('../lib/task-sequence');
var random = require('../lib/random');
-var vlog = require('./../lib/vlog');
-var vom = require('../vom');
-var byteUtil = require('../vdl/byte-util');
module.exports = ProxyConnection;
@@ -32,26 +29,7 @@
this._tasks = new TaskSequence();
this.onBrowsprMsg = function(msg) {
- var body;
- try {
- body = byteUtil.hex2Bytes(msg.body);
- } catch (e) {
- vlog.logger.warn('Failed to parse ' + msg.body + 'err: ' + e);
- return;
- }
- // We add this to the task queue to make sure that the decode callback for
- // all the messages are peformed in order.
- self._tasks.addTask(decodeAndProcess);
- function decodeAndProcess() {
- return vom.decode(body).then(function(body) {
- if (msg.instanceId === self.instanceId) {
- self.process(body);
- }
- }, function(e) {
- vlog.logger.error('Failed to parse ' + msg.body + 'err: ' + e);
- return;
- });
- }
+ self.process(msg.body);
};
extensionEventProxy.on('browsprMsg', this.onBrowsprMsg);
diff --git a/src/proxy/websocket.js b/src/proxy/websocket.js
index 9c64ce4..e5df1be 100644
--- a/src/proxy/websocket.js
+++ b/src/proxy/websocket.js
@@ -14,8 +14,6 @@
var TaskSequence = require('./../lib/task-sequence');
var Proxy = require('./index');
var vlog = require('./../lib/vlog');
-var vom = require('../vom');
-var byteUtil = require('../vdl/byte-util');
/**
* A client for the vanadium service using websockets. Connects to the vanadium
@@ -89,21 +87,7 @@
};
websocket.onmessage = function(frame) {
- var message;
- try {
- message = byteUtil.hex2Bytes(frame.data);
- } catch (e) {
- vlog.logger.warn('Failed to parse ' + frame.data + ' err: ' + e);
- return;
- }
-
- self._tasks.addTask(function() {
- return vom.decode(message).then(function(message) {
- self.process(message);
- }, function(e) {
- vlog.logger.warn('Failed to parse ' + frame.data + ' err: ' + e);
- });
- });
+ self.process(frame.data);
};
return deferred.promise;
diff --git a/src/vom/byte-array-message-reader.js b/src/vom/byte-array-message-reader.js
index 3b327dd..5a038f2 100644
--- a/src/vom/byte-array-message-reader.js
+++ b/src/vom/byte-array-message-reader.js
@@ -16,17 +16,22 @@
/**
* Create a VOM message reader backed by a byte array.
- * @param {Uint8Array} bytes The byte array.
+ * @param {Uint8Array|RawVomReader} bytes The byte array.
* @constructor
* @memberof module:vanadium.vom
*/
function ByteArrayMessageReader(bytes) {
- if (bytes[0] !== 0x80) {
- throw new Error('Improperly formatted bytes. Must start with 0x80');
+ if (!(bytes instanceof RawVomReader)) {
+ this.rawReader = new RawVomReader(bytes);
+ } else {
+ this.rawReader = bytes;
}
- this.rawReader = new RawVomReader(bytes);
- // Consume the header byte.
- this.rawReader._readRawBytes(1);
+
+ this._headerPromise = this.rawReader.readByte().then(function(b) {
+ if (b !== 0x80) {
+ throw new Error('Improperly formatted bytes. Must start with 0x80 ' + b);
+ }
+ });
}
/**
@@ -38,27 +43,29 @@
*/
ByteArrayMessageReader.prototype.nextMessageType = function(typeDecoder) {
var bamr = this;
- return this.rawReader.readInt().then(function(typeId) {
- if (typeId < 0) {
- // Type message. We add the type to the typeDecoder and continue reading
- // trying to find a value message.
- return bamr.rawReader.readUint().then(function(len) {
- return bamr.rawReader._readRawBytes(len);
- }).then(function(body) {
- return typeDecoder.defineType(-typeId, body);
- }).then(function() {
- return bamr.nextMessageType(typeDecoder);
- });
- }
- var type = typeDecoder.lookupType(typeId);
- if (TypeUtil.shouldSendLength(type)) {
- return bamr.rawReader.readUint().then(function() {
- return type;
- });
- }
- return type;
- }, function(err) {
- // Hopefull this is an eof.
- return null;
+ return this._headerPromise.then(function() {
+ return bamr.rawReader.readInt().then(function(typeId) {
+ if (typeId < 0) {
+ // Type message. We add the type to the typeDecoder and continue
+ // reading trying to find a value message.
+ return bamr.rawReader.readUint().then(function(len) {
+ return bamr.rawReader._readRawBytes(len);
+ }).then(function(body) {
+ return typeDecoder.defineType(-typeId, body);
+ }).then(function() {
+ return bamr.nextMessageType(typeDecoder);
+ });
+ }
+ var type = typeDecoder.lookupType(typeId);
+ if (TypeUtil.shouldSendLength(type)) {
+ return bamr.rawReader.readUint().then(function() {
+ return type;
+ });
+ }
+ return type;
+ }, function(err) {
+ // Hopefull this is an eof.
+ return null;
+ });
});
};