Skip to content

Commit

Permalink
Added experimental background job queue (#20985)
Browse files Browse the repository at this point in the history
ref https://linear.app/tryghost/issue/ENG-1556/
- added background job queue behind config flags
- when enabled, is only used for the member email analytics updates in
order to speed up the parent job, and take load off of the main process
that is serving requests

The intent here is to decouple certain code paths from the main process where running them there is unnecessary or, worse, where they run as part of a request. The primary use case is email analytics — particularly the member stats (open rate), which are not especially useful in the period immediately following an email send, whereas the click traffic and delivered/opened events are.

Relatedly, the email link clicks themselves set off a cascade of events that is currently quite a burden on the main process and is tied to the request/response cycle when it needn't be. We'll be looking to tackle that after some initial testing with the email analytics job.
  • Loading branch information
9larsons authored Nov 4, 2024
1 parent 6e8e817 commit 88db66a
Show file tree
Hide file tree
Showing 24 changed files with 1,213 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const {combineNonTransactionalMigrations, createAddColumnMigration} = require('../../utils');

module.exports = combineNonTransactionalMigrations(
createAddColumnMigration('jobs', 'metadata', {
type: 'string',
maxlength: 2000,
nullable: true
}),
createAddColumnMigration('jobs', 'queue_entry', {
type: 'integer',
nullable: true,
unsigned: true
})
);
4 changes: 3 additions & 1 deletion ghost/core/core/server/data/schema/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,9 @@ module.exports = {
started_at: {type: 'dateTime', nullable: true},
finished_at: {type: 'dateTime', nullable: true},
created_at: {type: 'dateTime', nullable: false},
updated_at: {type: 'dateTime', nullable: true}
updated_at: {type: 'dateTime', nullable: true},
metadata: {type: 'string', maxlength: 2000, nullable: true},
queue_entry: {type: 'integer', nullable: true, unsigned: true}
},
redirects: {
id: {type: 'string', maxlength: 24, nullable: false, primary: true},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const logging = require('@tryghost/logging');
const JobManager = require('../../services/jobs');
const path = require('path');

class EmailAnalyticsServiceWrapper {
init() {
Expand All @@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper {
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');

const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
Expand Down Expand Up @@ -47,14 +49,28 @@ class EmailAnalyticsServiceWrapper {
providers: [
new MailgunProvider({config, settings})
],
queries
queries,
domainEvents
});

// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});

domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
const memberId = event.data.memberId;
await JobManager.addQueuedJob({
name: `update-member-email-analytics-${memberId}`,
metadata: {
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
data: {
memberId
}
}
});
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const queries = require('../../lib/queries');

/**
* Updates email analytics for a specific member
*
* @param {Object} options - The options object
* @param {string} options.memberId - The ID of the member to update analytics for
* @returns {Promise<Object>} The result of the aggregation query (1/0)
*/
module.exports = async function updateMemberEmailAnalytics({memberId}) {
const result = await queries.aggregateMemberStats(memberId);
return result;
};
5 changes: 3 additions & 2 deletions ghost/core/core/server/services/jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');

const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
Expand All @@ -24,7 +25,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +43,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
4 changes: 2 additions & 2 deletions ghost/core/core/server/services/mentions-jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
87 changes: 87 additions & 0 deletions ghost/core/test/integration/jobs/job-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const assert = require('assert/strict');
const path = require('path');
const configUtils = require('../../utils/configUtils');
const models = require('../../../core/server/models');
const testUtils = require('../../utils/');

/**
 * Waits until no job row with the given name can be found any more.
 *
 * Polls `models.Job.findOne({name: jobName})` sequentially: sleep for
 * `checkIntervalMs`, check the deadline, then run one lookup. A lookup is never
 * started while a previous one is still pending, and no lookup is made once the
 * deadline has passed.
 *
 * @param {string} jobName - Name of the job row to watch for
 * @param {number} [maxWaitTimeMs=5000] - Maximum total time to wait before giving up
 * @param {number} [checkIntervalMs=50] - Pause before the first lookup and between lookups
 * @returns {Promise<void>} Resolves as soon as a lookup finds no row
 * @throws {Error} When the row still exists after `maxWaitTimeMs`, or when the lookup itself fails
 */
async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) {
    const startTime = Date.now();
    const sleep = ms => new Promise((resolve) => {
        setTimeout(resolve, ms);
    });

    // The first lookup happens only after one full interval has elapsed
    await sleep(checkIntervalMs);

    while (Date.now() - startTime < maxWaitTimeMs) {
        // Awaited inside the async function so a failing lookup rejects the
        // returned promise instead of surfacing as an unhandled rejection
        const job = await models.Job.findOne({name: jobName});
        if (!job) {
            return;
        }

        await sleep(checkIntervalMs);
    }

    throw new Error(`Job ${jobName} did not complete within ${maxWaitTimeMs}ms`);
}

describe('Job Queue', function () {
    let jobService;

    // Builds a job definition with a timestamp-based name that points at the
    // test job module sitting next to this file
    const buildJob = () => ({
        name: `add-random-numbers-${Date.now()}`,
        metadata: {
            job: path.resolve(__dirname, './test-job.js'),
            data: {}
        }
    });

    // Creates a hook that sets the queue config flag and then loads the job service
    const loadJobService = function (queueEnabled) {
        return async function () {
            configUtils.set('services:jobs:queue:enabled', queueEnabled);
            jobService = require('../../../core/server/services/jobs/job-service');
        };
    };

    before(testUtils.setup('default')); // this generates the tables in the db

    afterEach(async function () {
        await configUtils.restore();
    });

    describe('enabled by config', function () {
        beforeEach(loadJobService(true));

        it('should add and execute a job in the queue', async function () {
            this.timeout(10000);
            const job = buildJob();

            // Queueing must hand back a truthy result
            assert.ok(await jobService.addQueuedJob(job));

            // Allow up to 8s (within the 10s test timeout) for the row to disappear
            await waitForJobCompletion(job.name, 8000);

            // The job row should no longer exist
            assert.equal(await models.Job.findOne({name: job.name}), null);
        });
    });

    describe('not enabled', function () {
        beforeEach(loadJobService(false));

        it('should not add a job to the queue when disabled', async function () {
            const job = buildJob();

            // Queueing is expected to return nothing when the flag is off
            assert.equal(await jobService.addQueuedJob(job), undefined);

            // ...and no job row should have been created
            assert.equal(await models.Job.findOne({name: job.name}), null);
        });
    });
});
7 changes: 7 additions & 0 deletions ghost/core/test/integration/jobs/test-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = function testJob() {
const num1 = Math.floor(Math.random() * 100);
const num2 = Math.floor(Math.random() * 100);
const result = num1 + num2;

return result;
};
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const assert = require('assert/strict');
const http = require('http');
const path = require('path');

const models = require('../../../core/server/models');

models.init();

const testUtils = require('../../utils');
const jobService = require('../../../core/server/services/jobs/job-service');

const JOB_NAME = 'update-check';
Expand All @@ -14,6 +10,8 @@ const JOB_PATH = path.resolve(__dirname, '../../../core/server/run-update-check.
describe('Run Update Check', function () {
let mockUpdateServer;

before(testUtils.setup('default'));

afterEach(function () {
if (mockUpdateServer) {
mockUpdateServer.close();
Expand Down
2 changes: 1 addition & 1 deletion ghost/core/test/unit/server/data/schema/integrity.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const validateRouteSettings = require('../../../../../core/server/services/route
*/
describe('DB version integrity', function () {
// Only these variables should need updating
const currentSchemaHash = 'a4f016480ff73c6f52ee4c86482b45a7';
const currentSchemaHash = '1110f25f639c22135b9845c72f0be7ef';
const currentFixturesHash = '475f488105c390bb0018db90dce845f1';
const currentSettingsHash = '47a75e8898fab270174a0c905cb3e914';
const currentRoutesHash = '3d180d52c663d173a6be791ef411ed01';
Expand Down
4 changes: 3 additions & 1 deletion ghost/core/test/utils/fixture-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const knexMigrator = new KnexMigrator();
// Ghost Internals
const models = require('../../core/server/models');
const {fixtureManager} = require('../../core/server/data/schema/fixtures');
const emailAnalyticsService = require('../../core/server/services/email-analytics');
const permissions = require('../../core/server/services/permissions');
const settingsService = require('../../core/server/services/settings/settings-service');
const labsService = require('../../core/shared/labs');
Expand Down Expand Up @@ -630,6 +629,9 @@ const fixtures = {
},

insertEmailsAndRecipients: function insertEmailsAndRecipients(withFailed = false) {
// NOTE: This require results in the jobs service being loaded prematurely, which breaks any tests relevant to it.
// This MUST be done in here and not at the top of the file to prevent that from happening as test setup is being performed.
const emailAnalyticsService = require('../../core/server/services/email-analytics');
return sequence(_.cloneDeep(DataGenerator.forKnex.emails).map(email => () => {
return models.Email.add(email, context.internal);
})).then(function () {
Expand Down
1 change: 1 addition & 0 deletions ghost/core/test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const _ = require('lodash');

// Ghost Internals
const models = require('../../core/server/models');
models.init();

// Other Test Utilities
const e2eUtils = require('./e2e-utils');
Expand Down
12 changes: 10 additions & 2 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const EventProcessingResult = require('./EventProcessingResult');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');

/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
Expand Down Expand Up @@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService {
* @param {object} dependencies.queries
* @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers
* @param {import('@tryghost/domain-events')} dependencies.domainEvents
*/
constructor({config, settings, queries, eventProcessor, providers}) {
constructor({config, settings, queries, eventProcessor, providers, domainEvents}) {
this.config = config;
this.settings = settings;
this.queries = queries;
this.eventProcessor = eventProcessor;
this.providers = providers;
this.domainEvents = domainEvents;
}

getStatus() {
Expand Down Expand Up @@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
if (this.config?.get('services:jobs:queue:enabled')) {
// With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
} else {
await this.aggregateMemberStats(memberId);
}
}
endTime = Date.now() - startTime;
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`);
Expand Down
Loading

0 comments on commit 88db66a

Please sign in to comment.