| var Promise = require('bluebird') |
| |
| var logger = require('../../util/logger') |
| var wire = require('../../wire') |
| var wireutil = require('../../wire/util') |
| var wirerouter = require('../../wire/router') |
| var dbapi = require('../../db/api') |
| var lifecycle = require('../../util/lifecycle') |
| var srv = require('../../util/srv') |
| var TtlSet = require('../../util/ttlset') |
| var zmqutil = require('../../util/zmqutil') |
| |
| module.exports = function(options) { |
| var log = logger.createLogger('reaper') |
| var ttlset = new TtlSet(options.heartbeatTimeout) |
| |
| if (options.name) { |
| logger.setGlobalIdentifier(options.name) |
| } |
| |
| // Input |
| var sub = zmqutil.socket('sub') |
| Promise.map(options.endpoints.sub, function(endpoint) { |
| return srv.resolve(endpoint).then(function(records) { |
| return srv.attempt(records, function(record) { |
| log.info('Receiving input from "%s"', record.url) |
| sub.connect(record.url) |
| return Promise.resolve(true) |
| }) |
| }) |
| }) |
| .catch(function(err) { |
| log.fatal('Unable to connect to sub endpoint', err) |
| lifecycle.fatal() |
| }) |
| |
| // Establish always-on channels |
| ;[wireutil.global].forEach(function(channel) { |
| log.info('Subscribing to permanent channel "%s"', channel) |
| sub.subscribe(channel) |
| }) |
| |
| // Output |
| var push = zmqutil.socket('push') |
| Promise.map(options.endpoints.push, function(endpoint) { |
| return srv.resolve(endpoint).then(function(records) { |
| return srv.attempt(records, function(record) { |
| log.info('Sending output to "%s"', record.url) |
| push.connect(record.url) |
| return Promise.resolve(true) |
| }) |
| }) |
| }) |
| .catch(function(err) { |
| log.fatal('Unable to connect to push endpoint', err) |
| lifecycle.fatal() |
| }) |
| |
| ttlset.on('insert', function(serial) { |
| log.info('Device "%s" is present', serial) |
| push.send([ |
| wireutil.global |
| , wireutil.envelope(new wire.DevicePresentMessage( |
| serial |
| )) |
| ]) |
| }) |
| |
| ttlset.on('drop', function(serial) { |
| log.info('Reaping device "%s" due to heartbeat timeout', serial) |
| push.send([ |
| wireutil.global |
| , wireutil.envelope(new wire.DeviceAbsentMessage( |
| serial |
| )) |
| ]) |
| }) |
| |
| function loadInitialState() { |
| return dbapi.loadPresentDevices() |
| .then(function(cursor) { |
| return Promise.promisify(cursor.toArray, cursor)() |
| .then(function(list) { |
| var now = Date.now() |
| list.forEach(function(device) { |
| ttlset.bump(device.serial, now, TtlSet.SILENT) |
| }) |
| }) |
| }) |
| } |
| |
| function listenToChanges() { |
| sub.on('message', wirerouter() |
| .on(wire.DeviceIntroductionMessage, function(channel, message) { |
| ttlset.drop(message.serial, TtlSet.SILENT) |
| ttlset.bump(message.serial, Date.now()) |
| }) |
| .on(wire.DeviceHeartbeatMessage, function(channel, message) { |
| ttlset.bump(message.serial, Date.now()) |
| }) |
| .on(wire.DeviceAbsentMessage, function(channel, message) { |
| ttlset.drop(message.serial, TtlSet.SILENT) |
| }) |
| .handler()) |
| } |
| |
| log.info('Reaping devices with no heartbeat') |
| |
| lifecycle.observe(function() { |
| [push, sub].forEach(function(sock) { |
| try { |
| sock.close() |
| } |
| catch (err) { |
| // No-op |
| } |
| }) |
| |
| ttlset.stop() |
| }) |
| |
| loadInitialState().then(listenToChanges).catch(function(err) { |
| log.fatal('Unable to load initial state', err) |
| lifecycle.fatal() |
| }) |
| } |