blob: b0df42141b7c6834d4ba69413f27df0e5790d970 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
var Incoming = require('./message-type').Incoming;
var emitStreamError = require('../lib/emit-stream-error');
var vError = require('../gen-vdl/v.io/v23/verror');
var SharedContextKeys = require('../runtime/shared-context-keys');
var BlessingsId =
require('../gen-vdl/v.io/x/ref/services/wspr/internal/principal').BlessingsId;
var runtimeFromContext = require('../runtime/runtime-from-context');
var TaskSequence = require('../lib/task-sequence');
var Promise = require('../lib/promise');
var vom = require('../vom');
var byteUtil = require('../vdl/byte-util');
module.exports = Handler;
/**
 * Incoming stream handler. Response, error and close messages are turned
 * into tasks on a per-handler TaskSequence and applied to the wrapped stream.
 * @param {Object} ctx Context the handler operates in; it must expose
 * value(key) so the runtime can be looked up under SharedContextKeys.RUNTIME.
 * @param {Stream} stream Stream instance that receives the queued reads,
 * errors and close.
 * @constructor
 */
function Handler(ctx, stream) {
  var runtime;

  this._ctx = ctx;
  this._stream = stream;

  // The controller is borrowed from the runtime stored on the context.
  runtime = ctx.value(SharedContextKeys.RUNTIME);
  this._controller = runtime._controller;

  // Blessings that cleanupBlessings() releases.
  this._pendingBlessings = [];
  // Queue that every incoming message handler is added to.
  this._tasks = new TaskSequence();
}
/**
 * Routes an incoming message to the matching stream handler by adding a bound
 * task to this handler's task sequence.
 * @param {*} type Message type, compared against the Incoming constants.
 * @param {*} data Message payload handed to the selected handler.
 * @return {boolean} true when the type was recognised and a task was added,
 * false when this handler cannot process the given type.
 */
Handler.prototype.handleResponse = function(type, data) {
  var task;

  if (type === Incoming.STREAM_RESPONSE) {
    task = this.handleStreamData;
  } else if (type === Incoming.STREAM_CLOSE) {
    task = this.handleStreamClose;
  } else if (type === Incoming.ERROR_RESPONSE) {
    task = this.handleStreamError;
  } else {
    // Not a stream message this handler knows about.
    return false;
  }

  this._tasks.addTask(task.bind(this, data));
  return true;
};
/**
 * Decodes one hex-encoded, VOM-serialized stream message and queues the
 * decoded value for reading on the stream.
 *
 * When the decoded value is a BlessingsId it is first resolved to a blessings
 * object through the runtime's blessings manager; that object is retained and
 * queued in place of the id. Hex/VOM decoding failures and blessings lookup
 * failures are emitted on the stream as an InternalError rather than
 * rejecting the returned promise.
 *
 * @param {string} data Hex-encoded message payload.
 * @return {Promise} Settles after the value has been queued (including the
 * asynchronous blessings lookup) or the failure has been emitted, so that
 * work chained after this promise observes the message first.
 */
Handler.prototype.handleStreamData = function(data) {
  var handler = this;
  var bytes;

  try {
    bytes = byteUtil.hex2Bytes(data);
  } catch (e) {
    emitStreamError(handler._stream,
      new vError.InternalError(handler._ctx, 'Failed to decode result: ', e));
    return Promise.resolve();
  }

  return vom.decode(bytes).then(function(value) {
    if (!(value instanceof BlessingsId)) {
      handler._stream._queueRead(value);
      return;
    }

    var runtime = runtimeFromContext(handler._ctx);
    // Return the lookup promise: otherwise this step finishes before the
    // blessings are queued, letting a later message (e.g. a close) overtake
    // the read, and a failed lookup would go unhandled.
    return runtime.blessingsManager.blessingsFromId(value)
      .then(function(blessings) {
        // NOTE(review): the retained blessings are not recorded in
        // _pendingBlessings here - confirm who is expected to release them.
        blessings.retain();
        handler._stream._queueRead(blessings);
      }, function(e) {
        emitStreamError(handler._stream,
          new vError.InternalError(
            handler._ctx, 'Failed to look up blessings: ', e));
      });
  }, function(e) {
    emitStreamError(handler._stream,
      new vError.InternalError(
        handler._ctx, 'Failed to decode result: ', e));
  });
};
/**
 * Handles a stream close message: runs cleanupBlessings() and then queues a
 * close on the stream. Any payload passed by the caller is ignored.
 * @return {Promise} An already-resolved promise.
 */
Handler.prototype.handleStreamClose = function() {
  var handler = this;

  handler.cleanupBlessings();
  handler._stream._queueClose();

  var done = Promise.resolve();
  return done;
};
/**
 * Handles an error message: emits the error on the stream and then closes
 * the stream through handleStreamClose().
 * @param {*} data The error to emit on the stream.
 * @return {Promise} Whatever handleStreamClose() returns.
 */
Handler.prototype.handleStreamError = function(data) {
  var handler = this;

  emitStreamError(handler._stream, data);

  // An error ends the stream, so finish with the regular close path.
  var closed = handler.handleStreamClose();
  return closed;
};
/**
 * Releases every blessings object held in _pendingBlessings and empties the
 * list, so a repeated call (for example an error followed by a close) does
 * not release the same object twice.
 */
Handler.prototype.cleanupBlessings = function() {
  // Detach the list before releasing so each entry is released exactly once,
  // even if release() re-enters this method.
  var pending = this._pendingBlessings;
  this._pendingBlessings = [];

  // Bound the loop by the array's length; comparing the index with the array
  // itself never holds, which previously skipped every release.
  for (var i = 0; i < pending.length; i++) {
    pending[i].release();
  }
};