// blob: 8e1ba0a349167cebd7fc0e22114da7029b4197ad [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
var Deferred = require('../lib/deferred');
var byteUtil = require('../vdl/byte-util');
var TaskSequence = require('../lib/task-sequence');
module.exports = StreamReader;
/**
* StreamReader provides a Reader api over a stream of bytes.
* Bytes are queued with addBytes() and consumed through the promise-returning
* read methods (readByte, peekByte, readByteArray).
* @private
* @constructor
*/
function StreamReader() {
// Queue of byte chunks that have not been consumed yet; addBytes() appends
// to the end and the read methods consume from the front.
this._bufs = [];
// Set to true by close().
this._closed = false;
// Every read/peek is submitted to this sequence as a task; presumably it
// runs the tasks one at a time in submission order -- confirm in
// ../lib/task-sequence.
this._sequence = new TaskSequence();
// Deferred that addBytes() and close() resolve (and then clear) to signal
// that data arrived or that the stream ended.
this._bytesAvailableDef = new Deferred();
}
/**
 * Appends a chunk of bytes to the end of the stream and wakes up the reader
 * that is waiting for data, if there is one. Empty chunks are ignored
 * entirely: nothing is queued and no waiter is signalled.
 * @param {Uint8Array} bytes The bytes to add
 */
StreamReader.prototype.addBytes = function(bytes) {
  if (bytes.length !== 0) {
    this._bufs.push(bytes);
    var waiter = this._bytesAvailableDef;
    if (waiter) {
      waiter.resolve();
    }
    // The signal has been delivered; the next wait creates its own deferred.
    this._bytesAvailableDef = null;
  }
};
/**
 * Marks the stream as closed and wakes up the reader that is waiting for
 * data, if there is one. Once closed, waits no longer block, so readers
 * either consume the bytes that are left or fail.
 */
StreamReader.prototype.close = function() {
  var waiter = this._bytesAvailableDef;
  this._closed = true;
  if (waiter) {
    waiter.resolve();
  }
  this._bytesAvailableDef = null;
};
/**
 * Returns a promise that is resolved as soon as the stream has buffered
 * bytes or has been closed. If either is already true the promise is
 * resolved immediately.
 * @private
 * @return {Promise} Resolved (with no value) when a reader should re-check
 * the stream.
 */
StreamReader.prototype._waitForData = function() {
  if (this._hasBytes() || this._closed) {
    return Promise.resolve();
  }
  // Reuse a deferred that is still pending instead of replacing it. addBytes()
  // and close() only resolve the deferred currently stored on the reader, so
  // overwriting a pending one would leave its waiter hanging forever.
  if (!this._bytesAvailableDef) {
    this._bytesAvailableDef = new Deferred();
  }
  return this._bytesAvailableDef.promise;
};
/**
 * Reports whether at least one chunk of unread bytes is queued.
 * @private
 * @return {boolean} True if the chunk queue is non-empty.
 */
StreamReader.prototype._hasBytes = function() {
  var queued = this._bufs.length;
  return queued !== 0;
};
/**
 * Consumes and returns the next byte of the stream. The read is queued on
 * the reader's task sequence and waits until data is available or the
 * stream is closed.
 * @return {Promise<number>} Resolved with the byte; rejected with an Error
 * if no bytes are buffered once the wait is over.
 */
StreamReader.prototype.readByte = function() {
  var self = this;
  var outcome = new Deferred();

  self._sequence.addTask(function takeByte() {
    var consumed = self._waitForData().then(function() {
      if (self._hasBytes()) {
        var head = self._bufs[0];
        var value = head[0];
        if (head.length === 1) {
          // Last byte of this chunk: drop the chunk from the queue.
          self._bufs.shift();
        } else {
          // Keep a view over the remainder of the chunk.
          self._bufs[0] = head.subarray(1);
        }
        return value;
      }
      throw new Error('Failed to read byte, eof is ' + self._closed);
    });

    // The task's own promise always fulfils; the outcome is reported to the
    // caller through the deferred.
    return consumed.then(function(value) {
      outcome.resolve(value);
    }, function(reason) {
      outcome.reject(reason);
    });
  });

  return outcome.promise;
};
/**
 * Returns the next byte of the stream without consuming it. The peek is
 * queued on the reader's task sequence and waits until data is available
 * or the stream is closed.
 * @return {Promise<number>} Resolved with the byte; rejected with an Error
 * if no bytes are buffered once the wait is over.
 */
StreamReader.prototype.peekByte = function() {
  var self = this;
  var outcome = new Deferred();

  self._sequence.addTask(function lookAhead() {
    var head = self._waitForData().then(function() {
      if (self._hasBytes()) {
        return self._bufs[0][0];
      }
      throw new Error('Failed to read byte, eof is ' + self._closed);
    });

    // The task's own promise always fulfils; the outcome is reported to the
    // caller through the deferred.
    return head.then(function(value) {
      outcome.resolve(value);
    }, function(reason) {
      outcome.reject(reason);
    });
  });

  return outcome.promise;
};
/**
 * Reads a byte array from the stream. The read is queued on the reader's
 * task sequence and keeps waiting for more data until the requested number
 * of bytes has been collected.
 *
 * A zero-length read resolves as soon as its turn comes, without waiting for
 * data. If the stream is closed before enough bytes arrive the promise is
 * rejected; bytes already copied for this read have been consumed from the
 * stream at that point.
 * @param {number} amt The number to read.
 * @return {Promise<Uint8Array>} A promise that will be resolved
 * with the result, or rejected with an Error on premature end of stream.
 */
StreamReader.prototype.readByteArray = function(amt) {
  var reader = this;
  var def = new Deferred();
  var buf = new Uint8Array(amt);
  // Use the length the array actually has, so the target is always a
  // non-negative integer even for odd `amt` values.
  var total = buf.length;
  var pos = 0;

  // Moves as many buffered bytes as possible (up to what is still missing)
  // from the front of the chunk queue into buf.
  function drain() {
    while (pos < total && reader._bufs.length > 0) {
      var chunk = reader._bufs[0];
      var wanted = total - pos;
      if (chunk.length <= wanted) {
        // Consume the whole chunk.
        buf.set(chunk, pos);
        pos += chunk.length;
        reader._bufs.shift();
      } else {
        // Take the prefix we need and leave a view over the rest.
        buf.set(chunk.subarray(0, wanted), pos);
        pos += wanted;
        reader._bufs[0] = chunk.subarray(wanted);
      }
    }
  }

  // Resolves with buf once it is full, waiting for more data as needed.
  function fill() {
    if (pos === total) {
      // Nothing (more) to read: do not block on data that is not needed.
      return Promise.resolve(buf);
    }
    return reader._waitForData().then(function() {
      drain();
      if (pos === total) {
        return buf;
      }
      if (reader._closed) {
        throw new Error('Failed to read ' + amt + ' bytes, eof is true');
      }
      return fill();
    });
  }

  this._sequence.addTask(function readByteArray() {
    // Settle the caller's deferred exactly once, after the whole read is done.
    return fill().then(function(arr) {
      return def.resolve(arr);
    }, function(err) {
      return def.reject(err);
    });
  });
  return def.promise;
};
/**
 * Returns the bytes that are buffered but not yet consumed, with each chunk
 * converted by byteUtil.bytes2Hex (presumably to a hex string -- confirm in
 * ../vdl/byte-util) and the results concatenated in stream order.
 * @return {string} The concatenated conversions; '' when nothing is buffered.
 */
StreamReader.prototype.getHexBytes = function() {
  // Call the helper explicitly with just the chunk: handing it to map()
  // directly would also pass the index and the array as extra arguments and
  // detach it from byteUtil.
  return this._bufs.map(function(chunk) {
    return byteUtil.bytes2Hex(chunk);
  }).join('');
};