/*
 * Geddy JavaScript Web development framework
 * Copyright 2112 Matthew Eernisse (mde@fleegix.org)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

var VERSION = '0.1.0'
  , RETIREMENT_WINDOW = 5 * 60 * 1000
  , ROTATION_WINDOW = 2 * 60 * 60 * 1000
  , binding = process.binding('net')
  , net = require('net')
  , childProcess = require('child_process')
  , vm = require('vm')
  , fs = require('fs')
  , Log = require('../deps/log')
  , metrics
  , util = require('./utils')
  , Parser
  , optsReg
  , parsed
  , opts
  , pids = ''
  , shutdownMode = false
  , restartMode = false;

// `metrics` is an optional dependency: when it cannot be loaded the
// variable stays undefined and metrics tracking is simply skipped.
try {
  metrics = require('metrics');
}
catch(e) {}

/**
 * @namespace The server
 *
 * Master process. Owns the shared listening file-descriptor, spawns the
 * worker-processes, and polls them for retirement and missing heartbeats.
 */
var server = module.exports = new function () {
  // Private vars
  // -------------
  // The file-descriptor shared with all the worker-processes
  var _fd;
  // Handle of the pending process-accounting timer. Kept so a restart
  // cannot leave two polling loops running side by side.
  var _accountingTimer = null;

  // Public properties
  // -------------
  // Passed-in opts
  this.opts = null;

  // Base config, can be overridden by opts passed to `start`
  this.config = {
    // Default to prod
    environment: 'production'
    // Number of worker-processes to spawn
  , workers: 2
    // Port to listen on
  , port: 4000
    // Set stdout to debug log-level
  , debug: false
    // Use worker-process rotation
  , rotateWorkers: true
    // How long for a full rotation
  , rotationWindow: 2 * 60 * 60 * 1000
    // Default logfile location
  , logDir: process.cwd() + '/log'
    // How long to wait for in-flight requests before killing
  , gracefulRestartTimeout: 60000
    // Number of milliseconds old a heartbeat-timestamp can be
    // before killing the worker process
  , heartbeatWindow: 20000
    // Place to look for static content to serve in dev-mode
  , staticFilePath: process.cwd() + '/public'
    // Default session-settings -- setting to null will mean no sessions
  , sessions: {
      store: 'memory',
      key: 'sid',
      expiry: 14 * 24 * 60 * 60
    }
    // Key for when using Cookie session-store
  , cookieSessionKey: 'sdata'
  };
  // Registry of the worker-processes
  this.workers = {};
  // List of the worker-PIDs, in order
  this.workerPidList = [];

  // Metrics Server
  this.metricsServer = null;

  // Private functions
  // -------------
  /**
   * Loads `config/environment.js` and the environment-specific config
   * from the current working directory, and mixes them plus the
   * passed-in opts into `this.config` (later sources win).
   */
  var _readConfig = function () {
    // Tolerate `start` being called before any opts were assigned
    var opts = this.opts || {}
      , dir = process.cwd()
      // TODO: make server configs reload on graceful restart?
      , baseConfig = require(dir + '/config/environment.js')
      , envConfig;

    this.config.environment = opts.environment || this.config.environment;
    // Use the resolved environment, not `opts.environment`, which is
    // undefined when the caller relies on the default
    envConfig = require(dir + '/config/' + this.config.environment + '.js');

    // Start with a blank slate, mix everything in
    // TODO: Do in one go with recursive mixin
    util.mixin(this.config, baseConfig);
    util.mixin(this.config, envConfig);
    util.mixin(this.config, opts);
    // Rotation needs at least two workers
    if (this.config.workers < 2) {
      this.config.rotateWorkers = false;
    }
  };

  /**
   * Creates the shared file-descriptor shared with the worker-processes
   */
  var _createFd = function () {
    _fd = binding.socket('tcp4');
    binding.bind(_fd, parseInt(this.config.port, 10));
    binding.listen(_fd, 128);
  };

  /**
   * Start the metrics server. Does nothing when the optional `metrics`
   * module is missing, or when a metrics server already exists (e.g. on
   * restart), so the port is never bound twice.
   */
  var _doMetricsTracking = function () {
    if (metrics && !this.metricsServer) {
      this.metricsServer = new metrics.Server(this.config.metricsPort || 9091);
    }
  };

  /**
   * Creates the worker-processes that respond to proxy requests.
   * Spawns as many workers as are missing relative to `config.workers`,
   * staggering their retirement times evenly across the rotation window.
   */
  var _createWorkerProcesses = function () {
    var configCount = this.config.workers
      , currentCount = this.workerPidList.length
      , needed = configCount - currentCount
      , rotationWindow = this.config.rotationWindow
      , staggerInterval
      , retirement
      , msg;

    // Nothing to do when at (or above) the configured count
    if (needed <= 0) {
      return;
    }

    staggerInterval = rotationWindow / needed;
    retirement = (new Date()).getTime() + rotationWindow;

    msg = 'Creating ' + needed + ' worker process';
    msg += needed > 1 ? 'es.' : '.';
    this.stdoutLog.info(msg);
    while (currentCount < configCount) {
      currentCount++;
      this.createWorkerProcess(retirement);
      retirement -= staggerInterval;
    }
  };

  /**
   * Periodic housekeeping: asks workers past their retirement time to
   * retire, kills workers whose heartbeat is older than
   * `config.heartbeatWindow`, tops the pool back up, and reschedules
   * itself every five seconds unless the server is shutting down.
   */
  var _doProcessAccounting = function () {
    var self = this
      , workers = this.workers
      , worker
      , now = (new Date()).getTime()
      , heartbeatWindow = this.config.heartbeatWindow;

    for (var p in workers) {
      worker = workers[p];
      if (this.config.rotateWorkers && now > worker.retireAt) {
        worker.retired = true;
        worker.sendMessage({method: 'retire'});
      }
      if ((now - worker.heartbeatAt) > heartbeatWindow) {
        this.stdoutLog.warning("No current heartbeat from " + worker.pid + ", killing process.");
        this.kill(worker.pid);
      }
    }

    if (!shutdownMode) {
      _createWorkerProcesses.call(this);
      // Drop any still-pending tick so only one polling loop is alive
      clearTimeout(_accountingTimer);
      _accountingTimer = setTimeout(function () {
        _doProcessAccounting.call(self);
      }, 5000);
    }
  };

  /**
   * Ensures the log directory exists, rotates the existing stdout,
   * stderr and access logs to timestamped names, and creates a fresh
   * logger for each as `this.stdoutLog`, `this.stderrLog`,
   * `this.accessLog`.
   * @param {Function} callback Called with no arguments once all
   * loggers have been created.
   */
  var _initLogging = function (callback) {
    var self = this
      , levelsByType
      , stdoutLevel
      , types = ['stdout', 'stderr', 'access']
      , now = (new Date()).getTime()
      // Never assigned here; passed through to Log as undefined
      , loggly
      , dir = this.config.logDir
      // Recursive function for rotating and initializing each of the log-types.
      // Calls the passed-in callback when the entire process is done
      , rotateAndInitByType = function () {
          var type = types.shift()
            , cmd
            , next;
          // Grab the next logger-type, if any
          if (type) {
            // Rename the log file, ex.: mv logs/access.log logs/access.<TIMESTAMP>.log
            cmd = 'mv ' + dir + '/' + type + '.log ' + dir + '/' + type + '.' + now + '.log';
            // After the file is renamed, create the new logger with the original filename
            // e.g., access.log. Any error from `mv` (such as no previous log file)
            // is ignored on purpose.
            next = function () {
              self[type + 'Log'] = new Log(levelsByType[type], fs.createWriteStream(dir +
                  '/' + type + '.log'), true, loggly);
              // Go on to the next logger type until none are left
              rotateAndInitByType();
            };
            childProcess.exec(cmd, next);
          }
          // No logger-types left, continue on with the main init process
          else {
            callback();
          }
        };

    // Set the logging level for stdout
    if (this.config.debug) {
      stdoutLevel = Log.DEBUG;
    }
    else if (this.config.logLevel) {
      // NOTE(review): Log.DEBUG / Log.INFO are read as properties but this
      // entry is invoked as a function -- confirm against ../deps/log
      stdoutLevel = Log[this.config.logLevel]();
    }
    else {
      stdoutLevel = Log.INFO; // Default to info
    }

    // Now that we have the desired level for stdout, set up log levels by key
    levelsByType = {
      access: 'access'
    , stderr: 'error'
    , stdout: stdoutLevel
    };

    // Create the log directory if it doesn't exist, and only then kick the
    // log-init process off -- the log files cannot be opened before the
    // directory is there
    childProcess.exec('mkdir -p ' + dir, function (err) {
      if (err) {
        throw err;
      }
      rotateAndInitByType();
    });
  };

  // Public methods
  // -------------
  /**
   * Starts up the proxy server -- creates the shared FD, and the
   * worker-processes
   * @param {Boolean} [restart] When truthy, the existing shared FD is
   * reused instead of creating a new one.
   */
  this.start = function (restart) {
    var self = this
      , msg = restart ? 'restarting' : 'starting';

    _readConfig.call(this);

    if (this.config.metricsPort) {
      _doMetricsTracking.call(self);
    }

    if (!restart) {
      _createFd.call(self);
    }

    _initLogging.call(this, function () {
      self.stdoutLog.info('Server ' + msg + ' with config: ' + JSON.stringify(self.config));
      _doProcessAccounting.call(self);
    });
  };

  /**
   * Clears the restart/shutdown flags and starts the server again,
   * reusing the existing shared FD.
   */
  this.restart = function () {
    restartMode = false;
    shutdownMode = false;
    this.start(true);
  };

  /**
   * Synchronously writes a message to `emergency.log` in the log directory.
   * @param {String} msg The text to write.
   */
  this.emergency = function (msg) {
    fs.writeFileSync(this.config.logDir + '/emergency.log', msg);
  };

  /**
   * Spawns one worker-process, hands it the shared FD and registers it.
   * @param {Number} [dt] Retirement time in epoch-milliseconds; defaults
   * to now plus `config.rotationWindow`.
   */
  this.createWorkerProcess = function (dt) {
    var retireAt = dt || (new Date()).getTime() + this.config.rotationWindow
      , worker = new server.WorkerProcess(retireAt);
    // Pass the shared FD to the worker-processes
    worker.init(_fd);
    // Register the worker-process and record the PID
    this.workers[worker.pid] = worker;
    this.workerPidList.push(worker.pid);
  };

  /**
   * Sends a 'shutdown' message to every registered worker.
   */
  this.sendShutdownToWorkers = function () {
    // Iterate over a snapshot: a failed send removes the worker from
    // the live list (sendMessage -> fixBrokenWorker -> kill)
    var pidList = this.workerPidList.slice()
      , workers = this.workers
      , worker;
    for (var i = 0, ii = pidList.length; i < ii; i++) {
      worker = workers[pidList[i]];
      if (worker) {
        worker.sendMessage({method: 'shutdown'});
      }
    }
  };

  /**
   * Called as workers go away. Once no worker is registered any more,
   * either exits the process or restarts the server, depending on
   * `restartMode`.
   * @param {Number} [pid] Unused; kept for interface compatibility.
   */
  this.shutdown = function (pid) {
    var havePids = false;
    for (var p in this.workers) {
      havePids = true;
      break;
    }
    // Once all the child-processes are gone, and no more requests are
    // in-flight, commit seppuku
    if (!havePids) {
      if (!restartMode) {
        this.die("Exiting...");
      }
      else {
        this.restart();
      }
    }
  };

  /**
   * Kills a worker-process and removes it from the registry and PID list.
   * Safe to call more than once for the same PID.
   * @param {Number} pid PID of the worker to kill.
   */
  this.kill = function (pid) {
    var pidList = this.workerPidList
      , index = -1;

    // The process may already be gone; that is not an error here
    try { process.kill(pid); }
    catch(e) {}

    delete this.workers[pid];
    // Loose comparison kept deliberately so a PID passed as a string
    // (e.g. a key of `this.workers`) still matches
    for (var i = 0, ii = pidList.length; i < ii; i++) {
      if (pidList[i] == pid) {
        index = i;
      }
    }
    // Only splice when the PID was found: splice(undefined, 1) would
    // remove the first entry, i.e. an unrelated live worker
    if (index > -1) {
      pidList.splice(index, 1);
    }
  };

  /**
   * Prints out a message and ends the program.
   * @param {String} str The message to print out before dying.
   */
  this.die = function (str) {
    console.log(str);
    process.exit();
  };

}();

/**
 * @constructor Worker-process that handles proxy requests
 * @param {Number} retireAt Epoch-milliseconds after which the master
 * asks this worker to retire.
 */
server.WorkerProcess = function (retireAt) {
  this.retireAt = retireAt;
  this.heartbeatAt = (new Date()).getTime();
  this.process = null;
  this.pid = null;
  this.fd = null;
  this.retired = false;
  this.messageParser = null;

  // Create a pair of sockets that the master process and the
  // child will use to communicate
  // http://osr507doc.sco.com/en/netguide/dusockD.socketpairs_codetext.html
  // Credits: Ext's Connect, http://github.com/extjs/Connect
  var fds = binding.socketpair();

  // Spawn the child process
  // SECURITY NOTE(review): `worker_executable` is eval'd, so whoever
  // controls the config can run arbitrary code in the master process.
  // Left as-is because the expected format of the value is not visible here.
  this.process = childProcess.spawn(
    eval(server.config.worker_executable) || process.execPath,
    [__dirname + '/geddy.js'],
    {customFds: [fds[1], -1, -1]}
  );
  this.pid = this.process.pid;

  var self = this;
  // Patch the child's stdin onto our end of the socket pair when spawn
  // did not provide one
  if (!self.process.stdin) {
    self.process.stdin = new net.Stream(fds[0], 'unix');
  }
};

server.WorkerProcess.prototype = new function () {
  // Handlers for messages coming from the child, keyed by message method.
  // Passed to util.MessageParser together with the worker instance (see `init`).
  var _dispatch = {
    retired: function (msg) {
      this.commitSeppuku(true, 'retiring');
    }
  , shutdown: function (msg) {
      console.log("IN SHUTDOWN - SERVER");
      this.commitSeppuku(false, 'shutting down');
      server.shutdown();
    }
  , log: function (msg) {
      // If for some reason, there's no logType, assume debug?
      var type = msg.logType || 'debug'
      // access/error have their own loggers, everything else
      // goes to stdoutLog
        , logger = server[type + 'Log'] || server.stdoutLog;
      logger[type](msg.message);
    }
  , heartbeat: function (msg) {
      this.heartbeatAt = (new Date()).getTime();
    }
  , createMetric: function (msg) {
      if (server.metricsServer) {
        // Capitalize the type to get the constructor name on `metrics`
        msg.type = msg.type[0].toUpperCase() + msg.type.substring(1);
        server.metricsServer.addMetric(msg.eventType, new metrics[msg.type]());
      }
    }
  , updateMetric: function (msg) {
      if (server.metricsServer) {
        // eventType is '<namespace...>.<event>'
        var namespaces = msg.eventType.split('.')
          , event = namespaces.pop()
          , namespace = namespaces.join('.')
          , tracked = server.metricsServer.trackedMetrics[namespace]
          , metric = tracked && tracked[event];
        // A bad metric name from a worker must not take the master down
        if (metric && typeof metric[msg.metricMethod] == 'function') {
          metric[msg.metricMethod].apply(metric, msg.metricArgs);
        }
        else {
          server.stdoutLog.warning('Cannot update unknown metric ' + msg.eventType);
        }
      }
    }
  , error: function (msg) {
      server.stderrLog.error('Caught error-message from child-process.');
      server.stderrLog.error(msg.data.stack);
      this.commitSeppuku(false, 'killing');
    }
  };

  /**
   * Logs a warning and has the server kill this worker's process.
   * @param {Boolean} respectShutdown When true, do nothing while the
   * server is in shutdown mode.
   * @param {String} msg Verb for the log line, printed before the PID.
   */
  this.commitSeppuku = function (respectShutdown, msg) {
    if (respectShutdown && shutdownMode) {
      return;
    }
    var child = this.process;
    server.stdoutLog.warning(msg + ' ' + child.pid);
    server.kill(child.pid);
  };

  /**
   * Sends the server config to the child and starts listening to it.
   * @param {Number} fd pointer to the shared file descriptor
   * to pass to the HTTP server in the worker-process
   */
  this.init = function (fd) {
    // Pass the config and fd via the child's stdin
    this.fd = fd;
    var msg = {
      method: 'config'
    , data: server.config
    };
    // Output from the child that does not parse as a message is echoed to stdout
    this.messageParser = new util.MessageParser(_dispatch, this, function (notParsed) {
      process.stdout.write(notParsed + '\n');
    });
    this.sendMessage(msg);
    this.addListeners();
  };

  /**
   * Wires up the child's stdout (messages), stderr (shutdown marker or
   * pass-through output) and exit event.
   */
  this.addListeners = function () {
    var self = this
      , child = this.process;

    child.stdout.addListener('data', function (d) {
      self.receiveMessage(d);
    });

    child.stderr.addListener('data', function (d) {
      var data = d.toString();
      // The child asks for a full shutdown by writing this marker to stderr
      if (data.indexOf('###shutdown###') > -1) {
        process.kill(child.pid);
        process.exit();
      }
      else {
        process.stdout.write(data);
      }
    });

    child.addListener('exit', function (code) {
      self.fixBrokenWorker();
    });

  };

  /**
   * Cleans up after a worker that went away without having been retired.
   */
  this.fixBrokenWorker = function () {
    if (!this.retired) {
      server.stdoutLog.warning("Worker " + this.pid + " died without being retired.");
      server.kill(this.pid);
    }
  };

  /**
   * Feeds raw data from the child's stdout to the message parser.
   * @param {Buffer|String} d Chunk read from the child's stdout.
   */
  this.receiveMessage = function (d) {
    this.messageParser.handle(d);
  };

  /**
   * Serializes a message as one line of JSON and writes it, along with
   * the shared FD, to the child's stdin. A failed write is treated as a
   * broken worker.
   * @param {Object} msg The message to send.
   */
  this.sendMessage = function (msg) {
    try {
      var output = JSON.stringify(msg);
      this.process.stdin.write(output + '\n', 'ascii', this.fd);
    } catch (err) {
      this.fixBrokenWorker();
    }
  };

}();