// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
| |
| var assert = require('assert'); |
| var fork = require('child_process').fork; |
| var net = require('net'); |
| var EventEmitter = require('events').EventEmitter; |
| var util = require('util'); |
| |
// Returns true for any non-null value whose typeof is 'object'
// (plain objects, arrays, ...); false for null, primitives and functions.
function isObject(o) {
  if (o === null) return false;
  return typeof o === 'object';
}
| |
// debug(x): writes x to stderr, prefixed with "<pid>,Worker" or
// "<pid>,Master", but only when the NODE_DEBUG environment variable
// mentions "cluster". Otherwise it is a no-op.
var debug = (function() {
  var flags = process.env.NODE_DEBUG;

  if (!flags || !/cluster/.test(flags)) {
    return function() { };
  }

  return function(x) {
    // The role is decided at call time from NODE_UNIQUE_ID.
    var role = process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master';
    var prefix = process.pid + ',' + role;
    console.error(prefix, x);
  };
})();
| |
// Cluster: constructor for the object exported by this module.
// It carries no state of its own beyond what EventEmitter sets up;
// instances are plain event emitters.
function Cluster() {
  EventEmitter.call(this);
}

// Give Cluster instances the EventEmitter API (on/once/emit/...).
util.inherits(Cluster, EventEmitter);
| |
// The module's public API is one shared Cluster instance.
var cluster = module.exports = new Cluster();

// State used in the master:
var masterStarted = false;   // flipped to true by the first setupMaster()
var ids = 0;                 // incremented for every Worker created
var serverHandlers = {};     // handles keyed by address:port:addressType:fd

// State used in the worker:
var serverListeners = {};    // servers registered through cluster._getServer
var queryIds = 0;            // counter used to build _requestEcho ids
var queryCallbacks = {};     // pending callbacks keyed by _requestEcho id

// A process is a worker exactly when NODE_UNIQUE_ID is present in its
// environment; otherwise it is the master.
cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
cluster.isMaster = !cluster.isWorker;

// cluster.worker is only meaningful in a worker and cluster.workers
// (an object keyed by worker id) only in the master; the other one is null.
if (cluster.isWorker) {
  cluster.worker = {};
  cluster.workers = null;
} else {
  cluster.worker = null;
  cluster.workers = {};
}

// Settings object, replaced by setupMaster()
var settings = cluster.settings = {};
| |
// Calls cb(worker) for every own entry of cluster.workers.
function eachWorker(cb) {
  for (var id in cluster.workers) {
    // Ignore anything inherited through the prototype chain
    if (!cluster.workers.hasOwnProperty(id)) continue;

    cb(cluster.workers[id]);
  }
}
| |
// Minimal countdown helper.
//
// missing:  number of outstanding tasks
// callback: invoked (with no arguments) whenever check() observes that
//           missing is exactly 0
function ProgressTracker(missing, callback) {
  this.callback = callback;
  this.missing = missing;
}

// Fire the callback if nothing is outstanding.
ProgressTracker.prototype.check = function() {
  if (this.missing !== 0) return;
  this.callback();
};

// Mark one task as finished, then re-evaluate.
ProgressTracker.prototype.done = function() {
  this.missing = this.missing - 1;
  this.check();
};
| |
// Configure how workers are forked. Master only; only the first call has
// any effect. Emits 'setup' on the cluster object once the settings are
// stored.
//
// options (all optional):
//   exec     - script to run in workers    (default: process.argv[1])
//   execArgv - arguments for the node binary (default: process.execArgv)
//   args     - arguments for the script    (default: process.argv.slice(2))
//   silent   - passed through as fork()'s `silent` option (default: false)
cluster.setupMaster = function(options) {
  // This can only be called from the master.
  assert(cluster.isMaster);

  // Later calls are ignored
  if (masterStarted) return;
  masterStarted = true;

  options = options || {};

  // By default, V8 writes the profile data of all processes to a single
  // v8.log.
  //
  // Feeding that file to a tick processor gives bogus numbers: many events
  // do not match the recorded memory mappings, and 80+% of ticks end up
  // unaccounted for.
  //
  // Teaching the tick processor about multi-process output is not very
  // useful either, since the processes may run wildly disparate workloads.
  //
  // So when profiling is requested and no --logfile is given, append
  // "--logfile=v8-%p.log" (%p expands to the PID) to a copy of the
  // arguments.
  var execArgv = options.execArgv || process.execArgv;

  function hasFlag(pattern) {
    return execArgv.some(function(arg) { return pattern.test(arg); });
  }

  if (hasFlag(/^--prof/) && !hasFlag(/^--logfile=/)) {
    // concat() returns a new array, so the caller's array is not modified
    execArgv = execArgv.concat('--logfile=v8-%p.log');
  }

  // Publish the effective settings
  var effective = {
    exec: options.exec || process.argv[1],
    execArgv: execArgv,
    args: options.args || process.argv.slice(2),
    silent: options.silent || false
  };
  settings = cluster.settings = effective;

  cluster.emit('setup');
};
| |
// Prefix that marks a message's cmd as cluster-internal
var INTERNAL_PREFIX = 'NODE_CLUSTER_';

// True when message is an object whose cmd is a string that starts with
// INTERNAL_PREFIX and has at least one more character after it.
function isInternalMessage(message) {
  if (!isObject(message)) return false;

  var cmd = message.cmd;
  if (typeof cmd !== 'string') return false;

  // A bare prefix with nothing after it does not count
  if (cmd.length <= INTERNAL_PREFIX.length) return false;

  return cmd.lastIndexOf(INTERNAL_PREFIX, 0) === 0;
}
| |
// Returns a shallow copy of inMessage whose cmd carries INTERNAL_PREFIX.
// A missing or falsy cmd yields just the prefix. inMessage is not modified.
function internalMessage(inMessage) {
  var copy = util._extend({}, inMessage);
  var cmd = copy.cmd || '';

  copy.cmd = INTERNAL_PREFIX + cmd;
  return copy;
}
| |
// Finish processing an incoming internal message.
//
// outMessage / outHandle: optional reply payload and handle
// inMessage / inHandle:   the message being answered and its handle
// worker:                 Worker object the reply is sent through
//
// Two things happen here:
//  1. If inMessage is itself a reply (it has _queryEcho), the callback
//     stored under that id is invoked with (inMessage.content, inHandle).
//  2. A reply is sent when the handler supplied one (outMessage) or when
//     the sender asked for an acknowledgement (inMessage._requestEcho).
function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {

  // The message that will be sent
  var message = internalMessage(outMessage);

  // callback id - will be undefined if not set
  message._queryEcho = inMessage._requestEcho;

  // Call callback if a query echo is received
  var echoId = inMessage._queryEcho;
  if (echoId) {
    var callback;
    // Own-property lookup: an unknown (or prototype-named) id coming over
    // the IPC channel must not resolve to something callable or throw.
    if (Object.prototype.hasOwnProperty.call(queryCallbacks, echoId)) {
      callback = queryCallbacks[echoId];
    }

    // Remove the entry before invoking it so that it cannot leak if the
    // callback throws.
    delete queryCallbacks[echoId];

    if (typeof callback === 'function') {
      callback(inMessage.content, inHandle);
    }
  }

  // Send if there is a payload or the peer is waiting for an echo
  if (!(outMessage === undefined && message._queryEcho === undefined)) {
    sendInternalMessage(worker, message, outHandle);
  }
}
| |
// Table of internal command handlers, keyed by cmd without the internal
// prefix. Filled in separately for master and worker.
var messageHandler = {};

// Dispatch one internal message, in both master and workers.
//
// worker:    Worker object the message belongs to
// inMessage: raw message, cmd still carrying INTERNAL_PREFIX
// inHandle:  handle that accompanied the message, if any
//
// The matching handler is called as handler(message, worker, respond).
// If it never calls respond(), an empty respond() is issued afterwards so
// that echo bookkeeping in handleResponse still runs.
function handleMessage(worker, inMessage, inHandle) {

  // Work on a copy with the internal prefix removed
  var message = util._extend({}, inMessage);
  message.cmd = inMessage.cmd.slice(INTERNAL_PREFIX.length);

  var respondUsed = false;
  function respond(outMessage, outHandler) {
    respondUsed = true;
    handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
  }

  // Run handler if it exists. Only own, callable entries qualify: cmd
  // comes from the other process, and names such as '__proto__' or
  // '__defineGetter__' would otherwise resolve through Object.prototype
  // and throw.
  var handler;
  if (Object.prototype.hasOwnProperty.call(messageHandler, message.cmd)) {
    handler = messageHandler[message.cmd];
  }
  if (typeof handler === 'function') {
    handler.call(messageHandler, message, worker, respond);
  }

  // Send respond if it hasn't been called yet
  if (respondUsed === false) {
    respond();
  }
}
| |
// Register the internal command handlers. Which set is installed depends
// on whether this process is the master or a worker.
if (cluster.isMaster) {

  // A worker reports that it has started
  messageHandler.online = function(message, worker) {
    worker.state = 'online';
    debug('Worker ' + worker.process.pid + ' online');
    worker.emit('online');
    cluster.emit('online', worker);
  };

  // A worker asks for the handle to listen on
  messageHandler.queryServer = function(message, worker, send) {

    // These four values identify the endpoint; they are the same for
    // every worker asking for it, so the handle is created only once.
    var args = [message.address,
                message.port,
                message.addressType,
                message.fd];
    var key = args.join(':');

    if (!serverHandlers.hasOwnProperty(key)) {
      var isUdp = message.addressType === 'udp4' ||
                  message.addressType === 'udp6';

      // dgram is only loaded when a UDP handle is actually requested
      var createHandle = isUdp ? require('dgram')._createSocketHandle :
                                 net._createServerHandle;

      serverHandlers[key] = createHandle.apply(net, args);
    }

    // Reply with an empty message carrying the handle
    send({}, serverHandlers[key]);
  };

  // A worker reports that one of its servers is listening
  messageHandler.listening = function(message, worker) {

    // Each emit gets its own freshly built info object
    function addressInfo() {
      return {
        address: message.address,
        port: message.port,
        addressType: message.addressType,
        fd: message.fd
      };
    }

    worker.state = 'listening';

    worker.emit('listening', addressInfo());
    cluster.emit('listening', worker, addressInfo());
  };

  // A worker announces that it is going away on purpose
  messageHandler.suicide = function(message, worker) {
    worker.suicide = true;
  };

} else if (cluster.isWorker) {

  // The master asked this worker to disconnect
  messageHandler.disconnect = function(message, worker) {
    worker.disconnect();
  };
}
| |
// Parses value as a base-10 integer. Returns the number, or null when
// parseInt cannot produce one.
function toDecInt(value) {
  var parsed = parseInt(value, 10);

  if (isNaN(parsed)) {
    return null;
  }
  return parsed;
}
| |
// Worker object, used in both the master and the worker process.
//
// In the master, constructing a Worker forks a new child process using
// the current cluster settings, registers it in cluster.workers and emits
// 'fork' on the cluster object on the next tick.
// In a worker, it wraps the current process.
//
// customEnv: optional object whose properties are added to the child's
//            environment (master only).
//
// Events re-emitted from the underlying process: 'message', 'error',
// 'exit' (exitCode, signalCode) and 'disconnect'. 'exit' and 'disconnect'
// are also emitted on the cluster object with the worker as first argument.
function Worker(customEnv) {
  // Allow calling without `new`; customEnv has to be forwarded, otherwise
  // the caller's environment would be silently dropped.
  if (!(this instanceof Worker)) return new Worker(customEnv);
  EventEmitter.call(this);

  var self = this;
  var env = process.env;

  // Assign a unique id. In a worker it comes from NODE_UNIQUE_ID and is
  // null when that cannot be parsed as an integer.
  this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);

  // XXX: Legacy. Remove in 0.9
  this.workerID = this.uniqueID = this.id;

  // Assign state
  this.state = 'none';

  // Create or get process
  if (cluster.isMaster) {

    // Create env object
    // first: copy the master's environment
    var envCopy = util._extend({}, env);
    // second: extend envCopy with the env argument
    if (isObject(customEnv)) {
      envCopy = util._extend(envCopy, customEnv);
    }
    // last: set the id, so that customEnv cannot override it and leave
    // the child with a different id than the one recorded here
    envCopy['NODE_UNIQUE_ID'] = this.id;

    // fork worker
    this.process = fork(settings.exec, settings.args, {
      'env': envCopy,
      'silent': settings.silent,
      'execArgv': settings.execArgv
    });
  } else {
    this.process = process;
  }

  if (cluster.isMaster) {
    // Save worker in the cluster.workers object
    cluster.workers[this.id] = this;

    // Emit a fork event, on next tick
    // There is no worker.fork event since this has no real purpose
    process.nextTick(function() {
      cluster.emit('fork', self);
    });
  }

  // handle internalMessage, exit and disconnect event
  this.process.on('internalMessage', handleMessage.bind(null, this));
  this.process.once('exit', function(exitCode, signalCode) {
    prepareExit(self, 'dead');
    self.emit('exit', exitCode, signalCode);
    cluster.emit('exit', self, exitCode, signalCode);
  });
  this.process.once('disconnect', function() {
    prepareExit(self, 'disconnected');
    self.emit('disconnect');
    cluster.emit('disconnect', self);
  });

  // relay message and error
  this.process.on('message', this.emit.bind(this, 'message'));
  this.process.on('error', this.emit.bind(this, 'error'));

}
util.inherits(Worker, EventEmitter);
cluster.Worker = Worker;
| |
// Bookkeeping shared by the 'exit' and 'disconnect' handlers of a worker.
//
// worker: the Worker that is going away
// state:  new value for worker.state ('dead' or 'disconnected' at the
//         call sites in the Worker constructor)
function prepareExit(worker, state) {
  worker.state = state;

  // Normalise suicide to a real boolean (it may still be undefined)
  worker.suicide = Boolean(worker.suicide);

  // Only the master keeps a registry of workers
  if (!cluster.isMaster) return;

  delete cluster.workers[worker.id];
}
| |
// Send a cluster-internal message through worker.send().
//
// worker:   Worker used for sending
// message:  object to send; it is copied and given the internal prefix
//           unless it is already an internal message
// handler:  (optional, third argument) handle to send along
// callback: (optional, last argument) function stored in queryCallbacks
//           under a fresh id; the id travels as message._requestEcho
function sendInternalMessage(worker, message/*, handler, callback*/) {

  // The callback, if any, is always the last argument
  var last = arguments[arguments.length - 1];
  var callback = (typeof last === 'function') ? last : undefined;

  // The third argument is the handler unless it is the callback itself
  var third = arguments[2];
  var handler = (third === callback) ? undefined : third;

  var outgoing = isInternalMessage(message) ? message :
                                              internalMessage(message);

  // Store callback for later
  if (callback) {
    var echoId = worker.id + ':' + (++queryIds);
    outgoing._requestEcho = echoId;
    queryCallbacks[echoId] = callback;
  }

  worker.send(outgoing, handler);
}
| |
// Forward all arguments to the underlying process object's send().
// In a worker that object is `process`, so process.send works as well.
Worker.prototype.send = function() {
  var target = this.process;
  target.send.apply(target, arguments);
};
| |
// Terminate the worker on purpose (sets worker.suicide).
// Also available as worker.destroy.
//
// signal: signal passed to process.kill() in the master;
//         defaults to 'SIGTERM'. Not used in the worker branch.
Worker.prototype.kill = Worker.prototype.destroy = function(signal) {
  signal = signal || 'SIGTERM';

  var self = this;

  this.suicide = true;

  if (cluster.isMaster) {
    // No channel left: just send the signal
    if (!self.process.connected) {
      self.process.kill(signal);
      return;
    }

    // Close the IPC channel first and send the signal once it is closed;
    // this way the worker won't need to propagate suicide state to master
    self.process.once('disconnect', function() {
      self.process.kill(signal);
    });
    self.process.disconnect();
    return;
  }

  // From here on we are inside the worker itself.
  function exitNow() {
    process.exit(0);
  }

  // Channel already closed: nothing to tell the master
  if (!this.process.connected) {
    exitNow();
    return;
  }

  // Tell the master this exit is intentional, exit once it has echoed
  sendInternalMessage(this, {cmd: 'suicide'}, exitNow);

  // ... or as soon as the channel closes, whichever happens first
  this.process.once('disconnect', exitNow);
};
| |
// worker.disconnect(): shut a worker down gracefully.
// The master merely asks the worker to do it; the worker closes all of
// its servers and then closes the IPC channel.
if (cluster.isMaster) {
  // Master side: flag the exit as intentional and forward the request
  Worker.prototype.disconnect = function() {
    this.suicide = true;

    sendInternalMessage(this, {cmd: 'disconnect'});
  };

} else {
  // Worker side
  Worker.prototype.disconnect = function() {
    var self = this;

    this.suicide = true;

    // Number of servers registered at the time of the call
    var pending = Object.keys(serverListeners).length;
    var progress = new ProgressTracker(pending, function() {
      // Every server is closed, so the IPC channel can go too.
      // Closing it emits 'disconnect' on the process object in both
      // master and worker, which is handled by prepareExit.
      self.process.disconnect();
    });

    // Whoever initiated this (master or worker), the suicide flag may
    // already be set in the master; setting it again is harmless.
    sendInternalMessage(this, {cmd: 'suicide'}, function() {
      // covers the case of zero servers
      progress.check();

      // close every registered server gracefully
      for (var key in serverListeners) {
        var server = serverListeners[key];

        if (server._handle === null) {
          // already closed: count it without closing again
          progress.done();
        } else {
          server.on('close', progress.done.bind(progress));
          server.close();
        }
      }
    });

  };
}
| |
// Spawn a new worker process and return its Worker object.
// Master only.
//
// env: optional object with extra environment variables for the worker
cluster.fork = function(env) {
  assert(cluster.isMaster);

  // No-op when the master has already been set up
  cluster.setupMaster();

  var worker = new cluster.Worker(env);
  return worker;
};
| |
// Disconnect every worker; once all of them have emitted 'disconnect',
// close the handles held by the master and call callback (if given).
// Master only.
cluster.disconnect = function(callback) {
  assert(cluster.isMaster);

  // Runs when no worker is left to wait for
  function closeHandles() {
    for (var key in serverHandlers) {
      serverHandlers[key].close();
      delete serverHandlers[key];
    }

    if (callback) callback();
  }

  var workerCount = Object.keys(cluster.workers).length;
  var progress = new ProgressTracker(workerCount, closeHandles);

  // Ask each worker to disconnect and count the ones that are done
  eachWorker(function(worker) {
    worker.once('disconnect', progress.done.bind(progress));
    worker.disconnect();
  });

  // With zero workers nothing would ever call done()
  progress.check();
};
| |
// Internal function. Called from src/node.js when worker process starts.
// Creates the Worker object for this process, publishes it as
// cluster.worker and reports 'online' to the master.
cluster._setupWorker = function() {

  var worker = new Worker();
  cluster.worker = worker;

  // If the channel to the parent closes and this was not an intentional
  // shutdown, terminate the worker.
  process.once('disconnect', function() {
    if (worker.suicide === true) return;
    process.exit(0);
  });

  // Tell master that the worker is online
  worker.state = 'online';
  sendInternalMessage(worker, { cmd: 'online' });
};
| |
// Internal function. Called by net.js and dgram.js when attempting to bind a
// TCP server or UDP socket. Worker only.
//
// tcpSelf:     the server/socket object that wants to listen
// address, port, addressType, fd: endpoint description sent to the master
// cb:          called as cb(handle) with the handle received in the
//              master's reply
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
  assert(cluster.isWorker);

  // Builds a message for the master describing this endpoint
  function describe(cmd, portValue) {
    return {
      cmd: cmd,
      address: address,
      port: portValue,
      addressType: addressType,
      fd: fd
    };
  }

  // Remember the instance; worker.disconnect() closes it later
  var key = [address, port, addressType, fd].join(':');
  serverListeners[key] = tcpSelf;

  // Report to the master once the server is actually listening,
  // preferring the port reported by tcpSelf.address()
  tcpSelf.once('listening', function() {
    cluster.worker.state = 'listening';
    sendInternalMessage(cluster.worker,
                        describe('listening', tcpSelf.address().port || port));
  });

  // Ask the master for the handle; the callback is kept until it answers
  var query = describe('queryServer', port);
  sendInternalMessage(cluster.worker, query, function(msg, handle) {
    cb(handle);
  });

};