blob: 7b022d91a7d606f626fd05a710aaa1354c72e300 [file] [log] [blame]
var Promise = require('bluebird')
var logger = require('../../util/logger')
var wire = require('../../wire')
var wirerouter = require('../../wire/router')
var wireutil = require('../../wire/util')
var dbapi = require('../../db/api')
var lifecycle = require('../../util/lifecycle')
var srv = require('../../util/srv')
var zmqutil = require('../../util/zmqutil')
module.exports = function(options) {
var log = logger.createLogger('processor')
if (options.name) {
logger.setGlobalIdentifier(options.name)
}
// App side
var appDealer = zmqutil.socket('dealer')
Promise.map(options.endpoints.appDealer, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('App dealer connected to "%s"', record.url)
appDealer.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to app dealer endpoint', err)
lifecycle.fatal()
})
// Device side
var devDealer = zmqutil.socket('dealer')
appDealer.on('message', function(channel, data) {
devDealer.send([channel, data])
})
Promise.map(options.endpoints.devDealer, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('Device dealer connected to "%s"', record.url)
devDealer.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to dev dealer endpoint', err)
lifecycle.fatal()
})
devDealer.on('message', wirerouter()
// Initial device message
.on(wire.DeviceIntroductionMessage, function(channel, message, data) {
dbapi.saveDeviceInitialState(message.serial, message)
.then(function() {
devDealer.send([
message.provider.channel
, wireutil.envelope(new wire.DeviceRegisteredMessage(
message.serial
))
])
appDealer.send([channel, data])
})
})
// Workerless messages
.on(wire.DevicePresentMessage, function(channel, message, data) {
dbapi.setDevicePresent(message.serial)
appDealer.send([channel, data])
})
.on(wire.DeviceAbsentMessage, function(channel, message, data) {
dbapi.setDeviceAbsent(message.serial)
appDealer.send([channel, data])
})
.on(wire.DeviceStatusMessage, function(channel, message, data) {
dbapi.saveDeviceStatus(message.serial, message.status)
appDealer.send([channel, data])
})
.on(wire.DeviceHeartbeatMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
// Worker initialized
.on(wire.DeviceReadyMessage, function(channel, message, data) {
dbapi.setDeviceReady(message.serial, message.channel)
.then(function() {
devDealer.send([
message.channel
, wireutil.envelope(new wire.ProbeMessage())
])
appDealer.send([channel, data])
})
})
// Worker messages
.on(wire.JoinGroupByAdbFingerprintMessage, function(channel, message) {
dbapi.lookupUserByAdbFingerprint(message.fingerprint)
.then(function(user) {
if (user) {
devDealer.send([
channel
, wireutil.envelope(new wire.AutoGroupMessage(
new wire.OwnerMessage(
user.email
, user.name
, user.group
)
, message.fingerprint
))
])
}
else if (message.currentGroup) {
appDealer.send([
message.currentGroup
, wireutil.envelope(new wire.JoinGroupByAdbFingerprintMessage(
message.serial
, message.fingerprint
, message.comment
))
])
}
})
.catch(function(err) {
log.error(
'Unable to lookup user by ADB fingerprint "%s"'
, message.fingerprint
, err.stack
)
})
})
.on(wire.JoinGroupByVncAuthResponseMessage, function(channel, message) {
dbapi.lookupUserByVncAuthResponse(message.response, message.serial)
.then(function(user) {
if (user) {
devDealer.send([
channel
, wireutil.envelope(new wire.AutoGroupMessage(
new wire.OwnerMessage(
user.email
, user.name
, user.group
)
, message.response
))
])
}
else if (message.currentGroup) {
appDealer.send([
message.currentGroup
, wireutil.envelope(new wire.JoinGroupByVncAuthResponseMessage(
message.serial
, message.response
))
])
}
})
.catch(function(err) {
log.error(
'Unable to lookup user by VNC auth response "%s"'
, message.response
, err.stack
)
})
})
.on(wire.JoinGroupMessage, function(channel, message, data) {
dbapi.setDeviceOwner(message.serial, message.owner)
appDealer.send([channel, data])
})
.on(wire.LeaveGroupMessage, function(channel, message, data) {
dbapi.unsetDeviceOwner(message.serial, message.owner)
appDealer.send([channel, data])
})
.on(wire.DeviceLogMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
.on(wire.DeviceIdentityMessage, function(channel, message, data) {
dbapi.saveDeviceIdentity(message.serial, message)
appDealer.send([channel, data])
})
.on(wire.TransactionProgressMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
.on(wire.TransactionDoneMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
.on(wire.DeviceLogcatEntryMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
.on(wire.AirplaneModeEvent, function(channel, message, data) {
dbapi.setDeviceAirplaneMode(message.serial, message.enabled)
appDealer.send([channel, data])
})
.on(wire.BatteryEvent, function(channel, message, data) {
dbapi.setDeviceBattery(message.serial, message)
appDealer.send([channel, data])
})
.on(wire.DeviceBrowserMessage, function(channel, message, data) {
dbapi.setDeviceBrowser(message.serial, message)
appDealer.send([channel, data])
})
.on(wire.ConnectivityEvent, function(channel, message, data) {
dbapi.setDeviceConnectivity(message.serial, message)
appDealer.send([channel, data])
})
.on(wire.PhoneStateEvent, function(channel, message, data) {
dbapi.setDevicePhoneState(message.serial, message)
appDealer.send([channel, data])
})
.on(wire.RotationEvent, function(channel, message, data) {
dbapi.setDeviceRotation(message.serial, message.rotation)
appDealer.send([channel, data])
})
.on(wire.ReverseForwardsEvent, function(channel, message, data) {
dbapi.setDeviceReverseForwards(message.serial, message.forwards)
appDealer.send([channel, data])
})
.handler())
lifecycle.observe(function() {
[appDealer, devDealer].forEach(function(sock) {
try {
sock.close()
}
catch (err) {
// No-op
}
})
})
}