From 714c0e274677656026753494ae7ab6c200c989a4 Mon Sep 17 00:00:00 2001 From: snewcomer Date: Tue, 1 Jun 2021 08:45:58 -0500 Subject: [PATCH] [Refactor]: worker as a separate entry point for cluster primary --- .../src/fastboot-app-server.js | 131 ++++++++++++------ .../src/utils/serialization.js | 38 +++++ .../fastboot-app-server/src/worker-start.js | 6 + packages/fastboot-app-server/src/worker.js | 16 ++- 4 files changed, 146 insertions(+), 45 deletions(-) create mode 100644 packages/fastboot-app-server/src/utils/serialization.js create mode 100644 packages/fastboot-app-server/src/worker-start.js diff --git a/packages/fastboot-app-server/src/fastboot-app-server.js b/packages/fastboot-app-server/src/fastboot-app-server.js index 36802e634..8e0cebd0e 100644 --- a/packages/fastboot-app-server/src/fastboot-app-server.js +++ b/packages/fastboot-app-server/src/fastboot-app-server.js @@ -3,8 +3,8 @@ const assert = require('assert'); const cluster = require('cluster'); const os = require('os'); - -const Worker = require('./worker'); +const path = require('path'); +const serialize = require('./utils/serialization').serialize; class FastBootAppServer { constructor(options) { @@ -33,37 +33,17 @@ class FastBootAppServer { this.propagateUI(); - if (cluster.isWorker) { - this.worker = new Worker({ - ui: this.ui, - distPath: this.distPath || process.env.FASTBOOT_DIST_PATH, - cache: this.cache, - gzip: this.gzip, - host: this.host, - port: this.port, - username: this.username, - password: this.password, - httpServer: this.httpServer, - beforeMiddleware: this.beforeMiddleware, - afterMiddleware: this.afterMiddleware, - buildSandboxGlobals: this.buildSandboxGlobals, - chunkedResponse: this.chunkedResponse, - }); + this.workerCount = options.workerCount || + (process.env.NODE_ENV === 'test' ? 1 : null) || + os.cpus().length; - this.worker.start(); - } else { - this.workerCount = options.workerCount || - (process.env.NODE_ENV === 'test' ? 1 : null) || - os.cpus().length; + this._clusterInitialized = false; - assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option."); - assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both."); - } + assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option."); + assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both."); } start() { - if (cluster.isWorker) { return; } - return this.initializeApp() .then(() => this.subscribeToNotifier()) .then(() => this.forkWorkers()) @@ -74,6 +54,9 @@ class FastBootAppServer { }) .catch(err => { this.ui.writeLine(err.stack); + }) + .finally(() => { + this._clusterInitialized = true; }); } @@ -137,6 +120,12 @@ class FastBootAppServer { } } + /** + * send message to worker + * + * @method broadcast + * @param {Object} message + */ broadcast(message) { let workers = cluster.workers; @@ -152,6 +141,10 @@ class FastBootAppServer { forkWorkers() { let promises = []; + // https://nodejs.org/api/cluster.html#cluster_cluster_setupprimary_settings + // Note: cluster.setupPrimary in v16.0.0 + cluster.setupMaster(this.clusterSetupPrimary()); + for (let i = 0; i < this.workerCount; i++) { promises.push(this.forkWorker()); } @@ -160,31 +153,53 @@ class FastBootAppServer { } forkWorker() { - let env = this.buildWorkerEnv(); - let worker = cluster.fork(env); + let worker = cluster.fork(this.buildWorkerEnv()); - this.ui.writeLine(`forked worker ${worker.process.pid}`); + this.ui.writeLine(`Worker ${worker.process.pid} forked`); + + let firstBootResolve; + let firstBootReject; + const firstBootPromise = new Promise((resolve, reject) => { + firstBootResolve = resolve; + firstBootReject = reject; + }); + + if (this._clusterInitialized) { + firstBootResolve(); + } + + worker.on('online', () => { + this.ui.writeLine(`Worker ${worker.process.pid} online.`); + }); + + worker.on('message', (message) => { + if (message.event === 'http-online') { + this.ui.writeLine(`Worker ${worker.process.pid} healthy.`); + firstBootResolve(); + } + }); worker.on('exit', (code, signal) => { + let error; if (signal) { - this.ui.writeLine(`worker was killed by signal: ${signal}`); + error = new Error(`Worker ${worker.process.pid} killed by signal: ${signal}`); } else if (code !== 0) { - this.ui.writeLine(`worker exited with error code: ${code}`); + error = new Error(`Worker ${worker.process.pid} exited with error code: ${code}`); } else { - this.ui.writeLine(`worker exited`); + error = new Error(`Worker ${worker.process.pid} exited gracefully. It should only exit when told to do so.`); } - this.forkWorker(); + if (!this._clusterInitialized) { + // Do not respawn for a failed first launch. + firstBootReject(error); + } else { + // Do respawn if you've ever successfully been initialized. + this.ui.writeLine(error); + this.forkWorker(); + } }); - return new Promise(resolve => { - this.ui.writeLine('worker online'); - worker.on('message', message => { - if (message.event === 'http-online') { - resolve(); - } - }); - }); + return firstBootPromise; } buildWorkerEnv() { @@ -197,6 +212,36 @@ class FastBootAppServer { return env; } + /** + * Extension point to allow configuring the default fork configuration. + * + * @method clusterSetupPrimary + * @returns {Object} + * @public + */ + clusterSetupPrimary() { + const workerOptions = { + ui: this.ui, + distPath: this.distPath || process.env.FASTBOOT_DIST_PATH, + cache: this.cache, + gzip: this.gzip, + host: this.host, + port: this.port, + username: this.username, + password: this.password, + httpServer: this.httpServer, + beforeMiddleware: this.beforeMiddleware, + afterMiddleware: this.afterMiddleware, + buildSandboxGlobals: this.buildSandboxGlobals, + chunkedResponse: this.chunkedResponse, + }; + + const workerPath = this.workerPath || path.join(__dirname, './worker-start.js'); + return { + exec: workerPath, + args: [serialize(workerOptions)] + }; + } } module.exports = FastBootAppServer; diff --git a/packages/fastboot-app-server/src/utils/serialization.js b/packages/fastboot-app-server/src/utils/serialization.js new file mode 100644 index 000000000..61a02b4ef --- /dev/null +++ b/packages/fastboot-app-server/src/utils/serialization.js @@ -0,0 +1,38 @@ +'use strict'; + +/** + * The purpose of this module is to provide a serialization layer for passing arguments to a new Worker instance + * This allows us to completely separate the cluster worker from the cluster primary + */ + +function circularReplacer() { + const seen = new WeakSet(); + + return (key, value) => { + if (typeof value === 'object' && value !== null) { + if (seen.has(value)) { + return; + } + + seen.add(value); + } + + return value; + } +} + +function serialize(object) { + let data = encodeURIComponent(JSON.stringify(object, circularReplacer())); + let buff = new Buffer.from(data); + return buff.toString('base64'); +} + +function deserialize(string) { + let buff = new Buffer.from(string, 'base64'); + return JSON.parse(decodeURIComponent(buff.toString('ascii'))); +} + +module.exports = { + serialize, + deserialize +}; diff --git a/packages/fastboot-app-server/src/worker-start.js b/packages/fastboot-app-server/src/worker-start.js new file mode 100644 index 000000000..b81d23c71 --- /dev/null +++ b/packages/fastboot-app-server/src/worker-start.js @@ -0,0 +1,6 @@ +'use strict'; + +const ClusterWorker = require('./worker'); +const worker = new ClusterWorker(); + +worker.start(); diff --git a/packages/fastboot-app-server/src/worker.js b/packages/fastboot-app-server/src/worker.js index 81b21c2b8..7640b9fef 100644 --- a/packages/fastboot-app-server/src/worker.js +++ b/packages/fastboot-app-server/src/worker.js @@ -3,12 +3,19 @@ const FastBoot = require('fastboot'); const fastbootMiddleware = require('fastboot-express-middleware'); const ExpressHTTPServer = require('./express-http-server'); +const deserialize = require('./utils/serialization').deserialize; +const UI = require('./ui'); class Worker { - constructor(options) { + constructor(argOptions) { + this.forkOptions = deserialize(process.argv[2]) + // Define the enumerated options set. + // Combination of any launch options and any directly passed options. + const options = Object.assign({}, this.forkOptions, argOptions); + + this.ui = new UI(); this.distPath = options.distPath; this.httpServer = options.httpServer; - this.ui = options.ui; this.cache = options.cache; this.gzip = options.gzip; this.host = options.host; @@ -56,6 +63,11 @@ class Worker { process.on('message', message => this.handleMessage(message)); } + /** + * received messages from primary + * @method handleMessage + * @param {Object} message + */ handleMessage(message) { switch (message.event) { case 'reload':