Skip to content

Commit

Permalink
[Refactor]: worker as a separate entry point for cluster primary
Browse files Browse the repository at this point in the history
  • Loading branch information
snewcomer committed Jun 1, 2021
1 parent 0d11ef2 commit 714c0e2
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 45 deletions.
131 changes: 88 additions & 43 deletions packages/fastboot-app-server/src/fastboot-app-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
const assert = require('assert');
const cluster = require('cluster');
const os = require('os');

const Worker = require('./worker');
const path = require('path');
const serialize = require('./utils/serialization').serialize;

class FastBootAppServer {
constructor(options) {
Expand Down Expand Up @@ -33,37 +33,17 @@ class FastBootAppServer {

this.propagateUI();

if (cluster.isWorker) {
this.worker = new Worker({
ui: this.ui,
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
cache: this.cache,
gzip: this.gzip,
host: this.host,
port: this.port,
username: this.username,
password: this.password,
httpServer: this.httpServer,
beforeMiddleware: this.beforeMiddleware,
afterMiddleware: this.afterMiddleware,
buildSandboxGlobals: this.buildSandboxGlobals,
chunkedResponse: this.chunkedResponse,
});
this.workerCount = options.workerCount ||
(process.env.NODE_ENV === 'test' ? 1 : null) ||
os.cpus().length;

this.worker.start();
} else {
this.workerCount = options.workerCount ||
(process.env.NODE_ENV === 'test' ? 1 : null) ||
os.cpus().length;
this._clusterInitialized = false;

assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
}
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
}

start() {
if (cluster.isWorker) { return; }

return this.initializeApp()
.then(() => this.subscribeToNotifier())
.then(() => this.forkWorkers())
Expand All @@ -74,6 +54,9 @@ class FastBootAppServer {
})
.catch(err => {
this.ui.writeLine(err.stack);
})
.finally(() => {
this._clusterInitialized = true;
});
}

Expand Down Expand Up @@ -137,6 +120,12 @@ class FastBootAppServer {
}
}

/**
* send message to worker
*
* @method broadcast
* @param {Object} message
*/
broadcast(message) {
let workers = cluster.workers;

Expand All @@ -152,6 +141,10 @@ class FastBootAppServer {
forkWorkers() {
let promises = [];

// https://nodejs.org/api/cluster.html#cluster_cluster_setupprimary_settings
// Note: cluster.setupPrimary in v16.0.0
cluster.setupMaster(this.clusterSetupPrimary());

for (let i = 0; i < this.workerCount; i++) {
promises.push(this.forkWorker());
}
Expand All @@ -160,31 +153,53 @@ class FastBootAppServer {
}

forkWorker() {
let env = this.buildWorkerEnv();
let worker = cluster.fork(env);
let worker = cluster.fork(this.buildWorkerEnv());

this.ui.writeLine(`forked worker ${worker.process.pid}`);
this.ui.writeLine(`Worker ${worker.process.pid} forked`);

let firstBootResolve;
let firstBootReject;
const firstBootPromise = new Promise((resolve, reject) => {
firstBootResolve = resolve;
firstBootReject = reject;
});

if (this._clusterInitialized) {
firstBootResolve();
}

worker.on('online', () => {
this.ui.writeLine(`Worker ${worker.process.pid} online.`);
});

worker.on('message', (message) => {
if (message.event === 'http-online') {
this.ui.writeLine(`Worker ${worker.process.pid} healthy.`);
firstBootResolve();
}
});

worker.on('exit', (code, signal) => {
let error;
if (signal) {
this.ui.writeLine(`worker was killed by signal: ${signal}`);
error = new Error(`Worker ${worker.process.pid} killed by signal: ${signal}`);
} else if (code !== 0) {
this.ui.writeLine(`worker exited with error code: ${code}`);
error = new Error(`Worker ${worker.process.pid} exited with error code: ${code}`);
} else {
this.ui.writeLine(`worker exited`);
error = new Error(`Worker ${worker.process.pid} exited gracefully. It should only exit when told to do so.`);
}

this.forkWorker();
if (!this._clusterInitialized) {
// Do not respawn for a failed first launch.
firstBootReject(error);
} else {
// Do respawn if you've ever successfully been initialized.
this.ui.writeLine(error);
this.forkWorker();
}
});

return new Promise(resolve => {
this.ui.writeLine('worker online');
worker.on('message', message => {
if (message.event === 'http-online') {
resolve();
}
});
});
return firstBootPromise;
}

buildWorkerEnv() {
Expand All @@ -197,6 +212,36 @@ class FastBootAppServer {
return env;
}

/**
* Extension point to allow configuring the default fork configuration.
*
* @method clusterSetupPrimary
* @returns {Object}
* @public
*/
clusterSetupPrimary() {
const workerOptions = {
ui: this.ui,
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
cache: this.cache,
gzip: this.gzip,
host: this.host,
port: this.port,
username: this.username,
password: this.password,
httpServer: this.httpServer,
beforeMiddleware: this.beforeMiddleware,
afterMiddleware: this.afterMiddleware,
buildSandboxGlobals: this.buildSandboxGlobals,
chunkedResponse: this.chunkedResponse,
};

const workerPath = this.workerPath || path.join(__dirname, './worker-start.js');
return {
exec: workerPath,
args: [serialize(workerOptions)]
};
}
}

module.exports = FastBootAppServer;
38 changes: 38 additions & 0 deletions packages/fastboot-app-server/src/utils/serialization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';

/**
* The purpose of this module is to provide a serialization layer for passing arguments to a new Worker instance
* This allows us to completely separate the cluster worker from the cluster primary
*/

function circularReplacer() {
const seen = new WeakSet();

return (key, value) => {
if (typeof value === 'object' && value !== null) {
if (seen.has(value)) {
return;
}

seen.add(value);
}

return value;
}
}

function serialize(object) {
let data = encodeURIComponent(JSON.stringify(object, circularReplacer()));
let buff = new Buffer.from(data);
return buff.toString('base64');
}

function deserialize(string) {
let buff = new Buffer.from(string, 'base64');
return JSON.parse(decodeURIComponent(buff.toString('ascii')));
}

module.exports = {
serialize,
deserialize
};
6 changes: 6 additions & 0 deletions packages/fastboot-app-server/src/worker-start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict';

const ClusterWorker = require('./worker');
const worker = new ClusterWorker();

worker.start();
16 changes: 14 additions & 2 deletions packages/fastboot-app-server/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
const FastBoot = require('fastboot');
const fastbootMiddleware = require('fastboot-express-middleware');
const ExpressHTTPServer = require('./express-http-server');
const deserialize = require('./utils/serialization').deserialize;
const UI = require('./ui');

class Worker {
constructor(options) {
constructor(argOptions) {
this.forkOptions = deserialize(process.argv[2])
// Define the enumerated options set.
// Combination of any launch options and any directly passed options.
const options = Object.assign({}, this.forkOptions, argOptions);

this.ui = new UI();
this.distPath = options.distPath;
this.httpServer = options.httpServer;
this.ui = options.ui;
this.cache = options.cache;
this.gzip = options.gzip;
this.host = options.host;
Expand Down Expand Up @@ -56,6 +63,11 @@ class Worker {
process.on('message', message => this.handleMessage(message));
}

/**
* received messages from primary
* @method handleMessage
* @param {Object} message
*/
handleMessage(message) {
switch (message.event) {
case 'reload':
Expand Down

0 comments on commit 714c0e2

Please sign in to comment.