1 /*
  2  * Geddy JavaScript Web development framework
  3  * Copyright 2112 Matthew Eernisse (mde@fleegix.org)
  4  *
  5  * Licensed under the Apache License, Version 2.0 (the "License");
  6  * you may not use this file except in compliance with the License.
  7  * You may obtain a copy of the License at
  8  *
  9  *         http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  *
 17 */
 18 
 19 var VERSION = '0.1.0'
 20   , RETIREMENT_WINDOW = 5 * 60 * 1000
 21   , ROTATION_WINDOW = 2 * 60 * 60 * 1000
 22   , binding = process.binding('net')
 23   , net = require('net')
 24   , childProcess = require('child_process')
 25   , vm = require('vm')
 26   , fs = require('fs')
 27   , Log = require('../deps/log')
 28   , metrics
 29   , util = require('./utils')
 30   , Parser
 31   , optsReg
 32   , parsed
 33   , opts
 34   , pids = ''
 35   , shutdownMode = false
 36   , restartMode = false;
 37 
 38 try {
 39   metrics = require('metrics');
 40 }
 41 catch(e) {}
 42 
 43 /**
 44  * @namespace The server
 45  */
 46 var server = module.exports = new function () {
 47   // Private vars
 48   // -------------
 49   // The file-descriptor shared with all the worker-processes
 50   var _fd;
 51 
 52   // Public properties
 53   // -------------
 54   // Passed-in opts
 55   this.opts = null;
 56 
 57   // Base config, can be overridden by opts passed to `start`
 58   this.config = {
 59   // Default to prod
 60     environment: 'production'
 61   // Number of worker-processes to spawn
 62   , workers: 2
 63   // Port to listen on
 64   , port: 4000
 65   // Set stdout to debug log-level
 66   , debug: false
 67   // Use worker-process rotation
 68   , rotateWorkers: true
 69   // How long for a full rotation
 70   , rotationWindow: 2 * 60 * 60 * 1000
 71   // Default logfile location
 72   , logDir: process.cwd() + '/log'
 73   // How long to wait for in-flight requests before killing
 74   , gracefulRestartTimeout: 60000
 75   // Number of milliseconds old a heartbeat-timestamp can be
 76   // before killing the worker process
 77   , heartbeatWindow: 20000
 78   // Place to look for static content to serve in dev-mode
 79   , staticFilePath: process.cwd() + '/public'
 80   // Default session-settings -- setting to null will mean no sessions
 81   , sessions: {
 82       store: 'memory',
 83       key: 'sid',
 84       expiry: 14 * 24 * 60 * 60
 85     }
 86   // Key for when using Cookie session-store
 87   , cookieSessionKey: 'sdata'
 88   };
 89   // Registry of the worker-processes
 90   this.workers = {};
 91   // List of the worker-PIDs, in order
 92   this.workerPidList = [];
 93 
 94   // Metrics Server
 95   this.metricsServer = null;
 96 
 97   // Private functions
 98   // -------------
 99   var _readConfig = function () {
100         var opts = this.opts
101           , dir = process.cwd()
102         // TODO: make server configs reload on graceful restart?
103           , baseConfig = require(dir + '/config/environment.js')
104           , envConfig;
105 
106         this.config.environment = opts.environment || this.config.environment;
107         envConfig = require(dir + '/config/' + opts.environment + '.js');
108 
109         // Start with a blank slate, mix everything in
110         // TODO: Do in one go with recursive mixin
111         util.mixin(this.config, baseConfig);
112         util.mixin(this.config, envConfig);
113         util.mixin(this.config, opts);
114         if (this.config.workers < 2) {
115           this.config.rotateWorkers = false;
116         }
117       }
118   /**
119    * Creates the shared file-descriptor shared with the worker-processes
120    */
121     , _createFd = function () {
122         _fd = binding.socket('tcp4');
123         binding.bind(_fd, parseInt(this.config.port));
124         binding.listen(_fd, 128);
125       }
126 
127    /**
128     * Start the metrics server
129     */
130     , _doMetricsTracking = function () {
131         if (metrics) {
132           this.metricsServer = new metrics.Server(this.config.metricsPort || 9091);
133         }
134     }
135 
136   /**
137    * Creates the worker-processes that respond to proxy requests
138    */
139     , _createWorkerProcesses = function () {
140         var self = this
141           , configCount = this.config.workers
142           , currentCount = this.workerPidList.length
143           , needed = configCount - currentCount
144           , rotationWindow = this.config.rotationWindow
145           , staggerInterval = rotationWindow / needed
146           , retirement = (new Date()).getTime() + rotationWindow
147           , msg;
148         if (needed) {
149           msg = 'Creating ' + needed + ' worker process';
150           msg += needed > 1 ? 'es.' : '.';
151           this.stdoutLog.info(msg);
152           while (currentCount < configCount) {
153             currentCount++;
154             this.createWorkerProcess(retirement);
155             retirement -= staggerInterval;
156           }
157         }
158       }
159 
160     , _doProcessAccounting = function () {
161         var self = this
162           , workers = this.workers
163           , worker
164           , configCount = this.config.workers
165           , now = (new Date()).getTime()
166           , heartbeatWindow = this.config.heartbeatWindow;
167         for (var p in workers) {
168           worker = workers[p];
169           if (this.config.rotateWorkers && now > worker.retireAt) {
170             worker.retired = true;
171             worker.sendMessage({method: 'retire'});
172           }
173           if ((now - worker.heartbeatAt) > heartbeatWindow) {
174             this.stdoutLog.warning("No current heartbeat from " + worker.pid + ", killing process.");
175             this.kill(worker.pid);
176           }
177 178         }
179         if (!shutdownMode) {
180           _createWorkerProcesses.call(this);
181           setTimeout(function () {
182             _doProcessAccounting.call(self);
183           }, 5000);
184         }
185       }
186 
187     , _initLogging = function (callback) {
188         var self = this
189           , levelsByType
190           , stdoutLevel
191           , types = ['stdout', 'stderr', 'access']
192           , now = (new Date()).getTime()
193           , loggly
194           , dir = this.config.logDir
195           // Recursive function for rotating and initializing each of the log-types.
196           // Calls the passed-in callback when the entire process is done
197           , rotateAndInitByType = function () {
198               var type = types.shift()
199                 , cmd
200                 , next;
201               // Grab the next logger-type, if any
202               if (type) {
203                 // Rename the log file, ex.: mv logs/access.log logs/access.<TIMESTAMP>.log
204                 cmd = 'mv ' + dir + '/' + type + '.log ' + dir + '/' + type + '.' + now + '.log';
205                 // After the file is renmaed, create the new logger with the original filename
206                 // e.g., access.log
207                 next = function () {
208                   self[type + 'Log'] = new Log(levelsByType[type], fs.createWriteStream(dir +
209                       '/' + type + '.log'), true, loggly);
210                   // Go on to the next logger type until none are left
211                   rotateAndInitByType();
212                 };
213                 childProcess.exec(cmd, next);
214               }
215               // No logger-types left, continue on with the main init process
216               else {
217                 callback();
218               }
219             };
220 
221         // Set the logging level for stdout
222         if (this.config.debug) {
223           stdoutLevel = Log.DEBUG;
224         }
225         else if (this.config.logLevel) {
226           stdoutLevel = Log[this.config.logLevel]();
227         }
228         else {
229           stdoutLevel = Log.INFO; // Default to info
230         }
231 
232         // Now that we have the desired level for stdout, set up log levels by key
233         levelsByType = {
234           access: 'access'
235         , stderr: 'error'
236         , stdout: stdoutLevel
237         };
238 
239         // Create the log directory if it doesn't exist
240         childProcess.exec('mkdir -p ' + dir, function (err) {
241           if (err) {
242             throw err;
243           }
244         });
245 
246         // Kick the log-init process off
247         rotateAndInitByType();
248       };
249 
250   // Public methods
251   // -------------
252   /**
253    * Starts up the proxy server -- creates the shared FD, and the
254    * worker-processes
255    */
256   this.start = function (restart) {
257     var self = this
258       , msg = restart ? 'restarting' : 'starting';
259 
260     _readConfig.call(this);
261 
262     if (this.config.metricsPort) {
263       _doMetricsTracking.call(self);
264     }
265 
266     if (!restart) {
267       _createFd.call(self);
268     }
269 
270     _initLogging.call(this, function () {
271       self.stdoutLog.info('Server ' + msg + ' with config: ' + JSON.stringify(self.config));
272       _doProcessAccounting.call(self);
273     });
274   };
275 
276   this.restart = function () {
277     restartMode = false;
278     shutdownMode = false;
279     this.start(true);
280   };
281 
282   this.emergency = function (msg) {
283     fs.writeFileSync(this.config.logDir + '/emergency.log', msg);
284   };
285 
286   this.createWorkerProcess = function (dt) {
287     var retireAt = dt || (new Date()).getTime() + this.config.rotationWindow
288       , worker = new server.WorkerProcess(retireAt);
289     // Pass the shared FD to the worker-processes
290     worker.init(_fd);
291     // Regsiter the worker-process and record the PID
292     this.workers[worker.pid] = worker;
293     this.workerPidList.push(worker.pid);
294   };
295 
296 
297   this.sendShutdownToWorkers = function () {
298     var pid
299       , pidList = this.workerPidList
300       , workers = this.workers
301       , worker;
302     for (var i = 0, ii = pidList.length; i < ii; i++) {
303       pid = pidList[i];
304       worker = workers[pid];
305       worker.sendMessage({method: 'shutdown'});
306     }
307   };
308 
309   this.shutdown = function (pid) {
310     var havePids = false;
311     for (var p in this.workers) {
312       havePids = true;
313     }
314     // Once all the child-processes are gone, and no more requests are
315     // in-flight, commit seppuku
316     if (!havePids) {
317       if (!restartMode) {
318         this.die("Exiting...");
319       }
320       this.restart();
321     }
322   };
323 
324   this.kill = function (pid) {
325     var pidList = this.workerPidList
326       , index;
327 
328     try { process.kill(pid); }
329     catch(e) {}
330 
331     delete this.workers[pid];
332     for (var i = 0, ii = pidList.length; i < ii; i++) {
333       if (pidList[i] == pid) {
334         index = i;
335       }
336     }
337     this.workerPidList.splice(index, 1);
338   };
339 
340   /**
341    * Prints out a message and ends the program.
342    * @param {String} str The message to print out before dying.
343    */
344   this.die = function (str) {
345     console.log(str);
346     process.exit();
347   };
348 
349 }();
350 
351 /**
352  * @constructor Worker-process that handles proxy requests
353  */
354 server.WorkerProcess = function (retireAt) {
355   this.retireAt = retireAt;
356   this.heartbeatAt = (new Date()).getTime();
357   this.process = null;
358   this.pid = null;
359   this.fd = null;
360   this.retired = false;
361   this.messageParser = null;
362 
363   // Create a pair of sockets that the master process and the
364   // child will use to communicate
365   // http://osr507doc.sco.com/en/netguide/dusockD.socketpairs_codetext.html
366   // Credits: Ext's Connect, http://github.com/extjs/Connect
367   var fds = binding.socketpair();
368 
369   // Spawn the child process
370   this.process = childProcess.spawn(
371     eval(server.config.worker_executable) || process.execPath,
372     //eval(server.config.worker_arguments) || [process.cwd() + '/app.js'],
373     [__dirname+ '/geddy.js'],
374     {customFds: [fds[1], -1, -1]}
375   );
376   this.pid = this.process.pid;
377 
378 
379   var self = this;
380   // Wait a moment, then patch child's stdin
381   if (!self.process.stdin) {
382     self.process.stdin = new net.Stream(fds[0], 'unix');
383   }
384 }
385 
386 server.WorkerProcess.prototype = new function () {
387   var _dispatch = {
388         retired: function (msg) {
389           this.commitSeppuku(true, 'retiring');
390         }
391       , shutdown: function (msg) {
392       console.log("IN SHUTDOWN - SERVER");
393           this.commitSeppuku(false, 'shutting down');
394           server.shutdown();
395         }
396       , log: function (msg) {
397           // If for some reason, there's no logType, assume debug?
398           var type = msg.logType || 'debug'
399           // access/error have their own loggers, everything else
400           // goes to stdoutLog
401             , logger = server[type + 'Log'] || server.stdoutLog;
402           logger[type](msg.message);
403         }
404       , heartbeat: function (msg) {
405           this.heartbeatAt = (new Date()).getTime();
406         }
407       , createMetric: function (msg) {
408           if (server.metricsServer) {
409             msg.type = msg.type[0].toUpperCase() + msg.type.substring(1)
410             server.metricsServer.addMetric(msg.eventType, new metrics[msg.type]);
411           }
412        }
413       , updateMetric: function (msg) {
414         if (server.metricsServer) {
415           var namespaces = msg.eventType.split('.')
416             , event = namespaces.pop()
417             , namespace = namespaces.join('.');
418           var metric = server.metricsServer.trackedMetrics[namespace][event];
419           metric[msg.metricMethod].apply(metric, msg.metricArgs);
420         }
421       }
422       , error: function (msg) {
423         server.stderrLog.error('Caught error-message from child-process.');
424         server.stderrLog.error(msg.data.stack);
425         this.commitSeppuku(false, 'killing');
426       }
427    };
428 
429    this.commitSeppuku = function (respectShutdown, msg) {
430      if (respectShutdown && shutdownMode) {
431        return;
432      }
433      var child = this.process;
434      server.stdoutLog.warning(msg + ' ' + child.pid);
435      server.kill(child.pid);
436    };
437 
438   /**
439    * @param {Number} fd pointer to the shared file descriptor
440    * to pass to the HTTP server in the worker-process
441    */
442   this.init = function (fd) {
443     // Pass a dummy config and fd via the child's stdin
444     this.fd = fd;
445     var msg = {
446       method: 'config'
447     , data: server.config
448     };
449     this.messageParser = new util.MessageParser(_dispatch, this, function (notParsed) {
450       process.stdout.write(notParsed + '\n');
451     });
452     this.sendMessage(msg);
453     this.addListeners();
454   };
455 
456   this.addListeners = function () {
457     var self = this
458       , child = this.process;
459 
460     child.stdout.addListener('data', function (d) {
461       self.receiveMessage(d);
462     });
463 
464     child.stderr.addListener('data', function (d) {
465       var data = d.toString();
466       if (data.indexOf('###shutdown###') > -1) {
467         process.kill(child.pid);
468         process.exit();
469       }
470       else {
471         process.stdout.write(data);
472       }
473     });
474 
475     child.addListener('exit', function (code) {
476       self.fixBrokenWorker();
477     });
478 
479   };
480 
481   this.fixBrokenWorker = function () {
482      if (!this.retired) {
483        server.stdoutLog.warning("Worker " + this.pid + " died without being retired.");
484        server.kill(this.pid);
485      }
486   };
487 
488   this.receiveMessage = function (d) {
489     this.messageParser.handle(d);
490   };
491 
492   this.sendMessage = function (msg) {
493     try {
494       var output = JSON.stringify(msg);
495       this.process.stdin.write(output + '\n', 'ascii', this.fd);
496     } catch (err) {
497       this.fixBrokenWorker();
498     }
499   };
500 
501 }();
502