Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor]: worker as a separate entry point for cluster primary #835

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 86 additions & 45 deletions packages/fastboot-app-server/src/fastboot-app-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
const assert = require('assert');
const cluster = require('cluster');
const os = require('os');

const Worker = require('./worker');
const path = require('path');
const serialize = require('./utils/serialization').serialize;

class FastBootAppServer {
constructor(options) {
Expand All @@ -21,11 +21,10 @@ class FastBootAppServer {
this.username = options.username;
this.password = options.password;
this.httpServer = options.httpServer;
this.beforeMiddleware = options.beforeMiddleware;
this.afterMiddleware = options.afterMiddleware;
this.buildSandboxGlobals = options.buildSandboxGlobals;
this.chunkedResponse = options.chunkedResponse;
this.log = options.log;
this.workerPath = options.workerPath;

if (!this.ui) {
let UI = require('./ui');
Expand All @@ -34,37 +33,17 @@ class FastBootAppServer {

this.propagateUI();

if (cluster.isWorker) {
this.worker = new Worker({
ui: this.ui,
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
cache: this.cache,
gzip: this.gzip,
host: this.host,
port: this.port,
username: this.username,
password: this.password,
httpServer: this.httpServer,
beforeMiddleware: this.beforeMiddleware,
afterMiddleware: this.afterMiddleware,
buildSandboxGlobals: this.buildSandboxGlobals,
chunkedResponse: this.chunkedResponse,
});
this.workerCount = options.workerCount ||
(process.env.NODE_ENV === 'test' ? 1 : null) ||
os.cpus().length;

this.worker.start();
} else {
this.workerCount = options.workerCount ||
(process.env.NODE_ENV === 'test' ? 1 : null) ||
os.cpus().length;
this._clusterInitialized = false;

assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
}
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
}

start() {
if (cluster.isWorker) { return; }

return this.initializeApp()
.then(() => this.subscribeToNotifier())
.then(() => this.forkWorkers())
Expand All @@ -75,6 +54,9 @@ class FastBootAppServer {
})
.catch(err => {
this.ui.writeLine(err.stack);
})
.finally(() => {
this._clusterInitialized = true;
});
}

Expand Down Expand Up @@ -138,6 +120,12 @@ class FastBootAppServer {
}
}

/**
* send message to worker
*
* @method broadcast
* @param {Object} message
*/
broadcast(message) {
let workers = cluster.workers;

Expand All @@ -153,6 +141,10 @@ class FastBootAppServer {
forkWorkers() {
let promises = [];

// https://nodejs.org/api/cluster.html#cluster_cluster_setupprimary_settings
// Note: cluster.setupPrimary in v16.0.0
cluster.setupMaster(this.clusterSetupPrimary());

for (let i = 0; i < this.workerCount; i++) {
promises.push(this.forkWorker());
}
Expand All @@ -161,31 +153,53 @@ class FastBootAppServer {
}

forkWorker() {
let env = this.buildWorkerEnv();
let worker = cluster.fork(env);
let worker = cluster.fork(this.buildWorkerEnv());

this.ui.writeLine(`forked worker ${worker.process.pid}`);
this.ui.writeLine(`Worker ${worker.process.pid} forked`);

let firstBootResolve;
let firstBootReject;
const firstBootPromise = new Promise((resolve, reject) => {
firstBootResolve = resolve;
firstBootReject = reject;
});

if (this._clusterInitialized) {
firstBootResolve();
}

worker.on('online', () => {
this.ui.writeLine(`Worker ${worker.process.pid} online.`);
});

worker.on('message', (message) => {
if (message.event === 'http-online') {
this.ui.writeLine(`Worker ${worker.process.pid} healthy.`);
firstBootResolve();
}
});

worker.on('exit', (code, signal) => {
let error;
if (signal) {
this.ui.writeLine(`worker was killed by signal: ${signal}`);
error = new Error(`Worker ${worker.process.pid} killed by signal: ${signal}`);
} else if (code !== 0) {
this.ui.writeLine(`worker exited with error code: ${code}`);
error = new Error(`Worker ${worker.process.pid} exited with error code: ${code}`);
} else {
this.ui.writeLine(`worker exited`);
error = new Error(`Worker ${worker.process.pid} exited gracefully. It should only exit when told to do so.`);
}

this.forkWorker();
if (!this._clusterInitialized) {
// Do not respawn for a failed first launch.
firstBootReject(error);
} else {
// Do respawn if you've ever successfully been initialized.
this.ui.writeLine(error);
this.forkWorker();
}
});

return new Promise(resolve => {
this.ui.writeLine('worker online');
worker.on('message', message => {
if (message.event === 'http-online') {
resolve();
}
});
});
return firstBootPromise;
}

buildWorkerEnv() {
Expand All @@ -198,6 +212,33 @@ class FastBootAppServer {
return env;
}

/**
* Extension point to allow configuring the default fork configuration.
*
* @method clusterSetupPrimary
* @returns {Object}
* @public
*/
clusterSetupPrimary() {
const workerOptions = {
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
cache: this.cache,
gzip: this.gzip,
host: this.host,
port: this.port,
username: this.username,
password: this.password,
httpServer: this.httpServer,
buildSandboxGlobals: this.buildSandboxGlobals,
chunkedResponse: this.chunkedResponse,
};

const workerPath = this.workerPath || path.join(__dirname, './worker-start.js');
return {
exec: workerPath,
args: [serialize(workerOptions)]
};
}
}

module.exports = FastBootAppServer;
38 changes: 38 additions & 0 deletions packages/fastboot-app-server/src/utils/serialization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';

/**
* The purpose of this module is to provide a serialization layer for passing arguments to a new Worker instance
* This allows us to completely separate the cluster worker from the cluster primary
*/

function circularReplacer() {
const seen = new WeakSet();

return (key, value) => {
if (typeof value === 'object' && value !== null) {
if (seen.has(value)) {
return;
}

seen.add(value);
}

return value;
}
}

function serialize(object) {
let data = encodeURIComponent(JSON.stringify(object, circularReplacer()));
let buff = new Buffer.from(data);
return buff.toString('base64');
}

function deserialize(string) {
let buff = new Buffer.from(string, 'base64');
return JSON.parse(decodeURIComponent(buff.toString('ascii')));
}

module.exports = {
serialize,
deserialize
};
10 changes: 10 additions & 0 deletions packages/fastboot-app-server/src/worker-start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'use strict';

// This file is where you can configure
// - distPath, host, port,
// - httpServer
// - Middleware order
const ClusterWorker = require('./worker');
const worker = new ClusterWorker();

worker.start();
16 changes: 14 additions & 2 deletions packages/fastboot-app-server/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
const FastBoot = require('fastboot');
const fastbootMiddleware = require('fastboot-express-middleware');
const ExpressHTTPServer = require('./express-http-server');
const deserialize = require('./utils/serialization').deserialize;
const UI = require('./ui');

class Worker {
constructor(options) {
constructor(argOptions) {
this.forkOptions = deserialize(process.argv[2])
// Define the enumerated options set.
// Combination of any launch options and any directly passed options.
const options = Object.assign({}, this.forkOptions, argOptions);

this.ui = new UI();
this.distPath = options.distPath;
this.httpServer = options.httpServer;
this.ui = options.ui;
this.cache = options.cache;
this.gzip = options.gzip;
this.host = options.host;
Expand Down Expand Up @@ -57,6 +64,11 @@ class Worker {
process.on('message', message => this.handleMessage(message));
}

/**
* received messages from primary
* @method handleMessage
* @param {Object} message
*/
handleMessage(message) {
switch (message.event) {
case 'reload':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,9 @@
var path = require('path');
const FastBootAppServer = require('../../src/fastboot-app-server');

function setXTestHeader(err, req, res, next) {
res.set('x-test-header', 'testing')
next();
}

var server = new FastBootAppServer({
distPath: path.resolve(__dirname, './broken-app'),
afterMiddleware: function (app) {
app.use(setXTestHeader);
},
workerPath: path.resolve(__dirname, './cluster-worker-start'),
resilient: true,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,9 @@
var path = require('path');
const FastBootAppServer = require('../../src/fastboot-app-server');

function setStatusCode418(req, res, next) {
res.status(418);
next();
}

function setXTestHeader(req, res, next) {
res.set('X-Test-Header', 'testing')
next();
}

function sendJsonAndTerminate(req, res, next) {
res.json({ send: 'json back' });
res.send();
}

var server = new FastBootAppServer({
distPath: path.resolve(__dirname, './basic-app'),
beforeMiddleware: function (app) {
app.use(setStatusCode418);
app.use(setXTestHeader);
app.use(sendJsonAndTerminate);
}
workerPath: path.resolve(__dirname, './cluster-worker-start'),
});

server.start();
37 changes: 37 additions & 0 deletions packages/fastboot-app-server/test/fixtures/cluster-worker-start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';

const ClusterWorker = require('../../src/worker');

class CustomClusterWorker extends ClusterWorker {}

function setStatusCode418(req, res, next) {
res.status(418);
next();
}

function setXTestHeader(req, res, next) {
res.set('X-Test-Header', 'testing')
next();
}

function sendJsonAndTerminate(req, res, next) {
res.json({ send: 'json back' });
res.send();
}

function beforeMiddleware(app) {
app.use(setStatusCode418);
app.use(setXTestHeader);
app.use(sendJsonAndTerminate);
}

function afterMiddleware(app) {
app.use(setXTestHeader);
}

const worker = new CustomClusterWorker({
beforeMiddleware,
afterMiddleware,
});

worker.start();