js/core: Make decode async.
This is step 1 in a multi step process to use a separate type flow
between go and js. The separate type flow will allow us to use a new
encoder/decoder for each rpc, but share the type messages between them.
This should result in a huge perfomance improvement.
I'm sending this change out separately because it's a monster and I
don't want to hide the real type flow change in this monster. In the
process, I also found a bug in the caveat validator that failed to
validate async caveat chains when there were more than 1 caveat in the
chain.
Change-Id: Ida1814a1715e1bad492bcba4c4d522a576906ec7
diff --git a/extension/src/background/channel.js b/extension/src/background/channel.js
index 3d64e6a..755770d 100644
--- a/extension/src/background/channel.js
+++ b/extension/src/background/channel.js
@@ -7,10 +7,12 @@
*/
var vom = require('../../../src/vom');
+var TaskSequence = require('../../../src/lib/task-sequence');
var channelVdl =
require('../../vdl/v.io/x/ref/services/wspr/internal/channel');
var browsprVdl =
require('../../vdl/v.io/x/ref/services/wspr/internal/browspr');
+var vlog = require('../../../src/lib/vlog');
module.exports = RpcChannel;
@@ -26,6 +28,7 @@
this.lastSeq = 0;
this.handlers = {};
this.pendingCallbacks = {};
+ this.decodeQueue = new TaskSequence();
}
RpcChannel.prototype.registerRpcHandler = function(type, func) {
@@ -127,22 +130,32 @@
cb(null, body);
};
-RpcChannel.prototype.handleMessage = function(msg) {
+RpcChannel.prototype._handleMessageTask = function(msg) {
var msgBytes = new Uint8Array(msg);
var reader = new vom.ByteArrayMessageReader(msgBytes);
var dec = new vom.Decoder(reader);
- var jsMsg = dec.decode();
- if (jsMsg._type.name === channelVdl.Message.name) {
- throw new Error('Message does not have correct Message type: ' +
- JSON.stringify(jsMsg));
- } else if (jsMsg.request) {
- return this._handleRequest(jsMsg.request);
- } else if (jsMsg.response) {
- return this._handleResponse(jsMsg.response);
- } else {
- throw new Error('Message has Message type, but no "request" or ' +
- '"response" fields: ' + JSON.stringify(jsMsg));
- }
+ var channel = this;
+ return dec.decode().then(function(jsMsg) {
+ if (jsMsg._type.name === channelVdl.Message.name) {
+ throw new Error('Message does not have correct Message type: ' +
+ JSON.stringify(jsMsg));
+ } else if (jsMsg.request) {
+ return channel._handleRequest(jsMsg.request);
+ } else if (jsMsg.response) {
+ return channel._handleResponse(jsMsg.response);
+ } else {
+ var err = new Error('Message has Message type, but no "request" or ' +
+ '"response" fields: ' + JSON.stringify(jsMsg));
+ vlog.logger.error(err + ': ' + err.stack);
+ }
+ }, function(err) {
+ vlog.logger.error(err + ': ' + err.stack);
+ });
+};
+RpcChannel.prototype.handleMessage = function(msg) {
+ // We add this to the task queue to make sure that the decode callback for
+ // all the messages are peformed in order.
+ this.decodeQueue.addTask(this._handleMessageTask.bind(this, msg));
};
RpcChannel.prototype._postMessage = function(msg) {
diff --git a/src/lib/async-helper.js b/src/lib/async-helper.js
new file mode 100644
index 0000000..2825d9e
--- /dev/null
+++ b/src/lib/async-helper.js
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+
+module.exports.promiseFor = promiseFor;
+module.exports.promiseWhile = promiseWhile;
+
+var Promise = require('./promise');
+
+/**
+ * promiseFor performs an asynchronous body n times.
+ * @param {number} n The number of times to call body
+ * @param {function} body The body to run. It should return
+ * a promise that will be resolved when it is done
+ * @return {Promise} A promise that will resolve when the body has
+ * been run n times.
+ * @private
+ */
+function promiseFor(n, body) {
+ if (n === 0) {
+ return Promise.resolve();
+ }
+ function doStep() {
+ n--;
+ if (n === 0) {
+ return Promise.resolve();
+ }
+ return body().then(doStep);
+ }
+
+ return body().then(doStep);
+}
+/**
+ * promiseWhile performs an asynchronous body as long as an async predict
+ * is true.
+ * @param {function} predicate A function that returns a Promise<bool> that
+ * says whether the body should be run or not.
+ * @param {function} body A function that returns a Promise that will be
+ * resolved once the body is done executing.
+ * @return {Promise} A promise that will be resolved once the while is done.
+ * @private
+ */
+
+function promiseWhile(predicate, body) {
+ return predicate().then(function(success) {
+ if (!success) {
+ return Promise.resolve();
+ }
+ return body().then(function() {
+ return promiseWhile(predicate, body);
+ });
+ });
+}
diff --git a/src/lib/task-sequence.js b/src/lib/task-sequence.js
new file mode 100644
index 0000000..e56362b
--- /dev/null
+++ b/src/lib/task-sequence.js
@@ -0,0 +1,33 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+module.exports = TaskSequence;
+
+var vlog = require('./vlog');
+var Promise = require('../lib/promise');
+/**
+ * A sequencer of async operations that need to happen synchronously. The
+ * queue will be processes in a FIFO order and only one operation will be
+ * outstanding at a time. This library uses Promises in the API instead of
+ * callbacks since setImmediate isn't implemented in Chrome, causing nextTick
+ * calls to take at least a millisecond.
+ * @constructor
+ * @private
+ */
+function TaskSequence() {
+ this._lastPromise = Promise.resolve();
+}
+
+/**
+ * Adds a task to a queue.
+ * @param {function} task The task to run. It should return a promise that
+ * will be resolved/rejected on completion of the task.
+ */
+TaskSequence.prototype.addTask = function(task) {
+ this._lastPromise = this._lastPromise.then(function() {
+ return task();
+ }).catch(function(err) {
+ vlog.logger.error('Task failed with ' + err.stack);
+ });
+};
diff --git a/src/proxy/index.js b/src/proxy/index.js
index ec2d7ff..eca1454 100644
--- a/src/proxy/index.js
+++ b/src/proxy/index.js
@@ -15,14 +15,25 @@
var Incoming = MessageType.Incoming;
var Outgoing = MessageType.Outgoing;
var vlog = require('./../lib/vlog');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
var unwrap = require('../vdl/type-util').unwrap;
var Encoder = require('../vom/encoder');
var Decoder = require('../vom/decoder');
-var hexVom = require('../lib/hex-vom');
+var ByteStreamMessageReader = require('../vom/byte-stream-message-reader');
+var TaskSequence = require('../lib/task-sequence');
// Cache the service signatures for one hour.
var SIGNATURE_CACHE_TTL = 3600 * 1000;
+// HandlerState is an object that contains the state for a given flow. This
+// includes an optional handler for incoming messages and a task sequencer for
+// decoding incoming messages.
+function HandlerState(handler) {
+ this.handler = handler;
+ this._tasks = new TaskSequence();
+}
+
/**
* A client for the native vanadium implementation.
* @constructor
@@ -43,7 +54,7 @@
this.senderPromise = senderPromise;
this.incomingRequestHandlers = {};
this.clientEncoder = new Encoder();
- this.clientDecoder = new Decoder();
+ this.clientDecoder = new Decoder(new ByteStreamMessageReader());
EE.call(this);
}
inherits(Proxy, EE);
@@ -56,39 +67,65 @@
Proxy.prototype.process = function(message) {
// Messages originating from server are even numbers
var isServerOriginatedMessage = (message.id % 2) === 0;
- var handler = this.outstandingRequests[message.id];
+ var handlerState = this.outstandingRequests[message.id];
// If we don't know about this flow, just drop the message. Unless it
// originated from the sever.
- if (!isServerOriginatedMessage && !handler) {
+ if (!isServerOriginatedMessage && !handlerState) {
return;
}
- var payload;
+ if (!handlerState) {
+ // This is an server originated message that we are seeing for the
+ // first time. We need to create a handler state so we have the task
+ // sequence for the input data. If a handler gets added later, then
+ // it will attached to this state.
+ handlerState = new HandlerState();
+ this.outstandingRequests[message.id] = handlerState;
+ }
+ var bytes;
try {
- payload = hexVom.decode(message.data);
- payload.message = unwrap(payload.message);
+ bytes = byteUtil.hex2Bytes(message.data);
} catch (e) {
vlog.logger.error(e);
if (!isServerOriginatedMessage) {
- handler.handleResponse(Incoming.ERROR_RESPONSE, message.data);
+ handlerState.handler.handleResponse(Incoming.ERROR_RESPONSE,
+ message.data);
}
return;
}
- if (!handler) {
- handler = this.incomingRequestHandlers[payload.type];
+ var proxy = this;
+ handlerState._tasks.addTask(function() {
+ return proxy.processRead(message.id, handlerState.handler, bytes);
+ });
+};
+
+Proxy.prototype.processRead = function(id, handler, bytes) {
+ var proxy = this;
+ var isServerOriginatedMessage = (id % 2) === 0;
+ return vom.decode(bytes).then(function(payload) {
+ payload.message = unwrap(payload.message);
if (!handler) {
- // TODO(bprosnitz) There is a race condition where we receive STREAM_CLOSE
- // before a method is invoked in js and see this warning.
- vlog.logger.warn('Dropping message for unknown invoke payload ' +
- payload.type + ' (message id: ' + message.id + ')');
- return;
+ handler = proxy.incomingRequestHandlers[payload.type];
+ if (!handler) {
+ // TODO(bprosnitz) There is a race condition where we receive
+ // STREAM_CLOSE before a method is invoked in js and see this warning.
+ vlog.logger.warn('Dropping message for unknown invoke payload ' +
+ payload.type + ' (message id: ' + id + ')');
+ return;
+ }
+ handler.handleRequest(id, payload.type, payload.message);
+ } else {
+ handler.handleResponse(payload.type, payload.message);
}
- handler.handleRequest(message.id, payload.type, payload.message);
- } else {
- handler.handleResponse(payload.type, payload.message);
- }
+ }).catch(function(e) {
+ vlog.logger.error(e.stack);
+ if (!isServerOriginatedMessage) {
+ handler.handleResponse(Incoming.ERROR_RESPONSE,
+ byteUtil.bytes2Hex(bytes));
+ }
+ });
};
Proxy.prototype.dequeue = function(id) {
@@ -107,7 +144,11 @@
};
Proxy.prototype.addIncomingStreamHandler = function(id, handler) {
- this.outstandingRequests[id] = handler;
+ if (!this.outstandingRequests[id]) {
+ this.outstandingRequests[id] = new HandlerState(handler);
+ } else {
+ this.outstandingRequests[id].handler = handler;
+ }
};
/**
@@ -121,8 +162,8 @@
ctx.waitUntilDone().catch(function(error) {
var h = proxy.outstandingRequests[id];
proxy.sendRequest(null, Outgoing.CANCEL, null, id);
- if (h) {
- h.handleResponse(Incoming.ERROR_RESPONSE, error);
+ if (h && h.handler) {
+ h.handler.handleResponse(Incoming.ERROR_RESPONSE, error);
delete proxy.outstandingRequests[id];
}
});
@@ -143,7 +184,7 @@
*/
Proxy.prototype.sendRequest = function(message, type, handler, id) {
if (handler) {
- this.outstandingRequests[id] = handler;
+ this.addIncomingStreamHandler(id, handler);
}
var body = {
id: id,
@@ -160,8 +201,8 @@
// in node.
var h = self.outstandingRequests[id];
- if (h) {
- h.handleResponse(Incoming.ERROR_RESPONSE, err);
+ if (h && h.handler) {
+ h.handler.handleResponse(Incoming.ERROR_RESPONSE, err);
delete self.outstandingRequests[id];
}
});
diff --git a/src/proxy/nacl.js b/src/proxy/nacl.js
index a21614f..39a344f 100644
--- a/src/proxy/nacl.js
+++ b/src/proxy/nacl.js
@@ -10,10 +10,12 @@
var Deferred = require('../lib/deferred');
var errors = require('../verror/index');
var extensionEventProxy = require('../browser/event-proxy');
-var hexVom = require('../lib/hex-vom');
var Proxy = require('./index');
+var TaskSequence = require('../lib/task-sequence');
var random = require('../lib/random');
var vlog = require('./../lib/vlog');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
module.exports = ProxyConnection;
@@ -27,18 +29,28 @@
var self = this;
this.instanceId = random.hex();
+ this._tasks = new TaskSequence();
this.onBrowsprMsg = function(msg) {
var body;
try {
- body = hexVom.decode(msg.body);
+ body = byteUtil.hex2Bytes(msg.body);
} catch (e) {
vlog.logger.warn('Failed to parse ' + msg.body + 'err: ' + e);
return;
}
-
- if (msg.instanceId === self.instanceId) {
- self.process(body);
+ // We add this to the task queue to make sure that the decode callback for
+ // all the messages are peformed in order.
+ self._tasks.addTask(decodeAndProcess);
+ function decodeAndProcess() {
+ return vom.decode(body).then(function(body) {
+ if (msg.instanceId === self.instanceId) {
+ self.process(body);
+ }
+ }, function(e) {
+ vlog.logger.error('Failed to parse ' + msg.body + 'err: ' + e);
+ return;
+ });
}
};
diff --git a/src/proxy/stream-handler.js b/src/proxy/stream-handler.js
index 49c90be..79b2332 100644
--- a/src/proxy/stream-handler.js
+++ b/src/proxy/stream-handler.js
@@ -9,7 +9,10 @@
var Blessings = require('../security/blessings');
var JsBlessings =
require('../gen-vdl/v.io/x/ref/services/wspr/internal/principal').JsBlessings;
-var hexVom = require('../lib/hex-vom');
+var TaskSequence = require('../lib/task-sequence');
+var Promise = require('../lib/promise');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
module.exports = Handler;
@@ -24,33 +27,19 @@
this._stream = stream;
this._controller = ctx.value(SharedContextKeys.RUNTIME)._controller;
this._pendingBlessings = [];
+ this._tasks = new TaskSequence();
}
Handler.prototype.handleResponse = function(type, data) {
switch (type) {
case Incoming.STREAM_RESPONSE:
- try {
- data = hexVom.decode(data);
- } catch (e) {
- emitStreamError(this._stream,
- new vError.InternalError(this._ctx,
- 'Failed to decode result: ', e));
- return true;
- }
- if (data instanceof JsBlessings) {
- data = new Blessings(data.handle, data.publicKey, this._controller);
- data.retain();
- }
- this._stream._queueRead(data);
- return true;
+ this._tasks.addTask(this.handleStreamData.bind(this, data));
+ return true;
case Incoming.STREAM_CLOSE:
- this.cleanupBlessings();
- this._stream._queueClose();
+ this._tasks.addTask(this.handleStreamClose.bind(this, data));
return true;
case Incoming.ERROR_RESPONSE:
- this.cleanupBlessings();
- emitStreamError(this._stream, data);
- this._stream._queueClose();
+ this._tasks.addTask(this.handleStreamError.bind(this, data));
return true;
}
@@ -58,6 +47,41 @@
return false;
};
+Handler.prototype.handleStreamData = function(data) {
+ try {
+ data = byteUtil.hex2Bytes(data);
+ } catch (e) {
+ emitStreamError(this._stream,
+ new vError.InternalError(this._ctx,
+ 'Failed to decode result: ', e));
+ return Promise.resolve();
+ }
+ var handler = this;
+ return vom.decode(data).then(function(data) {
+ if (data instanceof JsBlessings) {
+ data = new Blessings(data.handle, data.publicKey,
+ handler._controller);
+ data.retain();
+ }
+ handler._stream._queueRead(data);
+ }, function(e) {
+ emitStreamError(handler._stream,
+ new vError.InternalError(
+ handler._ctx, 'Failed to decode result: ', e));
+ });
+};
+
+Handler.prototype.handleStreamClose = function() {
+ this.cleanupBlessings();
+ this._stream._queueClose();
+ return Promise.resolve();
+};
+
+Handler.prototype.handleStreamError = function(data) {
+ emitStreamError(this._stream, data);
+ return this.handleStreamClose();
+};
+
Handler.prototype.cleanupBlessings = function() {
for (var i = 0; i < this._pendingBlessings; i++) {
this._pendingBlessings[i].release();
diff --git a/src/proxy/websocket.js b/src/proxy/websocket.js
index baa236e..9c64ce4 100644
--- a/src/proxy/websocket.js
+++ b/src/proxy/websocket.js
@@ -11,9 +11,11 @@
var WebSocket = require('ws');
var Deferred = require('./../lib/deferred');
-var hexVom = require('../lib/hex-vom');
+var TaskSequence = require('./../lib/task-sequence');
var Proxy = require('./index');
var vlog = require('./../lib/vlog');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
/**
* A client for the vanadium service using websockets. Connects to the vanadium
@@ -37,6 +39,7 @@
this.getWebSocket().then(function success() {
def.resolve(proxy);
}, def.reject);
+ this._tasks = new TaskSequence();
}
ProxyConnection.prototype = Object.create(Proxy.prototype);
@@ -88,13 +91,19 @@
websocket.onmessage = function(frame) {
var message;
try {
- message = hexVom.decode(frame.data);
+ message = byteUtil.hex2Bytes(frame.data);
} catch (e) {
vlog.logger.warn('Failed to parse ' + frame.data + ' err: ' + e);
return;
}
- self.process(message);
+ self._tasks.addTask(function() {
+ return vom.decode(message).then(function(message) {
+ self.process(message);
+ }, function(e) {
+ vlog.logger.warn('Failed to parse ' + frame.data + ' err: ' + e);
+ });
+ });
};
return deferred.promise;
diff --git a/src/rpc/client.js b/src/rpc/client.js
index 6f05115..9048580 100644
--- a/src/rpc/client.js
+++ b/src/rpc/client.js
@@ -19,7 +19,6 @@
var context = require('../context');
var Deferred = require('../lib/deferred');
var emitStreamError = require('../lib/emit-stream-error');
-var hexVom = require('../lib/hex-vom');
var Incoming = require('../proxy/message-type').Incoming;
var makeError = require('../verror/make-errors');
var Outgoing = require('../proxy/message-type').Outgoing;
@@ -42,10 +41,12 @@
var Blessings = require('../security/blessings');
var JsBlessings =
require('../gen-vdl/v.io/x/ref/services/wspr/internal/principal').JsBlessings;
-var ByteStreamMessageReader = require('../vom/byte-stream-message-reader');
var ByteStreamMessageWriter = require('../vom/byte-stream-message-writer');
+var ByteStreamMessageReader = require('../vom/byte-stream-message-reader');
var Encoder = require('../vom/encoder');
var Decoder = require('../vom/decoder');
+var TaskSequence = require('../lib/task-sequence');
+var vom = require('../vom');
var OutstandingRPC = function(ctx, options, cb) {
this._ctx = ctx;
@@ -65,6 +66,7 @@
this._encoder = options.encoder;
this._decoder = options.decoder;
this._def = null;
+ this._tasks = new TaskSequence();
};
// Helper function to convert an out argument to the given type.
@@ -190,74 +192,91 @@
};
OutstandingRPC.prototype.handleResponse = function(type, data) {
+ var rpc = this;
switch (type) {
case Incoming.FINAL_RESPONSE:
- this.handleCompletion(data);
+ this._tasks.addTask(function() {
+ return rpc.handleCompletion(data);
+ });
break;
case Incoming.STREAM_RESPONSE:
- this.handleStreamData(data);
+ this._tasks.addTask(function() {
+ return rpc.handleStreamData(data);
+ });
break;
case Incoming.ERROR_RESPONSE:
- this.handleError(data);
+ this._tasks.addTask(function() {
+ return rpc.handleError(data);
+ });
break;
case Incoming.STREAM_CLOSE:
- this.handleStreamClose();
+ this._tasks.addTask(function() {
+ return rpc.handleStreamClose();
+ });
break;
default:
- this.handleError(
- new verror.InternalError(
- this._ctx, 'Received unknown response type from wspr'));
+ this._tasks.addTask(function() {
+ return rpc.handleError(
+ new verror.InternalError(
+ rpc._ctx, 'Received unknown response type from wspr'));
+ });
break;
}
};
OutstandingRPC.prototype.handleCompletion = function(data) {
- var response;
try {
var bytes = byteUtil.hex2Bytes(data);
- if (!this._decoder._messageReader) {
- this._decoder._messageReader = new ByteStreamMessageReader(bytes);
- } else {
- this._decoder._messageReader.clear();
- this._decoder._messageReader.addBytes(bytes);
+ this._decoder._messageReader.addBytes(bytes);
+ } catch (e) {
+ this.handleError(
+ new verror.InternalError(this._ctx, 'Failed to decode result: ', e));
+ return Promise.resolve();
+ }
+ var rpc = this;
+ return this._decoder.decode().then(function(response) {
+ vtrace.getStore(rpc._ctx).merge(response.traceResponse);
+ vtrace.getSpan(rpc._ctx).finish();
+
+ rpc._def.resolve(response.outArgs);
+ if (rpc._def.stream) {
+ rpc._def.stream._queueClose();
}
- response = this._decoder.decode();
+ rpc._proxy.dequeue(this._id);
+ }).catch(function(e) {
+ rpc.handleError(
+ new verror.InternalError(rpc._ctx, 'Failed to decode result: ', e));
+ return;
+ });
+};
+
+OutstandingRPC.prototype.handleStreamData = function(data) {
+ if (!this._def.stream) {
+ vlog.logger.warn('Ignoring streaming message for non-streaming flow : ' +
+ this._id);
+ return Promise.resolve();
+ }
+ try {
+ data = byteUtil.hex2Bytes(data);
} catch (e) {
this.handleError(
new verror.InternalError(this._ctx, 'Failed to decode result: ', e));
return;
}
-
- vtrace.getStore(this._ctx).merge(response.traceResponse);
- vtrace.getSpan(this._ctx).finish();
-
- this._def.resolve(response.outArgs);
- if (this._def.stream) {
- this._def.stream._queueClose();
- }
- this._proxy.dequeue(this._id);
-};
-
-OutstandingRPC.prototype.handleStreamData = function(data) {
- if (this._def.stream) {
- try {
- data = hexVom.decode(data);
- } catch (e) {
- this.handleError(
- new verror.InternalError(this._ctx, 'Failed to decode result: ', e));
- return;
- }
- this._def.stream._queueRead(data);
- } else {
- vlog.logger.warn('Ignoring streaming message for non-streaming flow : ' +
- this._id);
- }
+ var rpc = this;
+ return vom.decode(data).then(function(data) {
+ rpc._def.stream._queueRead(data);
+ }).catch(function(e) {
+ rpc.handleError(
+ new verror.InternalError(rpc._ctx, 'Failed to decode result: ', e));
+ });
};
OutstandingRPC.prototype.handleStreamClose = function() {
if (this._def.stream) {
this._def.stream._queueClose();
}
+ return Promise.resolve();
};
OutstandingRPC.prototype.handleError = function(err) {
@@ -267,6 +286,7 @@
}
this._def.reject(err);
this._proxy.dequeue(this._id);
+ return Promise.resolve();
};
@@ -340,16 +360,10 @@
this._proxyConnection = proxyConnection;
if (proxyConnection && proxyConnection.clientEncoder) {
this._encoder = proxyConnection.clientEncoder;
- } else {
- this._encoder = new Encoder();
}
-
if (proxyConnection && proxyConnection.clientDecoder) {
this._decoder = proxyConnection.clientDecoder;
- } else {
- this._decoder = new Decoder();
}
-
this._controller = this.bindWithSignature(
'__controller', [Controller.prototype._serviceDescription]);
}
@@ -563,8 +577,10 @@
inStreamingType: inStreaming ? methodSig.inStream.type : null,
outStreamingType: outStreaming ? methodSig.outStream.type : null,
callOptions: callOptions,
- encoder: client._encoder,
- decoder: client._decoder,
+ // If there isn't an encoder or decoder cached, we just use a new one.
+ // This only really happens in unit tests.
+ encoder: client._encoder || new Encoder(),
+ decoder: client._decoder || new Decoder(new ByteStreamMessageReader()),
}, callback);
return rpc.start();
diff --git a/src/rpc/granter-router.js b/src/rpc/granter-router.js
index a2390a5..dee6e7a 100644
--- a/src/rpc/granter-router.js
+++ b/src/rpc/granter-router.js
@@ -4,7 +4,9 @@
var asyncCall = require('../lib/async-call');
-var Deferred = require('../lib/deferred');
+var Promise = require('../lib/promise');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
var hexVom = require('../lib/hex-vom');
var verror = require('../gen-vdl/v.io/v23/verror');
var MessageType = require('../proxy/message-type');
@@ -42,59 +44,62 @@
return;
}
+
try {
- request = hexVom.decode(request);
- request = request.val;
+ request = byteUtil.hex2Bytes(request);
} catch (e) {
- // TODO(bjornick): Pass in context here so we can generate useful error
- // messages.
- var res = new GranterResponse({
- err: new verror.NoExistError(this._rootCtx, 'failed to decode message')
- });
- var data = hexVom.encode(res);
- this._proxy.sendRequest(data, Outgoing.GRANTER_RESPONSE,
- null, messageId);
+ returnFailure(
+ new verror.NoExistError(this._rootCtx, 'failed to decode message'));
+ return Promise.resolve();
}
- var granter = this.activeGranters[request.granterHandle];
- if (!granter) {
- // TODO(bjornick): Pass in context here so we can generate useful error
- // messages.
- var res = new GranterResponse({
- err: new verror.NoExistError(this._rootCtx, 'unknown granter')
- });
- var data = hexVom.encode(res);
- this._proxy.sendRequest(data, Outgoing.GRANTER_RESPONSE,
- null, messageId);
- return;
- }
- delete this.activeGranters[request.granterHandle];
-
- var securityCall = new SecurityCall(request.call, this._controller);
- var ctx = this._rootCtx;
- var def = new Deferred();
- var inspectFn = new InspectableFunction(granter);
- var self = this;
- asyncCall(ctx, null, inspectFn, ['outBlessings'],
- [ctx, securityCall], function(err, outBlessings) {
- if (err) {
- var res = new GranterResponse({
- err: new verror.NoExistError(this._rootCtx, 'error while granting: ' +
- err)
- });
- var errData = hexVom.encode(res);
- self._proxy.sendRequest(errData, Outgoing.GRANTER_RESPONSE,
- null, messageId);
- def.resolve();
- return;
+ var router = this;
+ return vom.decode(request).then(function(request) {
+ request = request.val;
+ var granter = router.activeGranters[request.granterHandle];
+ if (!granter) {
+ // TODO(bjornick): Pass in context here so we can generate useful error
+ // messages
+ return Promise.reject(
+ new verror.NoExistError(router._rootCtx, 'unknown granter'));
}
+ delete router.activeGranters[request.granterHandle];
+ var securityCall = new SecurityCall(request.call, router._controller);
+ var ctx = router._rootCtx;
+ var inspectFn = new InspectableFunction(granter);
+ var resolve;
+ var reject;
+ var promise = new Promise(function(a, b) {
+ resolve = a;
+ reject = b;
+ });
+ asyncCall(ctx, null, inspectFn, ['outBlessings'],
+ [ctx, securityCall], function(err, res) {
+ if(err) {
+ return reject(err);
+ }
+ return resolve(res);
+ });
+ return promise;
+ }, function(e) {
+ return Promise.reject(
+ new verror.NoExistError(router._rootCtx, 'failed to decode message'));
+ }).then(function(outBlessings) {
var result = new GranterResponse({blessings: outBlessings[0]._id});
var data = hexVom.encode(result);
- self._proxy.sendRequest(data, Outgoing.GRANTER_RESPONSE, null,
+ router._proxy.sendRequest(data, Outgoing.GRANTER_RESPONSE, null,
messageId);
- def.resolve();
- });
- return def.promise;
+ }, function(e) {
+ return Promise.reject(
+ new verror.NoExistError(router._rootCtx, 'error while granting: ' + e));
+ }).catch(returnFailure);
+
+ function returnFailure(e) {
+ var res = new GranterResponse({err: e});
+ var data = byteUtil.bytes2Hex(vom.encode(res));
+ router._proxy.sendRequest(data, Outgoing.GRANTER_RESPONSE,
+ null, messageId);
+ }
};
/**
diff --git a/src/rpc/server-router.js b/src/rpc/server-router.js
index 9c4a3a1..82ac8c4 100644
--- a/src/rpc/server-router.js
+++ b/src/rpc/server-router.js
@@ -43,6 +43,8 @@
require('../gen-vdl/v.io/v23/security').WireBlessings;
var SharedContextKeys = require('../runtime/shared-context-keys');
var hexVom = require('../lib/hex-vom');
+var vom = require('../vom');
+var byteUtil = require('../vdl/byte-util');
/**
* A router that handles routing incoming requests to the right
@@ -101,11 +103,10 @@
};
Router.prototype.handleAuthorizationRequest = function(messageId, request) {
- var authReply;
try {
- request = hexVom.decode(request);
+ request = byteUtil.hex2Bytes(request);
} catch (e) {
- authReply = new AuthReply({
+ var authReply = new AuthReply({
// TODO(bjornick): Use the real context
err: new verror.InternalError(this._rootCtx, 'Failed to decode ', e)
});
@@ -115,32 +116,36 @@
return;
}
- var ctx = this._rootCtx.withValue(SharedContextKeys.LANG_KEY,
- request.context.language);
- var server = this._servers[request.serverId];
- if (!server) {
- authReply = new AuthReply({
- // TODO(bjornick): Use the real context
- err: new verror.ExistsError(ctx, 'unknown server')
- });
-
- this._proxy.sendRequest(hexVom.encode(authReply),
- Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
- return;
- }
var router = this;
- var call = new SecurityCall(request.call, this._controller);
+ var call;
+ vom.decode(request).then(function(request) {
+ var ctx = router._rootCtx.withValue(SharedContextKeys.LANG_KEY,
+ request.context.language);
+ var server = router._servers[request.serverId];
+ if (!server) {
+ var authReply = new AuthReply({
+ // TODO(bjornick): Use the real context
+ err: new verror.ExistsError(ctx, 'unknown server')
+ });
+ router._proxy.sendRequest(hexVom.encode(authReply),
+ Outgoing.AUTHORIZATION_RESPONSE,
+ null, messageId);
+ return;
+ }
+ call = new SecurityCall(request.call, router._controller);
- authReply = new AuthReply({});
-
- server.handleAuthorization(request.handle, ctx, call).then(function() {
+ return server.handleAuthorization(request.handle, ctx, call);
+ }, function(e) {
+ return Promise.reject(new verror.InternalError(router._rootCtx,
+ 'Failed to decode ', e));
+ }).then(function() {
+ var authReply = new AuthReply({});
router._proxy.sendRequest(hexVom.encode(authReply),
- Outgoing.AUTHORIZATION_RESPONSE, null,
- messageId);
+ Outgoing.AUTHORIZATION_RESPONSE, null, messageId);
}).catch(function(e) {
var authReply = new AuthReply({
- err: ErrorConversion.fromNativeValue(e, this._appName,
- request.call.method)
+ err: ErrorConversion.fromNativeValue(e, router._appName,
+ call.method)
});
router._proxy.sendRequest(hexVom.encode(authReply),
Outgoing.AUTHORIZATION_RESPONSE, null,
@@ -149,18 +154,17 @@
};
Router.prototype._validateChain = function(ctx, call, cavs) {
- var promises = new Array(cavs.length);
- for (var j = 0; j < cavs.length; j++) {
+ var router = this;
+ var promises = cavs.map(function(cav) {
var def = new Deferred();
- this._caveatRegistry.validate(ctx, call, cavs[j],
- function(err) {
- if (err) {
- return def.reject(err);
- }
- def.resolve();
- }); // jshint ignore:line
- promises[j] = def.promise;
- }
+ router._caveatRegistry.validate(ctx, call, cav, function(err) {
+ if (err) {
+ return def.reject(err);
+ }
+ return def.resolve();
+ });
+ return def.promise;
+ });
return Promise.all(promises).then(function(results) {
return undefined;
}).catch(function(err) {
@@ -169,7 +173,7 @@
'Non-error value returned from caveat validator: ' +
err);
}
- return ErrorConversion.fromNativeValue(err, this._appName,
+ return ErrorConversion.fromNativeValue(err, router._appName,
'caveat validation');
});
};
@@ -191,7 +195,8 @@
Outgoing.CAVEAT_VALIDATION_RESPONSE, null,
messageId);
}).catch(function(err) {
- throw new Error('Unexpected error (all promises should resolve): ' + err);
+ vlog.logger.error(
+ new Error('Unexpected error (all promises should resolve): ' + err));
});
};
@@ -243,51 +248,7 @@
}
};
-/**
- * Handles incoming requests from the server to invoke methods on registered
- * services in JavaScript.
- * @private
- * @param {string} messageId Message Id set by the server.
- * @param {Object} vdlRequest VOM encoded request. Request's structure is
- * {
- * serverId: number // the server id
- * method: string // Name of the method on the service to call
- * args: [] // Array of positional arguments to be passed into the method
- * // Note: This array contains wrapped arguments!
- * }
- */
-Router.prototype.handleRPCRequest = function(messageId, vdlRequest) {
- // TODO(bjornick): Break this method up into smaller methods.
- var err;
- var request;
- try {
- request = hexVom.decode(vdlRequest);
- } catch (e) {
- err = new Error('Failed to decode args: ' + e);
- this.sendResult(messageId, '', null, err);
- return;
- }
- var methodName = capitalize(request.method);
-
- var server = this._servers[request.serverId];
-
- if (!server) {
- // TODO(bprosnitz) What error type should this be.
- err = new Error('Request for unknown server ' + request.serverId);
- this.sendResult(messageId, methodName, null, err);
- return;
- }
-
- var invoker = server.getInvokerForHandle(request.handle);
- if (!invoker) {
- vlog.logger.error('No invoker found: ', request);
- err = new Error('No service found');
- this.sendResult(messageId, methodName, null, err);
- return;
- }
-
- var self = this;
- var stream;
+Router.prototype.createRPCContext = function(request) {
var ctx = this._rootCtx;
// Setup the context passed in the context info passed in from wspr.
if (!request.call.deadline.noDeadline) {
@@ -304,9 +265,47 @@
var suffix = request.call.securityCall.suffix;
var spanName = '<jsserver>"'+suffix+'".'+request.method;
// TODO(mattr): We need to enforce some security on trace responses.
- ctx = vtrace.withContinuedTrace(ctx, spanName,
- request.call.traceRequest);
+ return vtrace.withContinuedTrace(ctx, spanName,
+ request.call.traceRequest);
+};
+/**
+ * Performs the rpc request. Unlike handleRPCRequest, this function works on
+ * the decoded message.
+ * @private
+ * @param {string} messageId Message Id set by the server.
+ * @param {Object} request Request's structure is
+ * {
+ * serverId: number // the server id
+ * method: string // Name of the method on the service to call
+ * args: [] // Array of positional arguments to be passed into the method
+ * // Note: This array contains wrapped arguments!
+ * }
+ */
+Router.prototype._handleRPCRequestInternal = function(messageId, request) {
+ // TODO(bjornick): Break this method up into smaller methods.
+ var methodName = capitalize(request.method);
+ var server = this._servers[request.serverId];
+ var err;
+
+ if (!server) {
+ // TODO(bprosnitz) What error type should this be.
+ err = new Error('Request for unknown server ' + request.serverId);
+ this.sendResult(messageId, methodName, null, err);
+ return;
+ }
+
+ var invoker = server.getInvokerForHandle(request.handle);
+ if (!invoker) {
+ vlog.logger.error('No invoker found: ', request);
+ err = new Error('No service found');
+ this.sendResult(messageId, methodName, null, err);
+ return;
+ }
+
+ var ctx = this.createRPCContext(request);
+ var self = this;
+ var stream;
var call = new ServerCall(request, this._controller);
if (request.method === 'Glob__') {
if (!invoker.hasGlobber()) {
@@ -411,6 +410,38 @@
self.sendResult(messageId, methodName, canonResults, undefined,
methodSig.outArgs.length);
});
+
+};
+/**
+ * Handles incoming requests from the server to invoke methods on registered
+ * services in JavaScript.
+ * @private
+ * @param {string} messageId Message Id set by the server.
+ * @param {string} vdlRequest VOM encoded request. Request's structure is
+ * {
+ * serverId: number // the server id
+ * method: string // Name of the method on the service to call
+ * args: [] // Array of positional arguments to be passed into the method
+ * // Note: This array contains wrapped arguments!
+ * }
+ */
+Router.prototype.handleRPCRequest = function(messageId, vdlRequest) {
+ var err;
+ var request;
+ var router = this;
+ try {
+ request = byteUtil.hex2Bytes(vdlRequest);
+ } catch (e) {
+ err = new Error('Failed to decode args: ' + e);
+ this.sendResult(messageId, '', null, err);
+ return;
+ }
+ vom.decode(request).then(function(request) {
+ router._handleRPCRequestInternal(messageId, request);
+ }, function(e) {
+ err = new Error('Failed to decode args: ' + e);
+ router.sendResult(messageId, '', null, err);
+ });
};
function methodIsStreaming(methodSig) {
diff --git a/src/security/caveat-validator-registry.js b/src/security/caveat-validator-registry.js
index c57b309..14a40f3 100644
--- a/src/security/caveat-validator-registry.js
+++ b/src/security/caveat-validator-registry.js
@@ -88,7 +88,12 @@
return cb(new vdlSecurity.CaveatNotRegisteredError(ctx,
'Unknown caveat id: ' + this._makeKey(caveat.id)));
}
- return validator.validate(ctx, call, vom.decode(caveat.paramVom), cb);
+ return vom.decode(caveat.paramVom, false, function(err, val) {
+ if (err) {
+ return cb(err);
+ }
+ return validator.validate(ctx, call, val, cb);
+ });
};
/**
diff --git a/src/vom/binary-reader.js b/src/vom/binary-reader.js
index 3cf95b1..fd17058 100644
--- a/src/vom/binary-reader.js
+++ b/src/vom/binary-reader.js
@@ -7,6 +7,8 @@
* @private
*/
+var Promise = require('../lib/promise');
+var byteUtil = require('../vdl/byte-util');
module.exports = BinaryReader;
/**
@@ -23,44 +25,51 @@
/**
* Reads a byte from the bufer.
- * @return {number} The byte value. EOF is represented by null.
+ * @return {Promise<number>} The byte value. EOF is represented by null.
*/
BinaryReader.prototype.readByte = function() {
var val = this.buf[this.pos];
this.pos++;
if (val === undefined) {
- throw new Error('Failed to read byte, reached end of buffer');
+ return Promise.reject(
+ new Error('Failed to read byte, reached end of buffer'));
}
- return val;
+ return Promise.resolve(val);
};
/**
* Returns the next byte from the buffer without advancing the reader
- * @return {number} The byte value. EOF is represented by null.
+ * @return {Promise<number>} The byte value. EOF is represented by null.
*/
BinaryReader.prototype.peekByte = function() {
var val = this.buf[this.pos];
if (val === undefined) {
- throw new Error('Failed to read byte, reached end of buffer');
+ return Promise.reject(
+ new Error('Failed to read byte, reached end of buffer'));
}
- return val;
+ return Promise.resolve(val);
};
/**
* Reads an array of bytes from the buffer.
* @param {number} amt. The number of bytes to read.
- * @return {Uint8Array} The byte array. If the whole size cannot be read, null
- * (representing EOF) is returned.
+ * @return {Promise<Uint8Array>} The byte array. If the whole size cannot be
+ * read, null (representing EOF) is returned.
*/
BinaryReader.prototype.readByteArray = function(amt) {
var arr = this.buf.subarray(this.pos, this.pos + amt);
this.pos += amt;
if (this.pos > this.buf) {
- throw new Error('Failed to read ' + amt + ' bytes. Hit EOF.');
+ return Promise.reject(
+ new Error('Failed to read ' + amt + ' bytes. Hit EOF.'));
}
- return arr;
+ return Promise.resolve(arr);
};
BinaryReader.prototype.hasData = function() {
return this.pos < this.buf.length;
};
+
+BinaryReader.prototype.getHexBytes = function() {
+ return byteUtil.bytes2Hex(this.buf.slice(this.pos));
+};
diff --git a/src/vom/byte-array-message-reader.js b/src/vom/byte-array-message-reader.js
index 0fff25d..3b327dd 100644
--- a/src/vom/byte-array-message-reader.js
+++ b/src/vom/byte-array-message-reader.js
@@ -13,6 +13,7 @@
var RawVomReader = require('./raw-vom-reader.js');
var TypeUtil = require('../vdl/type-util.js');
+
/**
* Create a VOM message reader backed by a byte array.
* @param {Uint8Array} bytes The byte array.
@@ -20,42 +21,44 @@
* @memberof module:vanadium.vom
*/
function ByteArrayMessageReader(bytes) {
- this.rawReader = new RawVomReader(bytes);
- var header = this.rawReader._readRawBytes(1);
- if (header[0] !== 0x80) {
+ if (bytes[0] !== 0x80) {
throw new Error('Improperly formatted bytes. Must start with 0x80');
}
+ this.rawReader = new RawVomReader(bytes);
+ // Consume the header byte.
+ this.rawReader._readRawBytes(1);
}
/**
* Get the the type of the next value message.
* @private
* @param {TypeDecoder} typeDecoder The current type decoder.
- * @return {Type} The type of the next message or null if the stream has ended.
+ * @return {Promise<Type>} The type of the next message or null if the stream
+ * has ended.
*/
ByteArrayMessageReader.prototype.nextMessageType = function(typeDecoder) {
- while (true) {
- var typeId;
- try {
- typeId = this.rawReader.readInt();
- } catch (error) {
- // Hopefully EOF.
- // TODO(bprosnitz) Make this a more accurate check.
- return null;
- }
+ var bamr = this;
+ return this.rawReader.readInt().then(function(typeId) {
if (typeId < 0) {
- // Type message.
- var len = this.rawReader.readUint();
- var body = this.rawReader._readRawBytes(len);
- typeDecoder.defineType(-typeId, body);
- } else {
- // Value message.
- var type = typeDecoder.lookupType(typeId);
- if (TypeUtil.shouldSendLength(type)) {
- // length
- this.rawReader.readUint();
- }
- return type;
+ // Type message. We add the type to the typeDecoder and continue reading
+ // trying to find a value message.
+ return bamr.rawReader.readUint().then(function(len) {
+ return bamr.rawReader._readRawBytes(len);
+ }).then(function(body) {
+ return typeDecoder.defineType(-typeId, body);
+ }).then(function() {
+ return bamr.nextMessageType(typeDecoder);
+ });
}
- }
+ var type = typeDecoder.lookupType(typeId);
+ if (TypeUtil.shouldSendLength(type)) {
+ return bamr.rawReader.readUint().then(function() {
+ return type;
+ });
+ }
+ return type;
+ }, function(err) {
+ // Hopefull this is an eof.
+ return null;
+ });
};
diff --git a/src/vom/byte-stream-message-reader.js b/src/vom/byte-stream-message-reader.js
index 5794bbb..520d55e 100644
--- a/src/vom/byte-stream-message-reader.js
+++ b/src/vom/byte-stream-message-reader.js
@@ -10,65 +10,61 @@
module.exports = ByteStreamMessageReader;
+var StreamReader = require('./stream-reader.js');
var RawVomReader = require('./raw-vom-reader.js');
var TypeUtil = require('../vdl/type-util.js');
/**
* Create a VOM message reader backed by a byte stream.
- * @param {Uint8Array} bytes The initial byte stream.
* @constructor
* @memberof module:vanadium.vom
*/
-function ByteStreamMessageReader(bytes) {
- this.rawReader = new RawVomReader(bytes);
- var header = this.rawReader._readRawBytes(1);
- if (header[0] !== 0x80) {
- throw new Error('Improperly formatted bytes. Must start with 0x80');
- }
+function ByteStreamMessageReader() {
+ this.rawReader = new RawVomReader(new StreamReader());
+ // Consume the header byte.
+ this.headerPromise = this.rawReader.readByte(1).then(function(byte) {
+ if (byte !== 0x80) {
+ throw new Error('Improperly formatted bytes. Must start with 0x80');
+ }
+ });
}
/**
* Get the the type of the next value message.
* @private
* @param {TypeDecoder} typeDecoder The current type decoder.
- * @return {Type} The type of the next message or null if the stream has ended.
+ * @return {Promise<Type>} The type of the next message or null if the stream
+ * has ended.
*/
ByteStreamMessageReader.prototype.nextMessageType = function(typeDecoder) {
- while (true) {
- var typeId;
- try {
- typeId = this.rawReader.readInt();
- } catch (error) {
- // Hopefully EOF.
- // TODO(bprosnitz) Make this a more accurate check.
- return null;
- }
+ var bsmr = this;
+ return this.headerPromise.then(function() {
+ return bsmr.rawReader.readInt();
+ }).then(function(typeId) {
if (typeId < 0) {
- // Type message.
- var len = this.rawReader.readUint();
- var body = this.rawReader._readRawBytes(len);
- typeDecoder.defineType(-typeId, body);
- } else {
- // Value message.
- var type = typeDecoder.lookupType(typeId);
- if (TypeUtil.shouldSendLength(type)) {
- // length
- this.rawReader.readUint();
- }
- return type;
+ // Type message. We add the type to the typeDecoder and continue reading
+ // trying to find a value message.
+ return bsmr.rawReader.readUint().then(function(len) {
+ return bsmr.rawReader._readRawBytes(len);
+ }).then(function(body) {
+ return typeDecoder.defineType(-typeId, body);
+ }).then(function() {
+ return bsmr.nextMessageType(typeDecoder);
+ });
}
- }
-};
-
-ByteStreamMessageReader.prototype.clear = function() {
- this.rawReader = null;
+ var type = typeDecoder.lookupType(typeId);
+ if (TypeUtil.shouldSendLength(type)) {
+ return bsmr.rawReader.readUint().then(function() {
+ return type;
+ });
+ }
+ return type;
+ }, function(err) {
+ // Hopefull this is an eof.
+ return null;
+ });
};
ByteStreamMessageReader.prototype.addBytes = function(bytes) {
- // TODO(bjornick): Implement a real stream reader.
- if (this.rawReader && this.rawReader.hasData()) {
- throw new Error('Can\'t addBytes if all the previous bytes ' +
- 'haven\'t been read');
- }
- this.rawReader = new RawVomReader(bytes);
+ this.rawReader._reader.addBytes(bytes);
};
diff --git a/src/vom/decode.js b/src/vom/decode.js
index 4ac8e0e..4065544 100644
--- a/src/vom/decode.js
+++ b/src/vom/decode.js
@@ -14,11 +14,12 @@
* @param {boolean} [deepWrap=false] true if the values on the object should
* remain wrapped with type information deeply, false (default) to strip
* deep type information and obtain a more usage-friendly value
- * @return {*} decoded value
+ * @param {module:vanadium.vom.decode~cb} cb
+ * @return {Promise<*>} decoded value
* @memberof module:vanadium.vom
*/
-function decode(bytes, deepWrap) {
+function decode(bytes, deepWrap, cb) {
var reader = new ByteArrayMessageReader(bytes);
var decoder = new Decoder(reader, deepWrap);
- return decoder.decode();
+ return decoder.decode(cb);
}
diff --git a/src/vom/decoder.js b/src/vom/decoder.js
index ae51b8e..198f589 100644
--- a/src/vom/decoder.js
+++ b/src/vom/decoder.js
@@ -18,6 +18,11 @@
var unwrap = require('../vdl/type-util').unwrap;
var wiretype = require('../gen-vdl/v.io/v23/vom');
var nativeTypeRegistry = require('../vdl/native-type-registry');
+var Deferred = require('../lib/deferred');
+var Promise = require('../lib/promise');
+var TaskSequence = require('../lib/task-sequence');
+var promiseFor = require('../lib/async-helper').promiseFor;
+var promiseWhile = require('../lib/async-helper').promiseWhile;
var endByte = unwrap(wiretype.WireCtrlEnd);
var nilByte = unwrap(wiretype.WireCtrlNil);
@@ -36,6 +41,7 @@
this._messageReader = messageReader;
this._typeDecoder = new TypeDecoder();
this._deepWrap = deepWrap || false;
+ this._tasks = new TaskSequence();
}
/*
@@ -46,32 +52,43 @@
* Decodes the next object off of the message reader.
* @return {object} The next object or null if no more objects are available.
*/
-Decoder.prototype.decode = function() {
- var type = this._messageReader.nextMessageType(this._typeDecoder);
- if (type === null) {
- return null;
- }
- var reader = this._messageReader.rawReader;
- return this._decodeValue(type, reader, true);
+Decoder.prototype.decode = function(cb) {
+ var def = new Deferred(cb);
+ var decoder = this;
+ this._tasks.addTask(function() {
+ return decoder._messageReader.nextMessageType(decoder._typeDecoder).
+ then(function(type) {
+ if (type === null) {
+ return null;
+ }
+ var reader = decoder._messageReader.rawReader;
+ return decoder._decodeValue(type, reader, true);
+ }).then(function(v) {
+ def.resolve(v);
+ }, function(err) {
+ def.reject(err);
+ });
+ });
+ return def.promise;
};
Decoder.prototype._decodeValue = function(t, reader, shouldWrap) {
- var value = this._decodeUnwrappedValue(t, reader);
+ return this._decodeUnwrappedValue(t, reader).then(function(value) {
+ // Special: JSValue should be reduced and returned as a native value.
+ if (types.JSVALUE.equals(t)) {
+ return canonicalize.reduce(value, types.JSVALUE);
+ }
- // Special: JSValue should be reduced and returned as a native value.
- if (types.JSVALUE.equals(t)) {
- return canonicalize.reduce(value, types.JSVALUE);
- }
-
- if (nativeTypeRegistry.hasNativeType(t)) {
- return canonicalize.reduce(value, t);
- }
- // If this value should be wrapped, apply the constructor.
- if (t.kind !== kind.TYPEOBJECT && shouldWrap) {
- var Ctor = Registry.lookupOrCreateConstructor(t);
- return new Ctor(value, this._deepWrap);
- }
- return value;
+ if (nativeTypeRegistry.hasNativeType(t)) {
+ return canonicalize.reduce(value, t);
+ }
+ // If this value should be wrapped, apply the constructor.
+ if (t.kind !== kind.TYPEOBJECT && shouldWrap) {
+ var Ctor = Registry.lookupOrCreateConstructor(t);
+ return new Ctor(value, this._deepWrap);
+ }
+ return value;
+ });
};
Decoder.prototype._decodeUnwrappedValue = function(t, reader) {
@@ -95,10 +112,14 @@
return reader.readFloat();
case kind.COMPLEX64:
case kind.COMPLEX128:
- return {
- real: reader.readFloat(),
- imag: reader.readFloat()
- };
+ return reader.readFloat().then(function(real) {
+ return reader.readFloat().then(function(imag) {
+ return {
+ real: real,
+ imag: imag
+ };
+ });
+ });
case kind.STRING:
return reader.readString();
case kind.ENUM:
@@ -120,38 +141,46 @@
case kind.OPTIONAL:
return this._decodeOptional(t, reader);
case kind.TYPEOBJECT:
- var typeId = reader.readUint();
- var type = this._typeDecoder.lookupType(typeId);
- if (type === undefined) {
- throw new Error('Undefined type for TYPEOBJECT id ' + typeId);
- }
- return type;
+ var decoder = this;
+ return reader.readUint().then(function(typeId) {
+ var type = decoder._typeDecoder.lookupType(typeId);
+ if (type === undefined) {
+ throw new Error('Undefined type for TYPEOBJECT id ' + typeId);
+ }
+ return type;
+ });
+
default:
- throw new Error('Support for decoding kind ' + t.kind +
- ' not yet implemented');
+ return Promise.reject(new Error('Support for decoding kind ' + t.kind +
+ ' not yet implemented'));
}
};
Decoder.prototype._decodeEnum = function(t, reader) {
- var index = reader.readUint();
- if (t.labels.length <= index) {
- throw new Error('Invalid enum index ' + index);
- }
- return t.labels[index];
+ return reader.readUint().then(function(index) {
+ if (t.labels.length <= index) {
+ throw new Error('Invalid enum index ' + index);
+ }
+ return t.labels[index];
+ });
};
Decoder.prototype._decodeList = function(t, reader) {
- var len = reader.readUint();
- return this._readSequence(t, len, reader);
+ var decoder = this;
+ return reader.readUint().then(function(len) {
+ return decoder._readSequence(t, len, reader);
+ });
};
Decoder.prototype._decodeArray = function(t, reader) {
+ var decoder = this;
// Consume the zero byte at the beginning of the array.
- var b = reader.readByte();
- if (b !== 0) {
- throw new Error('Unexpected length ' + b);
- }
- return this._readSequence(t, t.len, reader);
+ return reader.readByte().then(function(b) {
+ if (b !== 0) {
+ throw new Error('Unexpected length ' + b);
+ }
+ return decoder._readSequence(t, t.len, reader);
+ });
};
Decoder.prototype._readSequence = function(t, len, reader) {
@@ -161,99 +190,143 @@
// The Uint8Array is created by calling subarray. In node, this means that
// its buffer points to the whole binary_reader buffer. To fix this, we
// recreate the Uint8Array here to avoid exposing it.
- return new Uint8Array(reader._readRawBytes(len));
+ return reader._readRawBytes(len).then(function(b) {
+ return new Uint8Array(b);
+ });
}
var arr = new Array(len);
- for (var i = 0; i < len; i++) {
- arr[i] = this._decodeValue(t.elem, reader, false);
- }
- return arr;
+ var i = 0;
+ var decoder = this;
+ return promiseFor(len, function() {
+ return decoder._decodeValue(t.elem, reader, false).then(function(val) {
+ arr[i] = val;
+ i++;
+ });
+ }).then(function() {
+ return arr;
+ });
};
Decoder.prototype._decodeSet = function(t, reader) {
- var len = reader.readUint();
+ var decoder = this;
var s = new Set();
- for (var i = 0; i < len; i++) {
- var key = this._decodeValue(t.key, reader, false);
- s.add(key);
- }
- return s;
+ return reader.readUint().then(function(len) {
+ return promiseFor(len, function() {
+ return decoder._decodeValue(t.key, reader, false).then(function(key) {
+ s.add(key);
+ });
+ });
+ }).then(function() {
+ return s;
+ });
};
Decoder.prototype._decodeMap = function(t, reader) {
- var len = reader.readUint();
- var m = new Map();
- for (var i = 0; i < len; i++) {
- var key = this._decodeValue(t.key, reader, false);
- var val = this._decodeValue(t.elem, reader, false);
- m.set(key, val);
- }
- return m;
+ var decoder = this;
+ return reader.readUint().then(function(len) {
+ var m = new Map();
+ var i = 0;
+ if (len > 0) {
+ return decoder._decodeValue(t.key, reader, false).then(handleKey);
+ }
+ return m;
+
+ function handleKey(key) {
+ return decoder._decodeValue(t.elem, reader, false).then(function(value) {
+ m.set(key, value);
+ i++;
+ if (i < len) {
+ return decoder._decodeValue(t.key, reader, false).then(handleKey);
+ }
+ return m;
+ });
+ }
+ });
};
Decoder.prototype._decodeStruct = function(t, reader) {
+ var decoder = this;
var Ctor = Registry.lookupOrCreateConstructor(t);
var obj = Object.create(Ctor.prototype);
- while (true) {
- var ctrl = reader.tryReadControlByte();
- if (ctrl === endByte) {
- break;
+
+ return promiseWhile(notEndByte, readField).then(function() {
+ return obj;
+ });
+ function notEndByte() {
+ return reader.tryReadControlByte().then(function(ctrl) {
+ if (ctrl === endByte) {
+ return false;
+ }
+
+ if (ctrl) {
+ throw new Error('Unexpected control byte ' + ctrl);
+ }
+ return true;
+ });
+ }
+ function readField() {
+ var name = '';
+ return reader.readUint().then(function(nextIndex) {
+ if (t.fields.length <= nextIndex) {
+ throw new Error('Struct index ' + nextIndex + ' out of bounds');
+ }
+ var field = t.fields[nextIndex];
+ name = util.uncapitalize(field.name);
+ return decoder._decodeValue(field.type, reader, false);
+ }).then(function(val) {
+ obj[name] = val;
+ });
+ }
+};
+
+Decoder.prototype._decodeOptional = function(t, reader) {
+ var decoder = this;
+ return reader.peekByte().then(function(isNil) {
+ if (isNil === nilByte) {
+ // We don't have to wait for the read to finish.
+ reader.readByte();
+ return null;
+ }
+ return decoder._decodeValue(t.elem, reader, false);
+ });
+};
+
+Decoder.prototype._decodeAny = function(reader) {
+ var decoder = this;
+ return reader.tryReadControlByte().then(function(ctrl) {
+ if (ctrl === nilByte) {
+ return null;
}
if (ctrl) {
throw new Error('Unexpected control byte ' + ctrl);
}
-
- var nextIndex = reader.readUint();
- if (t.fields.length <= nextIndex) {
- throw new Error('Struct index ' + nextIndex + ' out of bounds');
- }
- var field = t.fields[nextIndex];
- var val = this._decodeValue(field.type, reader, false);
- obj[util.uncapitalize(field.name)] = val;
- }
- return obj;
-};
-
-Decoder.prototype._decodeOptional = function(t, reader) {
- var isNil = reader.peekByte();
- if (isNil === nilByte) {
- reader.readByte();
- return null;
- }
- return this._decodeValue(t.elem, reader, false);
-};
-
-Decoder.prototype._decodeAny = function(reader) {
- var ctrl = reader.tryReadControlByte();
- if (ctrl === nilByte) {
- return null;
- }
-
- if (ctrl) {
- throw new Error('Unexpected control byte ' + ctrl);
- }
- var typeId = reader.readUint();
- var type = this._typeDecoder.lookupType(typeId);
- if (type === undefined) {
- throw new Error('Undefined typeid ' + typeId);
- }
- return this._decodeValue(type, reader, true);
+ return reader.readUint().then(function(typeId) {
+ var type = decoder._typeDecoder.lookupType(typeId);
+ if (type === undefined) {
+ throw new Error('Undefined typeid ' + typeId);
+ }
+ return decoder._decodeValue(type, reader, true);
+ });
+ });
};
Decoder.prototype._decodeUnion = function(t, reader) {
+ var decoder = this;
+ var field;
// Find the Union field that was set and decode its value.
- var fieldIndex = reader.readUint();
- if (t.fields.length <= fieldIndex) {
- throw new Error('Union index ' + fieldIndex + ' out of bounds');
- }
- var field = t.fields[fieldIndex];
- var val = this._decodeValue(field.type, reader, false);
-
- // Return the Union with a single field set to its decoded value.
- var Ctor = Registry.lookupOrCreateConstructor(t);
- var obj = Object.create(Ctor.prototype);
- obj[util.uncapitalize(field.name)] = val;
- return obj;
+ return reader.readUint().then(function(fieldIndex) {
+ if (t.fields.length <= fieldIndex) {
+ throw new Error('Union index ' + fieldIndex + ' out of bounds');
+ }
+ field = t.fields[fieldIndex];
+ return decoder._decodeValue(field.type, reader, false);
+ }).then(function(val) {
+ // Return the Union with a single field set to its decoded value.
+ var Ctor = Registry.lookupOrCreateConstructor(t);
+ var obj = Object.create(Ctor.prototype);
+ obj[util.uncapitalize(field.name)] = val;
+ return obj;
+ });
};
diff --git a/src/vom/raw-vom-reader.js b/src/vom/raw-vom-reader.js
index f38d683..205841a 100644
--- a/src/vom/raw-vom-reader.js
+++ b/src/vom/raw-vom-reader.js
@@ -18,133 +18,155 @@
* RawVomReader reads VOM primitive values (numbers, strings, bools) from a
* provided Uint8Array.
* @private
- * @param {Uint8Array} arr The array to read from.
+ * @param {Uint8Array|StreamReader} arr The array to read from.
* @constructor
*/
function RawVomReader(arr) {
- this._reader = new BinaryReader(arr);
+ if (arr instanceof Uint8Array) {
+ this._reader = new BinaryReader(arr);
+ } else {
+ this._reader = arr;
+ }
}
/**
- * Reads a BigUint.
- * @return {BigUint} The BigUint that was read.
+ * Reads a uint as a BigInt.
+ * @return {Promise<BigInt>} The BigUint that was read.
*/
RawVomReader.prototype.readBigUint = function() {
- var firstByte = this._reader.readByte();
- if (firstByte <= 0x7f) {
- if (firstByte === 0) {
- return new BigInt(0, new Uint8Array(0));
+ var reader = this;
+ return this._reader.readByte().then(function(firstByte) {
+ if (firstByte <= 0x7f) {
+ if (firstByte === 0) {
+ return new BigInt(0, new Uint8Array(0));
+ }
+ return new BigInt(1, new Uint8Array([firstByte]));
}
- return new BigInt(1, new Uint8Array([firstByte]));
- }
- var numBytes = 0x100 - firstByte;
- if (numBytes > 8 || numBytes < 1) {
- throw new Error('Invalid size ' + numBytes);
- }
+ var numBytes = 0x100 - firstByte;
+ if (numBytes > 8 || numBytes < 1) {
+ throw new Error('Invalid size ' + numBytes);
+ }
- var uintBytes = this._reader.readByteArray(numBytes);
- return new BigInt(1, uintBytes);
+ return reader._reader.readByteArray(numBytes).then(function(uintBytes) {
+ return new BigInt(1, uintBytes);
+ });
+ });
};
/**
* Returns a control byte if the next byte is a control byte.
- * @returns {Number} a control byte if there is one, null if there is no
- * control byte
+ * @returns {Promise<number>} a control byte if there is one, null if there
+ * is no control byte.
*/
RawVomReader.prototype.tryReadControlByte = function() {
- var firstByte = this.peekByte();
- if (firstByte === null) {
- return null;
- }
+ var reader = this;
+ return this.peekByte().then(function(firstByte) {
+ if (firstByte === null) {
+ return null;
+ }
- if (firstByte > 0x7f && firstByte <= 0xef) {
- return this.readByte();
- }
- return null;
+ if (firstByte > 0x7f && firstByte <= 0xef) {
+ return reader.readByte();
+ }
+ return null;
+ });
};
/**
* Reads a BigInt.
- * @return {BigInt} The BigInt that was read.
+ * @return {Promise<BigInt>} The BigInt that was read.
*/
RawVomReader.prototype.readBigInt = function() {
- var uint = this.readBigUint();
- var bytes = uint.getUintBytes();
- var sign;
- if (uint.getSign() === 0) {
- sign = 0;
- } else if (bytes.length > 0 && (bytes[bytes.length - 1] & 0x01) !== 0) {
- sign = -1;
- } else {
- sign = 1;
- }
- bytes = ByteUtil.shiftRightOne(bytes);
- if (sign === -1) {
- bytes = ByteUtil.increment(bytes);
- }
- return new BigInt(sign, bytes);
+ return this.readBigUint().then(function(uint) {
+ var bytes = uint.getUintBytes();
+ var sign;
+ if (uint.getSign() === 0) {
+ sign = 0;
+ } else if (bytes.length > 0 && (bytes[bytes.length - 1] & 0x01) !== 0) {
+ sign = -1;
+ } else {
+ sign = 1;
+ }
+ bytes = ByteUtil.shiftRightOne(bytes);
+ if (sign === -1) {
+ bytes = ByteUtil.increment(bytes);
+ }
+ return new BigInt(sign, bytes);
+ });
};
/**
* Reads a unsigned integer as a native JavaScript number.
- * @return {number} The uint that was read.
+ * @return {Promise<number>} The uint that was read.
*/
RawVomReader.prototype.readUint = function() {
- return this.readBigUint().toNativeNumber();
+ return this.readBigUint().then(function(uint) {
+ return uint.toNativeNumber();
+ });
};
/**
* Reads a integer as a native JavaScript number.
- * @return {number} The int that was read.
+ * @return {Promise<number>} The int that was read.
*/
RawVomReader.prototype.readInt = function() {
- return this.readBigInt().toNativeNumber();
+ return this.readBigInt().then(function(uint) {
+ return uint.toNativeNumber();
+ });
};
/**
* Reads a float as a native JavaScript number.
- * @return {number} The float that was read.
+ * @return {Promise<number>} The float that was read.
*/
RawVomReader.prototype.readFloat = function() {
- var uintBytes = this.readBigUint().getUintBytes();
- var arr = new Uint8Array(8);
- arr.set(uintBytes, arr.length - uintBytes.length);
- var view = new DataView(arr.buffer);
- return view.getFloat64(0, true);
+ return this.readBigUint().then(function (bigInt) {
+ var uintBytes = bigInt.getUintBytes();
+ var arr = new Uint8Array(8);
+ arr.set(uintBytes, arr.length - uintBytes.length);
+ var view = new DataView(arr.buffer);
+ return view.getFloat64(0, true);
+ });
};
/**
* Reads a string.
- * @return {string} The string that was read.
+ * @return {Promise<string>} The string that was read.
*/
RawVomReader.prototype.readString = function() {
- var length = this.readUint();
- var str = '';
- for (var i = 0; i < length; i++) {
- str += String.fromCharCode(this._reader.readByte());
- }
- return decodeURIComponent(escape(str));
+ var reader = this;
+ return this.readUint().then(function(length) {
+ return reader._reader.readByteArray(length);
+ }).then(function(bytes) {
+ var str = '';
+ for (var i = 0; i < bytes.length; i++) {
+ str += String.fromCharCode(bytes[i]);
+ }
+ return decodeURIComponent(escape(str));
+ });
};
/**
* Reads a boolean.
- * @return {boolean} The boolean that was read.
+ * @return {Promise<boolean>} The boolean that was read.
*/
RawVomReader.prototype.readBool = function() {
- var b = this._reader.readByte();
- if (b === 1) {
- return true;
- } else if (b === 0) {
- return false;
- }
- throw new Error('Invalid boolean byte ' + b);
+ return this._reader.readByte().then(function(b) {
+ if (b === 1) {
+ return true;
+ } else if (b === 0) {
+ return false;
+ }
+
+ throw new Error('Invalid boolean byte ' + b);
+ });
};
/**
* Reads a single VOM byte.
- * @return {byte} The byte that was read.
+ * @return {Promise<byte>} The byte that was read.
*/
RawVomReader.prototype.readByte = function() {
return this._reader.readByte();
@@ -152,7 +174,7 @@
/**
* Reads a single VOM byte without advancing the reader
- * @return {byte} The byte that was read.
+ * @return {Promise<number>} The byte that was read.
*/
RawVomReader.prototype.peekByte = function() {
return this._reader.peekByte();
@@ -161,7 +183,7 @@
/**
* Reads raw bytes.
* @param {number} amt The number of bytes to read.
- * @return {Uint8Array} The bytes that were read.
+ * @return {Promise<Uint8Array>} The bytes that were read.
*/
RawVomReader.prototype._readRawBytes = function(amt) {
return this._reader.readByteArray(amt);
diff --git a/src/vom/stream-reader.js b/src/vom/stream-reader.js
new file mode 100644
index 0000000..0554ccb
--- /dev/null
+++ b/src/vom/stream-reader.js
@@ -0,0 +1,165 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var Deferred = require('../lib/deferred');
+var byteUtil = require('../vdl/byte-util');
+var TaskSequence = require('../lib/task-sequence');
+
+module.exports = StreamReader;
+
+/**
+ * StreamReader provides a Reader api over a stream of bytes
+ * @private
+ * @constructor
+ */
+function StreamReader() {
+ this._bufs = [];
+ this._closed = false;
+ this._sequence = new TaskSequence();
+
+ this._bytesAvailableDef = new Deferred();
+}
+
+
+/**
+ * Adds a set of bytes to the stream
+ * @param {Uint8Array} bytes The bytes to add
+ */
+StreamReader.prototype.addBytes = function(bytes) {
+ if (bytes.length === 0) {
+ return;
+ }
+ this._bufs.push(bytes);
+ this._bytesAvailableDef.resolve();
+};
+
+/**
+ * Closes the stream reader, which forces readers to
+ * consume all the bytes left.
+ */
+StreamReader.prototype.close = function() {
+ this._closed = true;
+ this._bytesAvailableDef.resolve();
+};
+
+StreamReader.prototype._waitForData = function() {
+ if (this._hasBytes() || this._closed) {
+ return Promise.resolve();
+ }
+ this._bytesAvailableDef = new Deferred();
+ return this._bytesAvailableDef.promise;
+};
+
+StreamReader.prototype._hasBytes = function() {
+ return this._bufs.length > 0;
+};
+
+/**
+ * Reads a byte from the stream
+ * @return {Promise<number>}
+ */
+StreamReader.prototype.readByte = function() {
+ var reader = this;
+ var def = new Deferred();
+ function readByte() {
+ return reader._waitForData().then(function() {
+ if (!reader._hasBytes()) {
+ return Promise.reject(
+ new Error('Failed to read byte, eof is ' + reader._closed));
+ }
+ var byte = reader._bufs[0][0];
+ if (reader._bufs[0].length === 1) {
+ reader._bufs.shift();
+ } else {
+ reader._bufs[0] = reader._bufs[0].subarray(1);
+ }
+ return byte;
+ }).then(function(b) {
+ def.resolve(b);
+ }, function(err) {
+ def.reject(err);
+ });
+ }
+ reader._sequence.addTask(readByte);
+ return def.promise;
+};
+
+/**
+ * Peeks a byte from the stream
+ * @return {Promise<number>}
+ */
+StreamReader.prototype.peekByte = function() {
+ var reader = this;
+ var def = new Deferred();
+ function peekByte() {
+ return reader._waitForData().then(function() {
+ if (!reader._hasBytes()) {
+ return Promise.reject(
+ new Error('Failed to read byte, eof is ' + reader._closed));
+ }
+ return reader._bufs[0][0];
+ }).then(function(b) {
+ def.resolve(b);
+ }, function(err) {
+ def.reject(err);
+ });
+ }
+ reader._sequence.addTask(peekByte);
+ return def.promise;
+};
+
+/**
+ * Reads a byte array from the stream
+ * @param {number} amt The number to read.
+ * @return {Promise<Uint8Array>} A promise that will be resolved
+ * with the result.
+ */
+StreamReader.prototype.readByteArray = function(amt) {
+ var reader = this;
+ var def = new Deferred();
+ var pos = 0;
+ var buf = new Uint8Array(amt);
+ var bytesNeeded = amt;
+ function readByteArray() {
+ return reader._waitForData().then(function() {
+ var currentBuf = reader._bufs[0];
+ while (bytesNeeded > 0 && currentBuf) {
+ if (currentBuf.length < bytesNeeded) {
+ // Consume the whole array.
+ buf.set(currentBuf, pos);
+ pos += currentBuf.length;
+ bytesNeeded -= currentBuf.length;
+ reader._bufs.shift();
+ currentBuf = reader._bufs[0];
+ } else {
+ buf.set(currentBuf.subarray(0, bytesNeeded), pos);
+ pos += bytesNeeded;
+ reader._bufs[0] = currentBuf.subarray(bytesNeeded);
+ bytesNeeded = 0;
+ }
+ }
+
+ if (bytesNeeded === 0) {
+ return buf;
+ }
+
+ if (reader._closed) {
+ return Promise.reject(
+ new Error('Failed to read ' + amt + 'bytes, eof is true'));
+ }
+ return readByteArray();
+ }).then(function(arr) {
+ return def.resolve(arr);
+ }, function(err) {
+ return def.reject(err);
+ });
+ }
+
+ this._sequence.addTask(readByteArray);
+ return def.promise;
+};
+
+StreamReader.prototype.getHexBytes = function() {
+ return this._bufs.map(byteUtil.bytes2Hex).join('');
+};
diff --git a/src/vom/type-decoder.js b/src/vom/type-decoder.js
index d3fb503..eccfa1a 100644
--- a/src/vom/type-decoder.js
+++ b/src/vom/type-decoder.js
@@ -40,6 +40,8 @@
var RawVomReader = require('./raw-vom-reader.js');
var unwrap = require('../vdl/type-util').unwrap;
var wiretype = require('../gen-vdl/v.io/v23/vom');
+var promiseFor = require('../lib/async-helper').promiseFor;
+var promiseWhile = require('../lib/async-helper').promiseWhile;
var endByte = unwrap(wiretype.WireCtrlEnd);
@@ -81,7 +83,7 @@
/**
* Add a new type definition to the type cache.
* @param {number} typeId The id of the type.
- * @param {Uint8Array} The raw bytes that describe the type structure.
+ * @param {Promise<Uint8Array>} The raw bytes that describe the type structure.
*/
TypeDecoder.prototype.defineType = function(typeId, messageBytes) {
if (typeId < 0) {
@@ -93,7 +95,10 @@
}
// Read the type in and add it to the partial type set.
- this._partialTypes[typeId] = this._readPartialType(messageBytes);
+ var td = this;
+ return this._readPartialType(messageBytes).then(function(type) {
+ td._partialTypes[typeId] = type;
+ });
};
/**
@@ -262,217 +267,192 @@
};
/**
+ * Reads a type off of the wire.
+ * @param {RawVomReader} reader The reader with the data
+ * @param {module:vanadium.vdl.kind} kind The kind that is being read.
+ * @param {string} wireName The name of the type. This is used to generate
+ * error messages
+ * @param {object[]} indexMap An array of options specifying how to read the
+ * fields of the type object. The index in the array is the index in the wire
+ * structure for the wire type. Each object in the array should have a key
+ * field which is the name of the field in the wire struct and a fn field with
+ * a function that will be called with this set to reader and returns a promise
+ * with its value. For instance:<br>
+ * <pre>[{key: 'name', fn: reader.readString)}]</pre>
+ * <br>
+ * Means the value at index 0 will correspond to the name field and should
+ * be read by reader.readString
+ * @returns {Promise<object>} A promise with the constructed wire type as the
+ * result.
+ */
+TypeDecoder.prototype._readTypeHelper = function(
+ reader, kind, wireName, indexMap) {
+ var partialType = {
+ name: '',
+ };
+ if (kind) {
+ partialType.kind = kind;
+ }
+
+ function notEndByte() {
+ return reader.tryReadControlByte().then(function(b) {
+ if (b === endByte) {
+ return false;
+ }
+
+ if (b !== null) {
+ return Promise.reject('Unknown control byte ' + b);
+ }
+ return true;
+ });
+ }
+
+ function readField() {
+ var entry;
+ return reader.readUint().then(function(nextIndex) {
+ entry = indexMap[nextIndex];
+ if (!entry) {
+ throw Error('Unexpected index for ' + wireName + ': ' + nextIndex);
+ }
+ return entry.fn.bind(reader)();
+ }).then(function(val) {
+ partialType[entry.key] = val;
+ });
+ }
+ return promiseWhile(notEndByte, readField).then(function() {
+ return partialType;
+ });
+};
+
+TypeDecoder.prototype._readNamedType = function(reader) {
+ return this._readTypeHelper(reader, null, 'WireNamed', [
+ {key: 'name', fn: reader.readString },
+ {key: 'namedTypeId', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readEnumType = function(reader) {
+ var labels = [];
+ var i = 0;
+ return this._readTypeHelper(reader, kind.ENUM, 'WireEnum',[
+ { key: 'name', fn: reader.readString },
+ { key: 'labels', fn: readLabels },
+ ]);
+ function readLabels() {
+ return reader.readUint().then(function(length) {
+ labels = new Array(length);
+ return reader.readString().then(handleLabel);
+ });
+ }
+ function handleLabel(s) {
+ labels[i] = s;
+ i++;
+ if (i < labels.length) {
+ return reader.readString().then(handleLabel);
+ }
+ return labels;
+ }
+};
+
+TypeDecoder.prototype._readArrayType = function(reader) {
+ return this._readTypeHelper(reader, kind.ARRAY, 'WireArray', [
+ {key: 'name', fn: reader.readString },
+ {key: 'elemTypeId', fn: reader.readUint },
+ {key: 'len', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readListType = function(reader) {
+ return this._readTypeHelper(reader, kind.LIST, 'WireList', [
+ {key: 'name', fn: reader.readString },
+ {key: 'elemTypeId', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readOptionalType = function(reader) {
+ return this._readTypeHelper(reader, kind.OPTIONAL, 'WireList', [
+ {key: 'name', fn: reader.readString },
+ {key: 'elemTypeId', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readSetType = function(reader) {
+ return this._readTypeHelper(reader, kind.SET, 'WireSet', [
+ {key: 'name', fn: reader.readString },
+ {key: 'keyTypeId', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readMapType = function(reader) {
+ return this._readTypeHelper(reader, kind.MAP, 'WireMap', [
+ {key: 'name', fn: reader.readString },
+ {key: 'keyTypeId', fn: reader.readUint },
+ {key: 'elemTypeId', fn: reader.readUint },
+ ]);
+};
+
+TypeDecoder.prototype._readStructOrUnionType = function(reader, kind) {
+ var fields = [];
+ var i = 0;
+ var td = this;
+ return this._readTypeHelper(reader, kind, 'WireStruct', [
+ {key: 'name', fn: reader.readString },
+ {key: 'fields', fn: readFields },
+ ]).then(function(res) {
+ res.fields = res.fields || [];
+ return res;
+ });
+
+ function readFields() {
+ return reader.readUint().then(function(numFields) {
+ fields = new Array(numFields);
+ return promiseFor(numFields, readField);
+ }).then(function() {
+ return fields;
+ });
+ }
+
+ function readField() {
+ return td._readTypeHelper(reader, null, 'WireField', [
+ {key: 'name', fn: reader.readString },
+ {key: 'typeId', fn: reader.readUint },
+ ]).then(function(field) {
+ fields[i] = field;
+ i++;
+ });
+ }
+};
+
+/**
* Read the binary type description into a partial type description.
* @param {Uint8Array} messageBytes The binary type message.
* @return {PartialType} The type that was read.
*/
TypeDecoder.prototype._readPartialType = function(messageBytes) {
var reader = new RawVomReader(messageBytes);
- var unionId = reader.readUint();
- var partialType = {};
- var nextIndex;
- var i;
- switch (unionId) {
- case BootstrapTypes.unionIds.NAMED_TYPE:
- endDef:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.namedTypeId = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireNamed: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.ENUM_TYPE:
- partialType.kind = kind.ENUM;
- endDef2:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef2;
- }
-
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.labels = new Array(reader.readUint());
- for (i = 0; i < partialType.labels.length; i++) {
- partialType.labels[i] = reader.readString();
- }
- break;
- default:
- throw new Error('Unexpected index for WireEnum: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.ARRAY_TYPE:
- partialType.kind = kind.ARRAY;
- endDef3:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef3;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.elemTypeId = reader.readUint();
- break;
- case 2:
- partialType.len = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireArray: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.LIST_TYPE:
- partialType.kind = kind.LIST;
- endDef4:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef4;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.elemTypeId = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireList: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.SET_TYPE:
- partialType.kind = kind.SET;
- endDef5:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef5;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.keyTypeId = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireSet: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.MAP_TYPE:
- partialType.kind = kind.MAP;
- endDef6:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef6;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.keyTypeId = reader.readUint();
- break;
- case 2:
- partialType.elemTypeId = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireMap: ' + nextIndex);
- }
- }
- break;
- case BootstrapTypes.unionIds.STRUCT_TYPE:
- case BootstrapTypes.unionIds.UNION_TYPE:
- if (unionId === BootstrapTypes.unionIds.STRUCT_TYPE) {
- partialType.kind = kind.STRUCT;
- } else {
- partialType.kind = kind.UNION;
- }
- endDef7:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef7;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.fields = new Array(reader.readUint());
- for (i = 0; i < partialType.fields.length; i++) {
- partialType.fields[i] = {};
- sfEndDef:
- while(true) {
- if (reader.tryReadControlByte() === endByte) {
- break sfEndDef;
- }
- var sfNextIndex = reader.readUint();
- switch(sfNextIndex) {
- case 0:
- var s = reader.readString();
- partialType.fields[i].name = s;
- break;
- case 1:
- partialType.fields[i].typeId = reader.readUint();
- break;
- }
- }
- }
- break;
- default:
- throw new Error('Unexpected index for WireStruct: ' + nextIndex);
- }
- }
- // We allow struct{} definitions.
- if (partialType.kind === kind.STRUCT) {
- partialType.fields = partialType.fields || [];
- }
- break;
- case BootstrapTypes.unionIds.OPTIONAL_TYPE:
- partialType.kind = kind.OPTIONAL;
- endDef9:
- while (true) {
- if (reader.tryReadControlByte() === endByte) {
- break endDef9;
- }
- nextIndex = reader.readUint();
- switch(nextIndex) {
- case 0:
- partialType.name = reader.readString();
- break;
- case 1:
- partialType.elemTypeId = reader.readUint();
- break;
- default:
- throw new Error('Unexpected index for WireOptional: ' + nextIndex);
- }
- }
- break;
- default:
- throw new Error('Unknown wire type id ' + unionId);
- }
- partialType.name = partialType.name || '';
- return partialType;
+ var td = this;
+ return reader.readUint().then(function(unionId) {
+ switch (unionId) {
+ case BootstrapTypes.unionIds.NAMED_TYPE:
+ return td._readNamedType(reader);
+ case BootstrapTypes.unionIds.ENUM_TYPE:
+ return td._readEnumType(reader);
+ case BootstrapTypes.unionIds.ARRAY_TYPE:
+ return td._readArrayType(reader);
+ case BootstrapTypes.unionIds.LIST_TYPE:
+ return td._readListType(reader);
+ case BootstrapTypes.unionIds.SET_TYPE:
+ return td._readSetType(reader);
+ case BootstrapTypes.unionIds.MAP_TYPE:
+ return td._readMapType(reader);
+ case BootstrapTypes.unionIds.STRUCT_TYPE:
+ return td._readStructOrUnionType(reader, kind.STRUCT);
+ case BootstrapTypes.unionIds.UNION_TYPE:
+ return td._readStructOrUnionType(reader, kind.UNION);
+ case BootstrapTypes.unionIds.OPTIONAL_TYPE:
+ return td._readOptionalType(reader);
+ default:
+ throw new Error('Unknown wire type id ' + unionId);
+ }
+ });
};
diff --git a/test/integration/test-js-client-server.js b/test/integration/test-js-client-server.js
index e7f175a..7c58e0b 100644
--- a/test/integration/test-js-client-server.js
+++ b/test/integration/test-js-client-server.js
@@ -240,7 +240,6 @@
Promise
.all(jobs)
.then(function() {
-
// 2. Add a listener or create a stream reader to receive the values
var promise = cache.multiGet(ctx);
var stream = promise.stream;
diff --git a/test/unit/mock-proxy.js b/test/unit/mock-proxy.js
index 5717264..75db872 100644
--- a/test/unit/mock-proxy.js
+++ b/test/unit/mock-proxy.js
@@ -22,6 +22,13 @@
},
sendRequest: function(data, type, handler, id) {
var result = requestHandler(data, type);
+ if (result && result.then) {
+ return result.then(function(result) {
+ handler.handleResponse(0, result);
+ }, function(err) {
+ console.log(err + ' ' + err.stack);
+ });
+ }
handler.handleResponse(0, result);
},
dequeue: function() {}
diff --git a/test/unit/test-async-helper.js b/test/unit/test-async-helper.js
new file mode 100644
index 0000000..dfbbc44
--- /dev/null
+++ b/test/unit/test-async-helper.js
@@ -0,0 +1,57 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var test = require('prova');
+var Promise = require('../../src/lib/promise');
+var asyncHelper = require('../../src/lib/async-helper');
+
+test('promiseFor with n === 0', function(t) {
+ var numCalls = 0;
+ asyncHelper.promiseFor(0, function() {
+ numCalls++;
+ return Promise.resolve();
+ }).then(function() {
+ t.equal(numCalls, 0, 'made wrong number of calls');
+ t.end();
+ }).catch(t.end);
+});
+
+test('promiseFor with n > 0', function(t) {
+ var numCalls = 0;
+ asyncHelper.promiseFor(5, function() {
+ numCalls++;
+ return Promise.resolve();
+ }).then(function() {
+ t.equal(numCalls, 5, 'made wrong number of calls');
+ t.end();
+ }).catch(t.end);
+});
+
+test('promiseWhile with condition always false', function(t) {
+ asyncHelper.promiseWhile(function() {
+ return Promise.resolve(false);
+ }, function neverCall() {
+ t.fail('should never have been called');
+ }).then(function() {
+ t.pass('success!');
+ t.end();
+ }).catch(t.end);
+});
+
+
+test('promiseWhile with simple condition', function(t) {
+ var numCalls = 0;
+ asyncHelper.promiseWhile(function() {
+ if (numCalls < 4) {
+ return Promise.resolve(true);
+ }
+ return Promise.resolve(false);
+ }, function() {
+ numCalls++;
+ return Promise.resolve();
+ }).then(function() {
+ t.equal(numCalls, 4, 'made correct number of calls');
+ t.end();
+ }).catch(t.end);
+});
diff --git a/test/unit/test-create-caveats.js b/test/unit/test-create-caveats.js
index 0954c31..f9421df 100644
--- a/test/unit/test-create-caveats.js
+++ b/test/unit/test-create-caveats.js
@@ -17,9 +17,11 @@
};
var cav = caveats.createCaveat(desc, 9);
t.deepEqual(cav.id, desc.id, 'Correct id');
- t.deepEqual(vom.decode(cav.paramVom), {val: 9}, 'Correct data');
- t.equal(cav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(cav.paramVom).then(function(res) {
+ t.deepEqual(res, {val: 9}, 'Correct data');
+ t.equal(cav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
test('createCaveat with incompatible param', function(t) {
@@ -45,36 +47,43 @@
};
var cav = caveats.createCaveat(desc, 9);
t.deepEqual(cav.id, desc.id, 'Correct id');
- t.deepEqual(vom.decode(cav.paramVom), {val: 9}, 'Correct data');
- t.deepEqual(vom.decode(cav.paramVom)._type, desc.paramType,
- 'Correct data type');
- t.equal(cav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(cav.paramVom).then(function(res) {
+ t.deepEqual(res, {val: 9}, 'Correct data');
+ t.deepEqual(res._type, desc.paramType, 'Correct data type');
+ t.equal(cav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
test('createConstCaveat', function(t) {
var trueCav = caveats.createConstCaveat(false);
t.deepEqual(trueCav.id, vdlSecurity.ConstCaveat.id, 'Correct id');
- t.deepEqual(vom.decode(trueCav.paramVom), {val: false}, 'Correct data');
- t.equal(trueCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(trueCav.paramVom).then(function(res) {
+ t.deepEqual(res, {val: false}, 'Correct data');
+ t.equal(trueCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
test('createUnconstrainedUseCaveat', function(t) {
var unCon = caveats.unconstrainedUse;
t.deepEqual(unCon.id, vdlSecurity.ConstCaveat.id, 'Correct id');
- t.deepEqual(vom.decode(unCon.paramVom), {val: true}, 'Correct data');
- t.equal(unCon._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(unCon.paramVom).then(function(res) {
+ t.deepEqual(res, {val: true}, 'Correct data');
+ t.equal(unCon._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
test('createExpiryCaveat w/ date', function(t) {
var date = new Date(1920, 3, 4, 9, 10);
var expiryCav = caveats.createExpiryCaveat(date);
t.deepEqual(expiryCav.id, vdlSecurity.ExpiryCaveat.id, 'Correct id');
- t.deepEqual(vom.decode(expiryCav.paramVom), date, 'Correct data');
- t.equal(expiryCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(expiryCav.paramVom).then(function(res) {
+ t.deepEqual(res, date, 'Correct data');
+ t.equal(expiryCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
test('createExpiryCaveat w/ millisecond representation', function(t) {
@@ -89,8 +98,9 @@
var methodList = ['A', 'B', 'C'];
var methodCav = caveats.createMethodCaveat(methodList);
t.deepEqual(methodCav.id, vdlSecurity.MethodCaveat.id, 'Correct id');
- t.deepEqual(vom.decode(methodCav.paramVom), {val: methodList},
- 'Correct data');
- t.equal(methodCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
- t.end();
+ vom.decode(methodCav.paramVom).then(function(res) {
+ t.deepEqual(res, {val: methodList}, 'Correct data');
+ t.equal(methodCav._type, (new vdlSecurity.Caveat())._type, 'Correct type');
+ t.end();
+ }).catch(t.end);
});
diff --git a/test/unit/test-rpc-client.js b/test/unit/test-rpc-client.js
index 883ed47..f205eb0 100644
--- a/test/unit/test-rpc-client.js
+++ b/test/unit/test-rpc-client.js
@@ -8,6 +8,8 @@
var createSignature = require('../../src/vdl/create-signature');
var createMockProxy = require('./mock-proxy');
var vdl = require('../../src/vdl');
+var byteUtil = require('../../src/vdl/byte-util');
+var vom = require('../../src/vom');
var hexVom = require('../../src/lib/hex-vom');
var vtrace = require('../../src/vtrace');
var app = require('../../src/gen-vdl/v.io/x/ref/services/wspr/internal/app');
@@ -49,17 +51,18 @@
}
var mockProxy = createMockProxy(function(data, type) {
- var decodedData = hexVom.decode(data);
- var response = new app.RpcResponse();
+ return vom.decode(byteUtil.hex2Bytes(data)).then(function(decodedData) {
+ var response = new app.RpcResponse();
- if (decodedData instanceof app.RpcRequest &&
- decodedData.method === 'Signature') {
- response.outArgs = [mockSignature];
- } else {
- // Take the first arg and return it in a result list.
- response.outArgs = [decodedData];
- }
- return hexVom.encode(response);
+ if (decodedData instanceof app.RpcRequest &&
+ decodedData.method === 'Signature') {
+ response.outArgs = [mockSignature];
+ } else {
+ // Take the first arg and return it in a result list.
+ response.outArgs = [decodedData];
+ }
+ return hexVom.encode(response);
+ });
});
test('creating instances', function(assert) {
diff --git a/test/unit/test-rpc-signature-cache.js b/test/unit/test-rpc-signature-cache.js
index fd75545..2289292 100644
--- a/test/unit/test-rpc-signature-cache.js
+++ b/test/unit/test-rpc-signature-cache.js
@@ -18,6 +18,8 @@
var vtrace = require('../../src/vtrace');
var vdlsig = require('../../src/gen-vdl/v.io/v23/vdlroot/signature');
var SharedContextKeys = require('../../src/runtime/shared-context-keys');
+var vom = require('../../src/vom');
+var byteUtil = require('../../src/vdl/byte-util');
var freshSig = [ new vdlsig.Interface({ doc: 'fresh signature' }) ];
var cachedSig = [ new vdlsig.Interface({ doc: 'cached signature'}) ];
@@ -28,13 +30,15 @@
function createProxy() {
return createMockProxy(function(message, type) {
if (type === Outgoing.REQUEST) {
- var decodedData = hexVom.decode(message);
- if (decodedData.method !== 'Signature') {
- throw new Error('Unexpected method call');
- }
- var response = new app.RpcResponse();
- response.outArgs = [freshSig];
- return hexVom.encode(response);
+ var bytes = byteUtil.hex2Bytes(message);
+ return vom.decode(bytes).then(function(decodedData) {
+ if (decodedData.method !== 'Signature') {
+ throw new Error('Unexpected method call');
+ }
+ var response = new app.RpcResponse();
+ response.outArgs = [freshSig];
+ return hexVom.encode(response);
+ });
}
throw new Error('Unexpected message type');
}, CACHE_TTL);
diff --git a/test/unit/test-server-router.js b/test/unit/test-server-router.js
index 7687a35..57e43d1 100644
--- a/test/unit/test-server-router.js
+++ b/test/unit/test-server-router.js
@@ -101,11 +101,11 @@
t.equals(responseType, Outgoing.LOOKUP_RESPONSE, 'response type');
t.equals(responseMessageId, inputMessageId, 'message id');
- var reply = hexVom.decode(responseData);
+ return hexVom.decode(responseData);
+ }).then(function(reply) {
t.ok(reply.hasOwnProperty('handle'), 'has a handle');
t.equals(reply.hasAuthorizer, true, 'has authorizer');
t.deepEquals(reply.signature, expectedSignature, 'signature');
-
t.end();
- });
+ }).catch(t.end);
});
diff --git a/test/unit/test-task-sequence.js b/test/unit/test-task-sequence.js
new file mode 100644
index 0000000..b7fac99
--- /dev/null
+++ b/test/unit/test-task-sequence.js
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+var test = require('prova');
+var Promise = require('../../src/lib/promise');
+var TaskSequence = require('../../src/lib/task-sequence');
+var promiseFor = require('../../src/lib/async-helper').promiseFor;
+
+test('task sequence', function(t) {
+ var callOrder = [];
+ function callSequence(n) {
+ var numLoop = 5 - n;
+ // Create a promise chain of 5 - n and then append to the array. We create
+ // a longer promise chain for early tasks to make sure that the task
+ // queue waits for early tasks to finish before starting later tasks. If
+ // the tasks were run in parallel, then later tasks with shorter chains
+ // will catch up to earlier promises.
+ return promiseFor(numLoop, function() {
+ return Promise.resolve();
+ }).then(function() {
+ callOrder.push(n);
+ });
+ }
+ var sequence = new TaskSequence();
+ for (var i = 0; i < 5; i++) {
+ sequence.addTask(callSequence.bind(null, i));
+ }
+ sequence.addTask(function() {
+ t.deepEqual(callOrder, [0, 1, 2, 3, 4]);
+ t.end();
+ return Promise.resolve();
+ });
+});
+
+test('task continues after a failure', function(t) {
+ var sequence = new TaskSequence();
+ sequence.addTask(function() {
+ throw new Error('Bad!!!');
+ });
+ sequence.addTask(function() {
+ t.pass('task run after failure');
+ t.end();
+ });
+});
diff --git a/test/vom/test-binary-reader.js b/test/vom/test-binary-reader.js
index 9f667db..c627b5d 100644
--- a/test/vom/test-binary-reader.js
+++ b/test/vom/test-binary-reader.js
@@ -10,24 +10,33 @@
var BinaryReader = require('./../../src/vom/binary-reader.js');
var ByteUtil = require('./../../src/vdl/byte-util.js');
+var promiseFor = require('../../src/lib/async-helper').promiseFor;
test('readByte', function(t) {
var expectedBytes = [ 0x0, 0xff, 0x10, 0x20, 0x30, 0x40 ];
var buf = new Uint8Array(expectedBytes);
var br = new BinaryReader(buf);
- for (var i = 0; i < expectedBytes.length; i++) {
- t.equals(br.readByte(), expectedBytes[i], 'index ' + i + ' differs');
+ var i = 0;
+ function checkByte(b) {
+ return br.readByte().then(function(b) {
+ t.equals(b, expectedBytes[i], 'index ' + i + ' differs');
+ i++;
+ });
}
- t.end();
+ promiseFor(expectedBytes.length, checkByte).then(t.end).catch(t.end);
});
test('readByteArray', function(t) {
var expectedBytes = [ 0x00, 0xff, 0x10, 0x20, 0x30, 0x40 ];
var br = new BinaryReader(new Uint8Array(expectedBytes));
- t.equals(ByteUtil.bytes2Hex(br.readByteArray(2)),
- ByteUtil.bytes2Hex(new Uint8Array([0x00, 0xff])));
- t.equals(ByteUtil.bytes2Hex(br.readByteArray(4)),
- ByteUtil.bytes2Hex(new Uint8Array([0x10, 0x20, 0x30, 0x40])));
- t.end();
+ br.readByteArray(2).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x00, 0xff])));
+ return br.readByteArray(4);
+ }).then(function (b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x10, 0x20, 0x30, 0x40])));
+ t.end();
+ }).catch(t.end);
});
diff --git a/test/vom/test-encode-decode.js b/test/vom/test-encode-decode.js
index 3b3a4b1..f155aff 100644
--- a/test/vom/test-encode-decode.js
+++ b/test/vom/test-encode-decode.js
@@ -15,6 +15,7 @@
var typeUtil = require('./../../src/vdl/type-util.js');
var stringify = require('./../../src/vdl/stringify.js');
var canonicalize = require('./../../src/vdl/canonicalize.js');
+var Deferred = require('../../src/lib/deferred');
var ByteArrayMessageWriter = require(
'./../../src/vom/byte-array-message-writer.js');
@@ -1033,43 +1034,71 @@
expectedOutput: new (registry.lookupOrCreateConstructor(types.BOOL))(true)
},
];
- for (var i = 0; i < tests.length; i++) {
- var test = tests[i];
-
+ var promises = [];
+ function runTestCase(test, useCb) {
var messageWriter = new ByteArrayMessageWriter();
var encoder = new Encoder(messageWriter);
encoder.encode(test.v, test.t); // encode to messageWriter
var messageReader = new ByteArrayMessageReader(messageWriter.getBytes());
+
var decoder = new Decoder(messageReader);
- var result = decoder.decode(); // decode the written bytes
- var resultStr = stringify(result);
- var expected = test.expectedOutput || test.v;
- var expectedStr = stringify(expected);
- t.equals(resultStr, expectedStr, test.n + ' - decode value match');
-
- // Then validate that we were given a canonicalized value.
- // Note that some results are native post-decode; if so, use types.JSVALUE.
- var resulttype = types.JSVALUE;
- if (typeUtil.isTyped(result)) {
- resulttype = result._type;
+ if (useCb) {
+ var def = new Deferred();
+ decoder.decode(function(err, result) {
+ def.resolve();
+ if (err) {
+ return t.fail(test.n + ' (cb) failed with ' + err.stack);
+ }
+ handleResult(result);
+ });
+ promises.push(def.promise);
+ } else {
+ promises.push(decoder.decode().then(handleResult, function(err) {
+ t.fail(test.n + ' (promise) failed with ' + err.stack);
+ }));
}
- t.deepEqual(
- canonicalize.reduce(result, resulttype),
- expected,
- test.n + ' - decode value validation'
- );
- // If given a type, check that the decoded object's type matches it.
- // TODO(bprosnitz) Even if test.t isn't defined, we should still know what
- // the expected type ought to be.
- if (test.t) {
- var resulttypeStr = stringify(resulttype);
- var expectedtypeStr = stringify(canonicalize.type(test.t));
- t.equals(resulttypeStr, expectedtypeStr, test.n + ' - decode type match');
+
+ function handleResult(result) {
+ var asyncType = useCb ? ' (cb)' : ' (promise)';
+ var resultStr = stringify(result);
+ var expected = test.expectedOutput || test.v;
+ var expectedStr = stringify(expected);
+ t.equals(resultStr, expectedStr, test.n + asyncType +
+ ' - decode value match');
+
+ // Then validate that we were given a canonicalized value.
+ // Note that some results are native post-decode; if so, use
+ // types.JSVALUE.
+ var resultType = types.JSVALUE;
+ if (typeUtil.isTyped(result)) {
+ resultType = result._type;
+ }
+ t.deepEqual(
+ canonicalize.reduce(result, resultType),
+ expected,
+ test.n + asyncType + ' - decode value validation'
+ );
+
+ // If given a type, check that the decoded object's type matches it.
+ // TODO(bprosnitz) Even if test.t isn't defined, we should still know
+ // what the expected type ought to be.
+ if (test.t) {
+ var resultTypeStr = stringify(resultType);
+ var expectedTypeStr = stringify(canonicalize.type(test.t));
+ t.equals(resultTypeStr, expectedTypeStr, test.n + asyncType +
+ ' - decode type match');
+ }
}
}
- t.end();
+ for (var i = 0; i < tests.length; i++) {
+ runTestCase(tests[i], false);
+ runTestCase(tests[i], true);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('encode error cases', function(t) {
diff --git a/test/vom/test-native-type.js b/test/vom/test-native-type.js
index d76a2ad..442a0f0 100644
--- a/test/vom/test-native-type.js
+++ b/test/vom/test-native-type.js
@@ -32,20 +32,22 @@
// TODO(bprosnitz) Implement native type guessing and enable this test.
test('date - test encoding and decoding without type',
function(t) {
- var result = encodeDecodeDate();
- t.ok(result instanceof Date, 'Decoded date should be a date object');
- var diff = Math.abs(expectedDate - result);
- t.ok(diff < 1, 'Should decode to the expected date');
- t.end();
+ encodeDecodeDate().then(function(result) {
+ t.ok(result instanceof Date, 'Decoded date should be a date object');
+ var diff = Math.abs(expectedDate - result);
+ t.ok(diff < 1, 'Should decode to the expected date');
+ t.end();
+ }).catch(t.end);
});
test('date - test encoding and decoding with type',
function(t) {
- var result = encodeDecodeDate(Time.prototype._type);
- t.ok(result instanceof Date, 'Decoded date should be a date object');
- t.equal(result.getTime(), expectedDate.getTime(),
- 'Should decode to the expected date');
- t.end();
+ encodeDecodeDate(Time.prototype._type).then(function(result) {
+ t.ok(result instanceof Date, 'Decoded date should be a date object');
+ t.equal(result.getTime(), expectedDate.getTime(),
+ 'Should decode to the expected date');
+ t.end();
+ }).catch(t.end);
});
test('date - test fromWireValue', function(t) {
diff --git a/test/vom/test-raw-vom-compatibility.js b/test/vom/test-raw-vom-compatibility.js
index 946c416..f61285e 100644
--- a/test/vom/test-raw-vom-compatibility.js
+++ b/test/vom/test-raw-vom-compatibility.js
@@ -12,6 +12,7 @@
var RawVomWriter = require('./../../src/vom/raw-vom-writer');
var RawVomReader = require('./../../src/vom/raw-vom-reader');
var ByteUtil = require('./../../src/vdl/byte-util.js');
+var Promise = require('../../src/lib/promise');
var testTypes = {
UINT: {
@@ -139,14 +140,18 @@
});
test('Raw reader compatibility', function(t) {
- for (var i = 0; i < tests.length; i++) {
- var test = tests[i];
-
+ var promises = [];
+ function runTest(test) {
var rr = new RawVomReader(ByteUtil.hex2Bytes(test.hexString));
- var result = rr[test.type.read]();
-
- t.equal(ByteUtil.bytes2Hex(result), ByteUtil.bytes2Hex(test.val),
- 'type: ' + test.type.name + ' input: ' + test.hexString);
+ promises.push(rr[test.type.read]().then(function(result) {
+ t.equal(ByteUtil.bytes2Hex(result), ByteUtil.bytes2Hex(test.val),
+ 'type: ' + test.type.name + ' input: ' + test.hexString);
+ }));
}
- t.end();
+ for (var i = 0; i < tests.length; i++) {
+ runTest(tests[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
diff --git a/test/vom/test-raw-vom-recoverability.js b/test/vom/test-raw-vom-recoverability.js
index a37fbaf..446455c 100644
--- a/test/vom/test-raw-vom-recoverability.js
+++ b/test/vom/test-raw-vom-recoverability.js
@@ -11,6 +11,7 @@
var BigInt = require('./../../src/vdl/big-int.js');
var RawVomWriter = require('./../../src/vom/raw-vom-writer');
var RawVomReader = require('./../../src/vom/raw-vom-reader');
+var Promise = require('../../src/lib/promise');
test('Reading and writing big uint', function(t) {
var testVals = [
@@ -29,15 +30,21 @@
new Uint8Array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]))
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeUint(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readBigUint();
- t.ok(testVal.equals(result), 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readBigUint().then(function(result) {
+ t.ok(testVal.equals(result), 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing big int', function(t) {
@@ -67,15 +74,22 @@
new Uint8Array([0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeInt(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readBigInt();
- t.ok(testVal.equals(result), 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readBigInt().then(function(result) {
+ t.ok(testVal.equals(result), 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing uint', function(t) {
@@ -89,15 +103,22 @@
0xffffffff
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeUint(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readUint();
- t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readUint().then(function(result) {
+ t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing int', function(t) {
@@ -117,15 +138,21 @@
-0x80000000
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeInt(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readInt();
- t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readInt().then(function(result) {
+ t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing float', function(t) {
@@ -143,15 +170,21 @@
Number.POSITIVE_INFINITY
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeFloat(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readFloat();
- t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readFloat().then(function(result) {
+ t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing string', function(t) {
@@ -161,15 +194,22 @@
'ଆ➓龥𐇐𒅑'
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeString(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readString();
- t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ promises.push(rr.readString().then(function(result) {
+ t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing bool', function(t) {
@@ -178,15 +218,21 @@
false
];
- for (var i = 0; i < testVals.length; i++) {
- var testVal = testVals[i];
+ var promises = [];
+ function runTest(testVal) {
var rw = new RawVomWriter();
rw.writeBool(testVal);
var rr = new RawVomReader(rw.getBytes());
- var result = rr.readBool();
+ promises.push(rr.readBool().then(function(result) {
t.equals(result, testVal, 'expected ' + testVal + ' got ' + result);
+ }));
}
- t.end();
+ for (var i = 0; i < testVals.length; i++) {
+ runTest(testVals[i]);
+ }
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('Reading and writing multiple values', function(t) {
@@ -197,9 +243,17 @@
rw.writeUint(8);
var rr = new RawVomReader(rw.getBytes());
- t.equals(rr.readString(), 'test');
- t.equals(rr.readFloat(), 9.4);
- t.equals(rr.readInt(), -4);
- t.equals(rr.readUint(), 8);
- t.end();
+ rr.readString().then(function(val) {
+ t.equals(val, 'test');
+ return rr.readFloat();
+ }).then(function(val) {
+ t.equals(val, 9.4);
+ return rr.readInt();
+ }).then(function(val) {
+ t.equals(val, -4);
+ return rr.readUint();
+ }).then(function(val) {
+ t.equals(val, 8);
+ t.end();
+ }).catch(t.end);
});
diff --git a/test/vom/test-stream-reader.js b/test/vom/test-stream-reader.js
new file mode 100644
index 0000000..0e43391
--- /dev/null
+++ b/test/vom/test-stream-reader.js
@@ -0,0 +1,136 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+/**
+ * @fileoverview Tests for stream reader.
+ */
+
+var test = require('prova');
+
+var StreamReader = require('./../../src/vom/stream-reader.js');
+var ByteUtil = require('./../../src/vdl/byte-util.js');
+
+test('readByte with data already loaded', function(t) {
+ var expectedBytes = [ 0x0, 0xff, 0x10, 0x20, 0x30, 0x40 ];
+ var buf = new Uint8Array(expectedBytes);
+
+ var sr = new StreamReader();
+ sr.addBytes(buf);
+ var i = 0;
+ function checkByte(b) {
+ t.equals(b, expectedBytes[i], 'index ' + i + ' differs');
+ i++;
+ if (i < expectedBytes.length) {
+ return sr.readByte().then(checkByte);
+ } else {
+ t.end();
+ }
+ }
+ sr.readByte().then(checkByte).catch(t.end);
+});
+
+test('readByteArray with data already loaded', function(t) {
+ var expectedBytes = [ 0x00, 0xff, 0x10, 0x20, 0x30, 0x40 ];
+ var buf = new Uint8Array(expectedBytes);
+ var sr = new StreamReader();
+ sr.addBytes(buf);
+ sr.readByteArray(2).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x00, 0xff])));
+ return sr.readByteArray(4);
+ }).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x10, 0x20, 0x30, 0x40])));
+ t.end();
+ }).catch(t.end);
+});
+
+test('read after close returns error', function(t) {
+ var sr = new StreamReader();
+ sr.close();
+ sr.readByte().then(function() {
+ t.fail('should not have returned a value');
+ t.end();
+ }, function(err) {
+ t.ok(err);
+ t.end();
+ });
+});
+
+test('read byte before data is set', function(t) {
+ var expectedBytes = [ 0x0, 0xff, 0x10, 0x20, 0x30, 0x40 ];
+ var buf = new Uint8Array(expectedBytes);
+
+ var sr = new StreamReader();
+ var i = 0;
+ function checkByte(b) {
+ t.equals(b, expectedBytes[i], 'index ' + i + ' differs');
+ i++;
+ if (i < expectedBytes.length) {
+ return sr.readByte().then(checkByte);
+ } else {
+ t.end();
+ }
+ }
+
+ // Read before there is data.
+ sr.readByte().then(checkByte).catch(t.end);
+
+ sr.addBytes(buf);
+});
+
+test('readByteArray before data is set', function(t) {
+ var expectedBytes = [ 0x00, 0xff, 0x10, 0x20, 0x30, 0x40 ];
+ var buf = new Uint8Array(expectedBytes);
+ var sr = new StreamReader();
+ sr.readByteArray(2).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x00, 0xff])));
+ return sr.readByteArray(4);
+ }).then(function (b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x10, 0x20, 0x30, 0x40])));
+ t.end();
+ }).catch(t.end);
+ sr.addBytes(buf);
+});
+
+test('readByteArray with multiple chunks', function(t) {
+ var expectedBytes = [ 0x00, 0xff, 0x10, 0x20, 0x30, 0x40 ];
+ var buf1 = new Uint8Array(expectedBytes.slice(0, 3));
+ var buf2 = new Uint8Array(expectedBytes.slice(3));
+ var sr = new StreamReader();
+ sr.addBytes(buf1);
+ sr.readByteArray(2).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x00, 0xff])));
+ return sr.readByteArray(4);
+ }).then(function (b) {
+ t.equals(ByteUtil.bytes2Hex(b),
+ ByteUtil.bytes2Hex(new Uint8Array([0x10, 0x20, 0x30, 0x40])));
+ t.end();
+ }).catch(t.end);
+ sr.addBytes(buf2);
+});
+
+test('peekByte doesn\'t consume', function(t) {
+ var expectedBytes = [ 0x00 ];
+ var buf1 = new Uint8Array(expectedBytes);
+ var sr = new StreamReader();
+ sr.peekByte().then(function(b) {
+ return sr.peekByte();
+ }).then(function(b) {
+ t.equals(ByteUtil.bytes2Hex(new Uint8Array([b])), '00');
+ t.end();
+ }).catch(t.end);
+ sr.addBytes(buf1);
+});
+
+test('readByteArray without enough data and eof', function(t) {
+ var sr = new StreamReader([0x00]);
+ sr.readByteArray(4).then(function() {
+ t.fail('should not have succeeded');
+ }, function(err) {}).then(t.end);
+ sr.close();
+});
diff --git a/test/vom/test-type-encoding.js b/test/vom/test-type-encoding.js
index 4e9e47a..f1b49f3 100644
--- a/test/vom/test-type-encoding.js
+++ b/test/vom/test-type-encoding.js
@@ -11,6 +11,7 @@
var stringify = require('./../../src/vdl/stringify.js');
var types = require('./../../src/vdl/types.js');
var kind = require('./../../src/vdl/kind.js');
+var Promise = require('./../../src/lib/promise');
var TypeEncoder = require('./../../src/vom/type-encoder.js');
var TypeDecoder = require('./../../src/vom/type-decoder.js');
@@ -25,9 +26,11 @@
* @constructor
*/
function TypeMessageReader(bytes) {
+ var header = bytes[0];
this.rawReader = new RawVomReader(bytes);
- var header = this.rawReader._readRawBytes(1);
- if (header[0] !== 0x80) {
+ // consume the header byte.
+ this.rawReader._readRawBytes(1);
+ if (header !== 0x80) {
throw new Error('Improperly formatted bytes. Must start with 0x80');
}
}
@@ -36,32 +39,33 @@
* Read the next type message.
*/
TypeMessageReader.prototype.nextMessage = function(typeDecoder) {
- var typeId;
- try {
- typeId = this.rawReader.readInt();
- } catch (error) {
- // Hopefully EOF.
- // TODO(bprosnitz) Make this a more accurate check.
+ var reader = this;
+ return this.rawReader.readInt().then(function(typeId) {
+ if (typeId >= 0) {
+ throw new Error('Value messages not implemented.');
+ }
+ return reader.rawReader.readUint().then(function(len) {
+ return reader.rawReader._readRawBytes(len);
+ }).then(function(bytes) {
+ return {
+ typeId: -typeId,
+ messageBytes: bytes,
+ };
+ });
+ }, function(err) {
return null;
- }
- if (typeId < 0) {
- var len = this.rawReader.readUint();
- var body = this.rawReader._readRawBytes(len);
- return {
- typeId: -typeId,
- messageBytes: body
- };
- }
- throw new Error('Value messages not implemented.');
+ });
};
test('type encoding encode and decode (optional fields filled)', function(t) {
var tests = require('../vdl/type-test-cases.js');
-
+ var promises = [];
for (var i = 0; i < tests.length; i++) {
- encodeDecodeType(t, tests[i].type);
+ promises.push(encodeDecodeType(t, tests[i].type));
}
- t.end();
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
test('type encoding encode and decode (optional fields omitted)',
@@ -159,10 +163,13 @@
}
];
- for (var j = 0; j < tests.length; j++) {
- encodeDecodeType(t, tests[j].test, tests[j].expected);
+ var promises = [];
+ for (var i = 0; i < tests.length; i++) {
+ promises.push(encodeDecodeType(t, tests[i].test, tests[i].expected));
}
- t.end();
+ Promise.all(promises).then(function() {
+ t.end();
+ }, t.end);
});
var UPPER_LOOP_LIMIT = 100;
@@ -176,21 +183,24 @@
var typeDecoder = new TypeDecoder();
var reader = new TypeMessageReader(writer.getBytes());
- for (var j = 0; j < UPPER_LOOP_LIMIT; j++) {
- var message = reader.nextMessage();
- if (message === null) {
- break;
- }
- typeDecoder.defineType(message.typeId, message.messageBytes);
+ var j = 1;
+ return readMessage();
+ function readMessage() {
+ return reader.nextMessage().then(function(message) {
+ if (message === null) {
+ var resultType = typeDecoder.lookupType(id);
+ var resultStr = stringify(resultType);
+ var expectedStr = stringify(expected);
+ return t.equals(resultStr, expectedStr);
+ }
+ if (j === UPPER_LOOP_LIMIT) {
+ return t.fail('read too many messages');
+ }
+ j++;
+ return typeDecoder.defineType(message.typeId, message.messageBytes).
+ then(readMessage);
+ });
}
- if (j === UPPER_LOOP_LIMIT) {
- t.fail('read too many messages');
- }
-
- var resultType = typeDecoder.lookupType(id);
- var resultStr = stringify(resultType);
- var expectedStr = stringify(expected);
- t.equals(resultStr, expectedStr);
}
// This tests a subset of potential type encoding errors.
diff --git a/test/vom/test-vdl-arith.js b/test/vom/test-vdl-arith.js
index 9ec3d9f..9ec9807 100644
--- a/test/vom/test-vdl-arith.js
+++ b/test/vom/test-vdl-arith.js
@@ -6,6 +6,7 @@
var vom = require('../../src/vom');
var ifaceSigType =
require('../../src/gen-vdl/v.io/v23/vdlroot/signature').Interface;
+var Promise = require('../../src/lib/promise');
test('import paths work', function(assert) {
// We just need to require the arith package to make sure that
@@ -21,15 +22,9 @@
test('method signature encode-decode match', function(assert) {
var arith = require('../vdl-out/v.io/x/ref/lib/vdl/testdata/arith');
- var writer;
- var encoder;
- var reader;
- var decoder;
- var sigEncode;
- var sigDecode;
-
// For every service signature defined...
var serviceNames = ['Arith', 'Calculator'];
+ var promises = [];
serviceNames.forEach(function(serviceName) {
if (!arith.hasOwnProperty(serviceName)) {
assert.fail('Expected interface ' + serviceName + ' to be defined');
@@ -39,39 +34,41 @@
_serviceDescription;
// Encode the signature using the type defined in VDL-generated .js file
- writer = new vom.ByteArrayMessageWriter();
- encoder = new vom.Encoder(writer);
+ var writer = new vom.ByteArrayMessageWriter();
+ var encoder = new vom.Encoder(writer);
encoder.encode(signature, ifaceSigType.prototype._type);
- sigEncode = writer.getBytes();
+ var sigEncode = writer.getBytes();
// Decode the signature.
- reader = new vom.ByteArrayMessageReader(sigEncode);
- decoder = new vom.Decoder(reader);
- sigDecode = decoder.decode();
+ var reader = new vom.ByteArrayMessageReader(sigEncode);
+ var decoder = new vom.Decoder(reader);
+ promises.push(decoder.decode().then(function(sigDecode) {
+ // Ensure that what was decoded matches the original signature deeply.
+ assert.deepEqual(sigDecode, signature, serviceName + ' signature match');
- // Ensure that what was decoded matches the original signature deeply.
- assert.deepEqual(sigDecode, signature, serviceName + ' signature match');
+ // TODO The signature type should be attached to the generated signature
+ // This is currently problematic (Issue 432), so manually attaching type
+ // for now and NOT passing the type into the encoder.
+ var wrappedSignature = new ifaceSigType(signature);
- // TODO The signature type should be attached to the generated signature
- // This is currently problematic (Issue 432), so manually attaching type
- // for now and NOT passing the type into the encoder.
- var wrappedSignature = new ifaceSigType(signature);
+ // Encode the signature as a wrapped struct.
+ var writer = new vom.ByteArrayMessageWriter();
+ var encoder = new vom.Encoder(writer);
+ encoder.encode(wrappedSignature);
+ var sigEncode = writer.getBytes();
- // Encode the signature as a wrapped struct.
- writer = new vom.ByteArrayMessageWriter();
- encoder = new vom.Encoder(writer);
- encoder.encode(wrappedSignature);
- sigEncode = writer.getBytes();
-
- // Decode the signature.
- reader = new vom.ByteArrayMessageReader(sigEncode);
- decoder = new vom.Decoder(reader);
- sigDecode = decoder.decode();
-
- assert.deepEqual(sigDecode, wrappedSignature, serviceName +
- ' wrapped signature match');
+ // Decode the signature.
+ var reader = new vom.ByteArrayMessageReader(sigEncode);
+ var decoder = new vom.Decoder(reader);
+ return decoder.decode().then(function(sigDecode) {
+ assert.deepEqual(sigDecode, wrappedSignature, serviceName +
+ ' wrapped signature match');
+ });
+ }));
});
- assert.end();
+ Promise.all(promises).then(function() {
+ assert.end();
+ }, assert.end);
});
var expectedAdvancedMathDescription =
diff --git a/test/vom/test-vom-compatibility.js b/test/vom/test-vom-compatibility.js
index 174fadf..114aa86 100644
--- a/test/vom/test-vom-compatibility.js
+++ b/test/vom/test-vom-compatibility.js
@@ -50,23 +50,25 @@
var data = util.hex2Bytes(t.hex);
var messageReader = new ByteArrayMessageReader(data);
var decoder = new Decoder(messageReader, false);
- var result = decoder.decode();
- assert.equal(stringify(result), stringify(t.value), t.name +
- ' value comparison');
- assert.deepEqual(result._type, t.value._type, t.name + ' type comparison');
- assert.deepEqual(result._type.toString(), t.typeString,
- t.name + ' type string ok');
- assert.deepEqual(result.prototype, t.value.prototype,
- t.name + ' prototype comparison');
+ decoder.decode().then(function(result) {
+ assert.equal(stringify(result), stringify(t.value), t.name +
+ ' value comparison');
+ assert.deepEqual(result._type, t.value._type, t.name +
+ ' type comparison');
+ assert.deepEqual(result._type.toString(), t.typeString,
+ t.name + ' type string ok');
+ assert.deepEqual(result.prototype, t.value.prototype,
+ t.name + ' prototype comparison');
- // Ensure that we lost no information; encode(decode(t.hex)) === t.hex.
- var messageWriter = new ByteArrayMessageWriter();
- var encoder = new Encoder(messageWriter);
- encoder.encode(result);
- var hex = util.bytes2Hex(messageWriter.getBytes());
- assert.equal(hex, t.hex, t.name + ' hex comparison');
+ // Ensure that we lost no information; encode(decode(t.hex)) === t.hex.
+ var messageWriter = new ByteArrayMessageWriter();
+ var encoder = new Encoder(messageWriter);
+ encoder.encode(result);
+ var hex = util.bytes2Hex(messageWriter.getBytes());
+ assert.equal(hex, t.hex, t.name + ' hex comparison');
- assert.end();
+ assert.end();
+ }).catch(assert.end);
});
});
@@ -165,4 +167,4 @@
}
assert.end();
});
-});
\ No newline at end of file
+});