blob: 3e4c1fa78160ede96377b93de01fa0f9e2b59dfd [file] [log] [blame]
var util = require('util')
var Promise = require('bluebird')
var syrup = require('stf-syrup')
var WebSocket = require('ws')
var uuid = require('node-uuid')
var EventEmitter = require('eventemitter3').EventEmitter
var split = require('split')
var adbkit = require('adbkit')
var logger = require('../../../../util/logger')
var lifecycle = require('../../../../util/lifecycle')
var bannerutil = require('./util/banner')
var FrameParser = require('./util/frameparser')
var FrameConfig = require('./util/frameconfig')
var FrameStore = require('./util/framestore')
var BroadcastSet = require('./util/broadcastset')
var StateQueue = require('../../../../util/statequeue')
var RiskyStream = require('../../../../util/riskystream')
var FailCounter = require('../../../../util/failcounter')
const FINAL_DEVICE_CAPTURE_WIDTH = 1080;
const FINAL_DEVICE_CAPTURE_HEIGHT = 1920;
module.exports = syrup.serial()
.dependency(require('../../support/adb'))
.dependency(require('../../resources/minicap'))
.dependency(require('../util/display'))
.dependency(require('./options'))
.define(function(options, adb, minicap, display, screenOptions) {
var log = logger.createLogger('device:plugins:screen:stream')
var frameStore = new FrameStore();
function FrameProducer(config) {
EventEmitter.call(this)
this.actionQueue = []
this.runningState = FrameProducer.STATE_STOPPED
this.desiredState = new StateQueue()
this.output = null
this.socket = null
this.pid = -1
this.banner = null
this.parser = null
this.frameConfig = config
this.readable = false
this.needsReadable = false
this.failCounter = new FailCounter(3, 10000)
this.failCounter.on('exceedLimit', this._failLimitExceeded.bind(this))
this.failed = false
this.readableListener = this._readableListener.bind(this)
}
util.inherits(FrameProducer, EventEmitter)
FrameProducer.STATE_STOPPED = 1
FrameProducer.STATE_STARTING = 2
FrameProducer.STATE_STARTED = 3
FrameProducer.STATE_STOPPING = 4
FrameProducer.prototype._ensureState = function() {
if (this.desiredState.empty()) {
return
}
if (this.failed) {
log.warn('Will not apply desired state due to too many failures')
return
}
switch (this.runningState) {
case FrameProducer.STATE_STARTING:
case FrameProducer.STATE_STOPPING:
// Just wait.
break
case FrameProducer.STATE_STOPPED:
if (this.desiredState.next() === FrameProducer.STATE_STARTED) {
this.runningState = FrameProducer.STATE_STARTING
this._startService().bind(this)
.then(function(out) {
this.output = new RiskyStream(out)
.on('unexpectedEnd', this._outputEnded.bind(this))
return this._readOutput(this.output.stream)
})
.then(function() {
return this._waitForPid()
})
.then(function() {
return this._connectService()
})
.then(function(socket) {
this.parser = new FrameParser()
this.socket = new RiskyStream(socket)
.on('unexpectedEnd', this._socketEnded.bind(this))
return this._readBanner(this.socket.stream)
})
.then(function(banner) {
this.banner = banner
return this._readFrames(this.socket.stream)
})
.then(function() {
this.runningState = FrameProducer.STATE_STARTED
this.emit('start')
})
.catch(Promise.CancellationError, function() {
return this._stop()
})
.catch(function(err) {
return this._stop().finally(function() {
this.failCounter.inc()
this.emit('error', err)
})
})
.finally(function() {
this._ensureState()
})
}
else {
setImmediate(this._ensureState.bind(this))
}
break
case FrameProducer.STATE_STARTED:
if (this.desiredState.next() === FrameProducer.STATE_STOPPED) {
this.runningState = FrameProducer.STATE_STOPPING
this._stop().finally(function() {
this._ensureState()
})
}
else {
setImmediate(this._ensureState.bind(this))
}
break
}
}
FrameProducer.prototype.start = function() {
log.info('Requesting frame producer to start')
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
}
FrameProducer.prototype.stop = function() {
log.info('Requesting frame producer to stop')
this.desiredState.push(FrameProducer.STATE_STOPPED)
this._ensureState()
}
FrameProducer.prototype.restart = function() {
switch (this.runningState) {
case FrameProducer.STATE_STARTED:
case FrameProducer.STATE_STARTING:
this.desiredState.push(FrameProducer.STATE_STOPPED)
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
break
}
}
FrameProducer.prototype.updateRotation = function(rotation) {
if (this.frameConfig.rotation === rotation) {
log.info('Keeping %d as current frame producer rotation', rotation)
return
}
log.info('Setting frame producer rotation to %d', rotation)
this.frameConfig.rotation = rotation
this._configChanged()
}
FrameProducer.prototype.updateProjection = function(width, height) {
if (this.frameConfig.virtualWidth === width &&
this.frameConfig.virtualHeight === height) {
log.info(
'Keeping %dx%d as current frame producer projection', width, height)
return
}
log.info('Setting frame producer projection to %dx%d', width, height)
this.frameConfig.virtualWidth = width
this.frameConfig.virtualHeight = height
this._configChanged()
}
FrameProducer.prototype.setStaticProjection = function() {
if (this.frameConfig.virtualWidth === FINAL_DEVICE_CAPTURE_WIDTH &&
this.frameConfig.virtualHeight === FINAL_DEVICE_CAPTURE_HEIGHT) {
log.info('Keeping %dx%d as current frame producer projection',
FINAL_DEVICE_CAPTURE_WIDTH, FINAL_DEVICE_CAPTURE_HEIGHT)
return
}
log.info('Setting frame producer projection to %dx%d',
FINAL_DEVICE_CAPTURE_WIDTH,
FINAL_DEVICE_CAPTURE_HEIGHT)
this.frameConfig.virtualWidth = FINAL_DEVICE_CAPTURE_WIDTH
this.frameConfig.virtualHeight = FINAL_DEVICE_CAPTURE_HEIGHT
this._configChanged()
}
FrameProducer.prototype.nextFrame = function() {
var frame = null
var chunk
if (this.parser) {
while ((frame = this.parser.nextFrame()) === null) {
chunk = this.socket.stream.read()
if (chunk) {
this.parser.push(chunk)
}
else {
this.readable = false
break
}
}
}
return frame
}
FrameProducer.prototype.needFrame = function() {
this.needsReadable = true
this._maybeEmitReadable()
}
FrameProducer.prototype._configChanged = function() {
this.restart()
}
FrameProducer.prototype._socketEnded = function() {
log.warn('Connection to minicap ended unexpectedly')
this.failCounter.inc()
this.restart()
}
FrameProducer.prototype._outputEnded = function() {
log.warn('Shell keeping minicap running ended unexpectedly')
this.failCounter.inc()
this.restart()
}
FrameProducer.prototype._failLimitExceeded = function(limit, time) {
this._stop()
this.failed = true
this.emit('error', new Error(util.format(
'Failed more than %d times in %dms'
, limit
, time
)))
}
FrameProducer.prototype._startService = function() {
log.info('Launching screen service')
return minicap.run(util.format('-S -P %s', this.frameConfig.toString()))
.timeout(10000)
}
FrameProducer.prototype._readOutput = function(out) {
out.pipe(split()).on('data', function(line) {
var trimmed = line.toString().trim()
if (trimmed === '') {
return
}
if (/ERROR/.test(line)) {
log.fatal('minicap error: "%s"', line)
return lifecycle.fatal()
}
var match = /^PID: (\d+)$/.exec(line)
if (match) {
this.pid = Number(match[1])
this.emit('pid', this.pid)
}
log.info('minicap says: "%s"', line)
}.bind(this))
}
FrameProducer.prototype._waitForPid = function() {
if (this.pid > 0) {
return Promise.resolve(this.pid)
}
var pidListener
return new Promise(function(resolve) {
this.on('pid', pidListener = resolve)
}.bind(this)).bind(this)
.timeout(2000)
.finally(function() {
this.removeListener('pid', pidListener)
})
}
FrameProducer.prototype._connectService = function() {
function tryConnect(times, delay) {
return adb.openLocal(options.serial, 'localabstract:minicap')
.timeout(10000)
.then(function(out) {
return out
})
.catch(function(err) {
if (/closed/.test(err.message) && times > 1) {
return Promise.delay(delay)
.then(function() {
return tryConnect(times - 1, delay * 2)
})
}
return Promise.reject(err)
})
}
log.info('Connecting to minicap service')
return tryConnect(5, 100)
}
FrameProducer.prototype._stop = function() {
return this._disconnectService(this.socket).bind(this)
.timeout(2000)
.then(function() {
return this._stopService(this.output).timeout(10000)
})
.then(function() {
this.runningState = FrameProducer.STATE_STOPPED
this.emit('stop')
})
.catch(function(err) {
// In practice we _should_ never get here due to _stopService()
// being quite aggressive. But if we do, well... assume it
// stopped anyway for now.
this.runningState = FrameProducer.STATE_STOPPED
this.emit('error', err)
this.emit('stop')
})
.finally(function() {
this.output = null
this.socket = null
this.pid = -1
this.banner = null
this.parser = null
})
}
FrameProducer.prototype._disconnectService = function(socket) {
log.info('Disconnecting from minicap service')
if (!socket || socket.ended) {
return Promise.resolve(true)
}
socket.stream.removeListener('readable', this.readableListener)
var endListener
return new Promise(function(resolve) {
socket.on('end', endListener = function() {
resolve(true)
})
socket.stream.resume()
socket.end()
})
.finally(function() {
socket.removeListener('end', endListener)
})
}
FrameProducer.prototype._stopService = function(output) {
log.info('Stopping minicap service')
if (!output || output.ended) {
return Promise.resolve(true)
}
var pid = this.pid
function kill(signal) {
if (pid <= 0) {
return Promise.reject(new Error('Minicap service pid is unknown'))
}
var signum = {
SIGTERM: -15
, SIGKILL: -9
}[signal]
log.info('Sending %s to minicap', signal)
return Promise.all([
output.waitForEnd()
, adb.shell(options.serial, ['kill', signum, pid])
.then(adbkit.util.readAll)
.return(true)
])
.timeout(2000)
}
function kindKill() {
return kill('SIGTERM')
}
function forceKill() {
return kill('SIGKILL')
}
function forceEnd() {
log.info('Ending minicap I/O as a last resort')
output.end()
return Promise.resolve(true)
}
return kindKill()
.catch(Promise.TimeoutError, forceKill)
.catch(forceEnd)
}
FrameProducer.prototype._readBanner = function(socket) {
log.info('Reading minicap banner')
return bannerutil.read(socket).timeout(2000)
}
FrameProducer.prototype._readFrames = function(socket) {
this.needsReadable = true
socket.on('readable', this.readableListener)
// We may already have data pending. Let the user know they should
// at least attempt to read frames now.
this.readableListener()
}
FrameProducer.prototype._maybeEmitReadable = function() {
if (this.readable && this.needsReadable) {
this.needsReadable = false
this.emit('readable')
}
}
FrameProducer.prototype._readableListener = function() {
this.readable = true
this._maybeEmitReadable()
}
function createServer() {
log.info('Starting WebSocket server on port %d', screenOptions.publicPort)
var wss = new WebSocket.Server({
port: screenOptions.publicPort
, perMessageDeflate: false
})
var listeningListener, errorListener
return new Promise(function(resolve, reject) {
listeningListener = function() {
return resolve(wss)
}
errorListener = function(err) {
return reject(err)
}
wss.on('listening', listeningListener)
wss.on('error', errorListener)
})
.finally(function() {
wss.removeListener('listening', listeningListener)
wss.removeListener('error', errorListener)
})
}
return createServer()
.then(function(wss) {
var frameProducer = new FrameProducer(
new FrameConfig(display.properties, display.properties))
var broadcastSet = frameProducer.broadcastSet = new BroadcastSet()
broadcastSet.on('nonempty', function() {
frameProducer.start()
})
broadcastSet.on('empty', function() {
frameProducer.stop()
})
broadcastSet.on('insert', function(id) {
// If two clients join a session in the middle, one of them
// may not release the initial size because the projection
// doesn't necessarily change, and the producer doesn't Getting
// restarted. Therefore we have to call onStart() manually
// if the producer is already up and running.
switch (frameProducer.runningState) {
case FrameProducer.STATE_STARTED:
broadcastSet.get(id).onStart(frameProducer)
break
}
})
display.on('rotationChange', function(newRotation) {
frameProducer.updateRotation(newRotation)
})
frameProducer.on('start', function() {
broadcastSet.keys().map(function(id) {
return broadcastSet.get(id).onStart(frameProducer)
})
})
frameProducer.on('readable', function next() {
var frame = frameProducer.nextFrame()
if (frame) {
Promise.settle([broadcastSet.keys().map(function(id) {
return broadcastSet.get(id).onFrame(frame, id)
})]).then(next)
}
else {
frameProducer.needFrame()
}
})
frameProducer.on('error', function(err) {
log.fatal('Frame producer had an error', err.stack)
lifecycle.fatal()
})
wss.on('connection', function(ws) {
var id = uuid.v4()
function wsStartNotifier() {
return new Promise(function(resolve, reject) {
frameProducer.banner.wsId = id;
var message = util.format('start %s',
JSON.stringify(frameProducer.banner))
switch (ws.readyState) {
case WebSocket.OPENING:
// This should never happen.
log.warn('Unable to send banner to OPENING client "%s"', id)
break
case WebSocket.OPEN:
// This is what SHOULD happen.
ws.send(message, function(err) {
return err ? reject(err) : resolve()
})
break
case WebSocket.CLOSING:
// Ok, a 'close' event should remove the client from the set
// soon.
break
case WebSocket.CLOSED:
// This should never happen.
log.warn('Unable to send banner to CLOSED client "%s"', id)
broadcastSet.remove(id)
break
}
})
}
function wsFrameNotifier(frame, id) {
return new Promise(function(resolve, reject) {
switch (ws.readyState) {
case WebSocket.OPENING:
// This should never happen.
return reject(new Error(util.format(
'Unable to send frame to OPENING client "%s"', id)))
case WebSocket.OPEN:
var fileName = frameStore.storeFrame(frame, id);
var message = util.format(
'nextImgId %s'
, fileName
)
// Send the next img file id first
ws.send(message, function (err) {
return err ? reject(err) : resolve()
})
// This is what SHOULD happen.
ws.send(frame, {
binary: true
}, function(err) {
return err ? reject(err) : resolve()
})
return
case WebSocket.CLOSING:
// Ok, a 'close' event should remove the client from the set
// soon.
return
case WebSocket.CLOSED:
// This should never happen.
broadcastSet.remove(id)
return reject(new Error(util.format(
'Unable to send frame to CLOSED client "%s"', id)))
}
})
}
ws.on('message', function(data) {
var match = /^(on|off|(size) ([0-9]+)x([0-9]+))$/.exec(data)
if (match) {
switch (match[2] || match[1]) {
case 'on':
broadcastSet.insert(id, {
onStart: wsStartNotifier
, onFrame: wsFrameNotifier
})
break
case 'off':
broadcastSet.remove(id)
break
case 'size':
frameProducer.setStaticProjection()
break
}
}
})
ws.on('close', function() {
broadcastSet.remove(id)
})
})
lifecycle.observe(function() {
wss.close()
})
lifecycle.observe(function() {
frameProducer.stop()
})
return frameProducer
})
})