| var util = require('util') |
| |
| var Promise = require('bluebird') |
| var syrup = require('stf-syrup') |
| var WebSocket = require('ws') |
| var uuid = require('node-uuid') |
| var EventEmitter = require('eventemitter3').EventEmitter |
| var split = require('split') |
| var adbkit = require('adbkit') |
| |
| var logger = require('../../../../util/logger') |
| var lifecycle = require('../../../../util/lifecycle') |
| var bannerutil = require('./util/banner') |
| var FrameParser = require('./util/frameparser') |
| var FrameConfig = require('./util/frameconfig') |
| var FrameStore = require('./util/framestore') |
| var BroadcastSet = require('./util/broadcastset') |
| var StateQueue = require('../../../../util/statequeue') |
| var RiskyStream = require('../../../../util/riskystream') |
| var FailCounter = require('../../../../util/failcounter') |
| |
| const FINAL_DEVICE_CAPTURE_WIDTH = 1080; |
| const FINAL_DEVICE_CAPTURE_HEIGHT = 1920; |
| |
| module.exports = syrup.serial() |
| .dependency(require('../../support/adb')) |
| .dependency(require('../../resources/minicap')) |
| .dependency(require('../util/display')) |
| .dependency(require('./options')) |
| .define(function(options, adb, minicap, display, screenOptions) { |
| var log = logger.createLogger('device:plugins:screen:stream') |
| var frameStore = new FrameStore(); |
| |
// Event emitter that owns one screen capture session. It tracks the
// current running state, a queue of requested states, and the handles
// (shell output, socket, parser, banner, pid) that exist while the
// capture service is up.
//
// config: frame configuration object; its rotation/virtualWidth/
//         virtualHeight fields and toString() are used by the methods
//         attached to the prototype.
function FrameProducer(config) {
  EventEmitter.call(this)

  // Capture settings supplied by the caller.
  this.frameConfig = config

  // State machine bookkeeping.
  this.runningState = FrameProducer.STATE_STOPPED
  this.desiredState = new StateQueue()
  this.actionQueue = []

  // Handles that are populated during start-up and cleared by _stop().
  this.output = null
  this.socket = null
  this.parser = null
  this.banner = null
  this.pid = -1

  // Readable signalling between the socket and frame consumers.
  this.readable = false
  this.needsReadable = false
  this.readableListener = this._readableListener.bind(this)

  // Failure tracking. NOTE(review): presumably "3 failures within
  // 10000 ms" -- confirm against FailCounter.
  this.failed = false
  this.failCounter = new FailCounter(3, 10000)
  this.failCounter.on('exceedLimit', this._failLimitExceeded.bind(this))
}

util.inherits(FrameProducer, EventEmitter)

// Values used for both runningState and the entries of desiredState.
FrameProducer.STATE_STOPPED = 1
FrameProducer.STATE_STARTING = 2
FrameProducer.STATE_STARTED = 3
FrameProducer.STATE_STOPPING = 4
| |
// Moves the producer towards the next requested state, if any.
//
// Does nothing when no state is queued, when the producer has been
// marked as failed, or while a start/stop transition is in flight.
// Every completed transition calls this method again so that further
// queued states get processed.
FrameProducer.prototype._ensureState = function() {
  if (this.desiredState.empty()) {
    return
  }

  if (this.failed) {
    log.warn('Will not apply desired state due to too many failures')
    return
  }

  var current = this.runningState

  if (current === FrameProducer.STATE_STOPPED) {
    if (this.desiredState.next() !== FrameProducer.STATE_STARTED) {
      // The next requested state is not a start; look at the queue
      // again on a later tick.
      setImmediate(this._ensureState.bind(this))
      return
    }

    this.runningState = FrameProducer.STATE_STARTING

    // Start-up sequence: launch the service, watch its output, wait for
    // its pid, connect, read the banner, then begin reading frames.
    this._startService().bind(this)
      .then(function(out) {
        this.output = new RiskyStream(out)
          .on('unexpectedEnd', this._outputEnded.bind(this))
        return this._readOutput(this.output.stream)
      })
      .then(function() {
        return this._waitForPid()
      })
      .then(function() {
        return this._connectService()
      })
      .then(function(socket) {
        this.parser = new FrameParser()
        this.socket = new RiskyStream(socket)
          .on('unexpectedEnd', this._socketEnded.bind(this))
        return this._readBanner(this.socket.stream)
      })
      .then(function(banner) {
        this.banner = banner
        return this._readFrames(this.socket.stream)
      })
      .then(function() {
        this.runningState = FrameProducer.STATE_STARTED
        this.emit('start')
      })
      .catch(Promise.CancellationError, function() {
        return this._stop()
      })
      .catch(function(err) {
        // Tear everything down first, then count and report the failure.
        return this._stop().finally(function() {
          this.failCounter.inc()
          this.emit('error', err)
        })
      })
      .finally(function() {
        this._ensureState()
      })
    return
  }

  if (current === FrameProducer.STATE_STARTED) {
    if (this.desiredState.next() !== FrameProducer.STATE_STOPPED) {
      // The next requested state is not a stop; look at the queue
      // again on a later tick.
      setImmediate(this._ensureState.bind(this))
      return
    }

    this.runningState = FrameProducer.STATE_STOPPING
    this._stop().finally(function() {
      this._ensureState()
    })
    return
  }

  // STATE_STARTING / STATE_STOPPING: just wait for the transition that
  // is already running; it re-enters this method when it finishes.
}
| |
// Queues a request for the started state and tries to act on it.
FrameProducer.prototype.start = function() {
  var wanted = FrameProducer.STATE_STARTED
  log.info('Requesting frame producer to start')
  this.desiredState.push(wanted)
  this._ensureState()
}
| |
// Queues a request for the stopped state and tries to act on it.
FrameProducer.prototype.stop = function() {
  var wanted = FrameProducer.STATE_STOPPED
  log.info('Requesting frame producer to stop')
  this.desiredState.push(wanted)
  this._ensureState()
}
| |
// Queues a stop followed by a start, but only when the producer is
// currently started or starting. In any other state this is a no-op.
FrameProducer.prototype.restart = function() {
  var state = this.runningState
  var isUp = state === FrameProducer.STATE_STARTED ||
    state === FrameProducer.STATE_STARTING

  if (!isUp) {
    return
  }

  this.desiredState.push(FrameProducer.STATE_STOPPED)
  this.desiredState.push(FrameProducer.STATE_STARTED)
  this._ensureState()
}
| |
// Stores a new rotation in the frame config and signals a config change.
// When the rotation equals the configured one, only a log line is written.
//
// rotation: the new rotation value (compared with ===).
FrameProducer.prototype.updateRotation = function(rotation) {
  var config = this.frameConfig

  if (config.rotation !== rotation) {
    log.info('Setting frame producer rotation to %d', rotation)
    config.rotation = rotation
    this._configChanged()
  }
  else {
    log.info('Keeping %d as current frame producer rotation', rotation)
  }
}
| |
// Stores a new virtual width/height in the frame config and signals a
// config change. When both values equal the configured ones, only a log
// line is written.
//
// width, height: the new projection size (compared with ===).
FrameProducer.prototype.updateProjection = function(width, height) {
  var config = this.frameConfig
  var sameSize = config.virtualWidth === width &&
    config.virtualHeight === height

  if (sameSize) {
    log.info(
      'Keeping %dx%d as current frame producer projection', width, height)
    return
  }

  log.info('Setting frame producer projection to %dx%d', width, height)
  config.virtualWidth = width
  config.virtualHeight = height
  this._configChanged()
}
| |
// Applies the fixed FINAL_DEVICE_CAPTURE_WIDTH x FINAL_DEVICE_CAPTURE_HEIGHT
// projection. This is updateProjection() with the two constants as
// arguments: same comparison, same log messages, same config change.
FrameProducer.prototype.setStaticProjection = function() {
  this.updateProjection(
    FINAL_DEVICE_CAPTURE_WIDTH
  , FINAL_DEVICE_CAPTURE_HEIGHT
  )
}
| |
// Returns the next frame from the parser, feeding it chunks read from
// the socket until a frame is available.
//
// Returns null when there is no parser, or when the socket has nothing
// more to read right now (in which case `readable` is cleared).
FrameProducer.prototype.nextFrame = function() {
  if (!this.parser) {
    return null
  }

  var frame = this.parser.nextFrame()

  while (frame === null) {
    var chunk = this.socket.stream.read()

    if (!chunk) {
      // Nothing buffered; wait for the next 'readable' from the socket.
      this.readable = false
      break
    }

    this.parser.push(chunk)
    frame = this.parser.nextFrame()
  }

  return frame
}
| |
// Records that a consumer wants to be told when data is readable, and
// emits 'readable' right away if the socket is already readable.
FrameProducer.prototype.needFrame = function() {
  this.needsReadable = true
  this._maybeEmitReadable()
}
| |
// Called after the frame config was modified; the new settings are
// picked up by restarting the producer.
FrameProducer.prototype._configChanged = function() {
  this.restart()
}
| |
// 'unexpectedEnd' handler for the minicap socket: logs a warning, counts
// one failure and requests a restart.
FrameProducer.prototype._socketEnded = function() {
  log.warn('Connection to minicap ended unexpectedly')
  this.failCounter.inc()
  this.restart()
}
| |
// 'unexpectedEnd' handler for the shell output stream: logs a warning,
// counts one failure and requests a restart.
FrameProducer.prototype._outputEnded = function() {
  log.warn('Shell keeping minicap running ended unexpectedly')
  this.failCounter.inc()
  this.restart()
}
| |
// 'exceedLimit' handler of the fail counter. Stops the producer, marks
// it as failed (which makes _ensureState() refuse further transitions)
// and emits an 'error' describing the limit.
//
// limit, time: values supplied with the event; used in the message only.
FrameProducer.prototype._failLimitExceeded = function(limit, time) {
  var reason = util.format('Failed more than %d times in %dms', limit, time)

  this._stop()
  this.failed = true
  this.emit('error', new Error(reason))
}
| |
// Launches minicap with '-S -P <frame config>' and returns the promise
// from minicap.run(), limited to 10 seconds.
FrameProducer.prototype._startService = function() {
  log.info('Launching screen service')

  var args = util.format('-S -P %s', this.frameConfig.toString())
  var launched = minicap.run(args)

  return launched.timeout(10000)
}
| |
// Watches the line-split output of the minicap shell.
//
// For every non-blank line (after trimming surrounding whitespace):
//  - a line containing "ERROR" is logged as fatal and lifecycle.fatal()
//    is invoked;
//  - a line of the exact form "PID: <digits>" stores the pid on the
//    producer and emits 'pid' with it;
//  - any other line (and the PID line) is logged at info level.
//
// out: readable stream carrying the shell output.
// Returns nothing; processing happens in the 'data' listener.
FrameProducer.prototype._readOutput = function(out) {
  var self = this

  out.pipe(split()).on('data', function(line) {
    // All checks below use the trimmed text. The PID pattern is anchored
    // at both ends, so matching the raw line would miss a PID line that
    // carries leading or trailing whitespace.
    var trimmed = line.toString().trim()

    if (trimmed === '') {
      return
    }

    if (/ERROR/.test(trimmed)) {
      log.fatal('minicap error: "%s"', trimmed)
      return lifecycle.fatal()
    }

    var match = /^PID: (\d+)$/.exec(trimmed)
    if (match) {
      self.pid = Number(match[1])
      self.emit('pid', self.pid)
    }

    log.info('minicap says: "%s"', trimmed)
  })
}
| |
// Resolves with the service pid. If the pid is already known it resolves
// immediately; otherwise it waits up to 2 seconds for a 'pid' event and
// always removes its listener afterwards.
FrameProducer.prototype._waitForPid = function() {
  var alreadyKnown = this.pid > 0
  if (alreadyKnown) {
    return Promise.resolve(this.pid)
  }

  var onPid
  var pending = new Promise(function(resolve) {
    onPid = resolve
    this.on('pid', onPid)
  }.bind(this))

  return pending.bind(this)
    .timeout(2000)
    .finally(function() {
      this.removeListener('pid', onPid)
    })
}
| |
// Opens the 'localabstract:minicap' socket on the device.
//
// Each attempt is limited to 10 seconds. When an attempt fails with an
// error whose message contains "closed", it is retried after a delay
// that starts at 100 ms and doubles each time, for at most 5 attempts in
// total. Any other error, or the last "closed" error, rejects the
// returned promise.
FrameProducer.prototype._connectService = function() {
  function attempt(remaining, backoff) {
    return adb.openLocal(options.serial, 'localabstract:minicap')
      .timeout(10000)
      .catch(function(err) {
        var retryable = /closed/.test(err.message) && remaining > 1

        if (!retryable) {
          throw err
        }

        return Promise.delay(backoff).then(function() {
          return attempt(remaining - 1, backoff * 2)
        })
      })
  }

  log.info('Connecting to minicap service')
  return attempt(5, 100)
}
| |
// Tears the capture session down: disconnects the socket (2 s limit),
// stops the service (10 s limit), marks the producer as stopped and
// emits 'stop'. On any error the producer is still marked as stopped and
// both 'error' and 'stop' are emitted. All session handles are reset at
// the end regardless of the outcome.
//
// The handlers below run with `this` set to the producer because the
// chain is bound to it.
FrameProducer.prototype._stop = function() {
  function stopService() {
    return this._stopService(this.output).timeout(10000)
  }

  function markStopped() {
    this.runningState = FrameProducer.STATE_STOPPED
    this.emit('stop')
  }

  function markStoppedDespiteError(err) {
    // In practice we _should_ never get here due to _stopService()
    // being quite aggressive. But if we do, well... assume it
    // stopped anyway for now.
    this.runningState = FrameProducer.STATE_STOPPED
    this.emit('error', err)
    this.emit('stop')
  }

  function resetHandles() {
    this.output = null
    this.socket = null
    this.pid = -1
    this.banner = null
    this.parser = null
  }

  return this._disconnectService(this.socket).bind(this)
    .timeout(2000)
    .then(stopService)
    .then(markStopped)
    .catch(markStoppedDespiteError)
    .finally(resetHandles)
}
| |
// Ends the connection to minicap.
//
// socket: the wrapped socket (or null). When it is missing or already
//         ended the returned promise resolves immediately.
// Returns a promise resolving with true once the socket reports 'end'.
FrameProducer.prototype._disconnectService = function(socket) {
  log.info('Disconnecting from minicap service')

  var nothingToClose = !socket || socket.ended
  if (nothingToClose) {
    return Promise.resolve(true)
  }

  // Frames are no longer wanted from this socket.
  socket.stream.removeListener('readable', this.readableListener)

  var onEnd
  var ended = new Promise(function(resolve) {
    onEnd = function() {
      resolve(true)
    }
    socket.on('end', onEnd)

    socket.stream.resume()
    socket.end()
  })

  return ended.finally(function() {
    socket.removeListener('end', onEnd)
  })
}
| |
// Stops the minicap process behind `output`.
//
// Escalation: send SIGTERM and wait for the output to end (2 s limit);
// on timeout send SIGKILL the same way; on any remaining error end the
// output stream directly.
//
// output: the wrapped shell output (or null). When it is missing or
//         already ended the returned promise resolves immediately.
// Returns a promise for the outcome of the escalation above.
FrameProducer.prototype._stopService = function(output) {
  log.info('Stopping minicap service')

  if (!output || output.ended) {
    return Promise.resolve(true)
  }

  var pid = this.pid
  var signums = {
    SIGTERM: -15
  , SIGKILL: -9
  }

  // Sends `signal` to the service and waits for its output to end.
  function kill(signal) {
    if (pid <= 0) {
      return Promise.reject(new Error('Minicap service pid is unknown'))
    }

    log.info('Sending %s to minicap', signal)

    var outputEnded = output.waitForEnd()
    var killSent = adb.shell(options.serial, ['kill', signums[signal], pid])
      .then(adbkit.util.readAll)
      .return(true)

    return Promise.all([outputEnded, killSent]).timeout(2000)
  }

  return kill('SIGTERM')
    .catch(Promise.TimeoutError, function() {
      return kill('SIGKILL')
    })
    .catch(function() {
      log.info('Ending minicap I/O as a last resort')
      output.end()
      return Promise.resolve(true)
    })
}
| |
// Reads the minicap banner from `socket` via bannerutil.read(), limited
// to 2 seconds. Returns the resulting promise.
FrameProducer.prototype._readBanner = function(socket) {
  log.info('Reading minicap banner')

  var pendingBanner = bannerutil.read(socket)
  return pendingBanner.timeout(2000)
}
| |
// Starts listening for 'readable' on `socket` and flags that a readable
// notification is wanted.
FrameProducer.prototype._readFrames = function(socket) {
  var onReadable = this.readableListener

  this.needsReadable = true
  socket.on('readable', onReadable)

  // Data may already be pending, so trigger one notification now to let
  // the consumer at least attempt to read frames.
  this.readableListener()
}
| |
// Emits 'readable' once when the socket is readable AND a consumer has
// asked for it; the request flag is cleared before emitting.
FrameProducer.prototype._maybeEmitReadable = function() {
  if (!this.readable || !this.needsReadable) {
    return
  }

  this.needsReadable = false
  this.emit('readable')
}
| |
// Socket 'readable' handler: remembers that data is available and
// forwards the notification if a consumer is waiting for it.
FrameProducer.prototype._readableListener = function() {
  this.readable = true
  this._maybeEmitReadable()
}
| |
// Creates the WebSocket server on screenOptions.publicPort (with
// per-message deflate disabled).
//
// Returns a promise that resolves with the server once it emits
// 'listening' and rejects with the error if it emits 'error' first. The
// two temporary listeners are removed in either case.
function createServer() {
  log.info('Starting WebSocket server on port %d', screenOptions.publicPort)

  var server = new WebSocket.Server({
    port: screenOptions.publicPort
  , perMessageDeflate: false
  })

  var onListening
  var onError

  var ready = new Promise(function(resolve, reject) {
    onListening = function() {
      return resolve(server)
    }
    onError = function(err) {
      return reject(err)
    }

    server.on('listening', onListening)
    server.on('error', onError)
  })

  return ready.finally(function() {
    server.removeListener('listening', onListening)
    server.removeListener('error', onError)
  })
}
| |
| return createServer() |
| .then(function(wss) { |
| var frameProducer = new FrameProducer( |
| new FrameConfig(display.properties, display.properties)) |
| var broadcastSet = frameProducer.broadcastSet = new BroadcastSet() |
| |
| broadcastSet.on('nonempty', function() { |
| frameProducer.start() |
| }) |
| |
| broadcastSet.on('empty', function() { |
| frameProducer.stop() |
| }) |
| |
| broadcastSet.on('insert', function(id) { |
| // If two clients join a session in the middle, one of them |
| // may not release the initial size because the projection |
| // doesn't necessarily change, and the producer doesn't Getting |
| // restarted. Therefore we have to call onStart() manually |
| // if the producer is already up and running. |
| switch (frameProducer.runningState) { |
| case FrameProducer.STATE_STARTED: |
| broadcastSet.get(id).onStart(frameProducer) |
| break |
| } |
| }) |
| |
| display.on('rotationChange', function(newRotation) { |
| frameProducer.updateRotation(newRotation) |
| }) |
| |
| frameProducer.on('start', function() { |
| broadcastSet.keys().map(function(id) { |
| return broadcastSet.get(id).onStart(frameProducer) |
| }) |
| }) |
| |
| frameProducer.on('readable', function next() { |
| var frame = frameProducer.nextFrame() |
| if (frame) { |
| Promise.settle([broadcastSet.keys().map(function(id) { |
| return broadcastSet.get(id).onFrame(frame, id) |
| })]).then(next) |
| } |
| else { |
| frameProducer.needFrame() |
| } |
| }) |
| |
| frameProducer.on('error', function(err) { |
| log.fatal('Frame producer had an error', err.stack) |
| lifecycle.fatal() |
| }) |
| |
| wss.on('connection', function(ws) { |
| var id = uuid.v4() |
| |
| function wsStartNotifier() { |
| return new Promise(function(resolve, reject) { |
| frameProducer.banner.wsId = id; |
| |
| var message = util.format('start %s', |
| JSON.stringify(frameProducer.banner)) |
| |
| switch (ws.readyState) { |
| case WebSocket.OPENING: |
| // This should never happen. |
| log.warn('Unable to send banner to OPENING client "%s"', id) |
| break |
| case WebSocket.OPEN: |
| // This is what SHOULD happen. |
| ws.send(message, function(err) { |
| return err ? reject(err) : resolve() |
| }) |
| break |
| case WebSocket.CLOSING: |
| // Ok, a 'close' event should remove the client from the set |
| // soon. |
| break |
| case WebSocket.CLOSED: |
| // This should never happen. |
| log.warn('Unable to send banner to CLOSED client "%s"', id) |
| broadcastSet.remove(id) |
| break |
| } |
| }) |
| } |
| |
| function wsFrameNotifier(frame, id) { |
| return new Promise(function(resolve, reject) { |
| switch (ws.readyState) { |
| case WebSocket.OPENING: |
| // This should never happen. |
| return reject(new Error(util.format( |
| 'Unable to send frame to OPENING client "%s"', id))) |
| case WebSocket.OPEN: |
| var fileName = frameStore.storeFrame(frame, id); |
| |
| var message = util.format( |
| 'nextImgId %s' |
| , fileName |
| ) |
| |
| // Send the next img file id first |
| ws.send(message, function (err) { |
| return err ? reject(err) : resolve() |
| }) |
| |
| // This is what SHOULD happen. |
| ws.send(frame, { |
| binary: true |
| }, function(err) { |
| return err ? reject(err) : resolve() |
| }) |
| return |
| case WebSocket.CLOSING: |
| // Ok, a 'close' event should remove the client from the set |
| // soon. |
| return |
| case WebSocket.CLOSED: |
| // This should never happen. |
| broadcastSet.remove(id) |
| return reject(new Error(util.format( |
| 'Unable to send frame to CLOSED client "%s"', id))) |
| } |
| }) |
| } |
| |
| ws.on('message', function(data) { |
| var match = /^(on|off|(size) ([0-9]+)x([0-9]+))$/.exec(data) |
| if (match) { |
| switch (match[2] || match[1]) { |
| case 'on': |
| broadcastSet.insert(id, { |
| onStart: wsStartNotifier |
| , onFrame: wsFrameNotifier |
| }) |
| break |
| case 'off': |
| broadcastSet.remove(id) |
| break |
| case 'size': |
| frameProducer.setStaticProjection() |
| break |
| } |
| } |
| }) |
| |
| ws.on('close', function() { |
| broadcastSet.remove(id) |
| }) |
| }) |
| |
| lifecycle.observe(function() { |
| wss.close() |
| }) |
| |
| lifecycle.observe(function() { |
| frameProducer.stop() |
| }) |
| |
| return frameProducer |
| }) |
| }) |