Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added experimental background job queue #20985

Merged
merged 37 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fa3773a
Added metadata and queue_entry to jobs schema
9larsons Sep 11, 2024
0944f8e
Added and wired up the jobs queue into JobManager
9larsons Sep 12, 2024
cbfb1a2
Revert the schedule change for the email analytics job
9larsons Sep 12, 2024
d74d339
Updated metadata field to be a 2000 char string value
9larsons Sep 12, 2024
e3b68ec
Updated refs to jobManager.queue to be inlineQueue
9larsons Sep 12, 2024
f47448b
Refactored queue processor code to be more testable/readable
9larsons Sep 12, 2024
713dcc4
Removed db dependency
9larsons Sep 13, 2024
4b3b928
Added bulk of unit tests
9larsons Sep 16, 2024
85c28ea
Fixed linting
9larsons Sep 16, 2024
010369b
Fixed unit tests
9larsons Sep 16, 2024
62807b9
Fixed linting
9larsons Sep 16, 2024
e253203
Fixed db unit test for schema change
9larsons Sep 16, 2024
125f3d7
Add workerpool
9larsons Sep 17, 2024
7aeb7e0
Added and updated jobs integration tests
9larsons Sep 17, 2024
5508cfd
Fixed linting
9larsons Sep 17, 2024
c602c67
Attempt longer timeout for CI
9larsons Sep 18, 2024
2cd905f
add more debug logging
9larsons Sep 18, 2024
e5529c0
Add even more logging
9larsons Sep 18, 2024
aa8fc8b
closure
9larsons Sep 18, 2024
6694210
.
9larsons Sep 18, 2024
2fd8caf
.
9larsons Sep 18, 2024
22b76b0
.
9larsons Sep 18, 2024
1d77f5a
,..
9larsons Sep 18, 2024
d341de1
Updated integration tests and removed clogs
9larsons Sep 25, 2024
e5ea94f
Removed blank line
9larsons Sep 25, 2024
0a1d943
Removed clogs
9larsons Sep 25, 2024
ff7a699
Update email analytics require
9larsons Sep 25, 2024
06cc10c
Updated integration tests
9larsons Sep 26, 2024
55cdf52
Merge branch 'main' into add-job-queue
9larsons Sep 30, 2024
1adeee1
Merge branch 'main' into add-job-queue
9larsons Oct 7, 2024
635a6b8
Updated testutils call
9larsons Oct 7, 2024
48aef4e
Merge branch 'main' into add-job-queue
9larsons Oct 16, 2024
8790ae5
Fix merge
9larsons Oct 16, 2024
cad48b8
Merge branch 'main' into add-job-queue
9larsons Oct 31, 2024
c623c50
Added migration
9larsons Oct 31, 2024
89d54bc
Updated path for migration to account for new release
9larsons Nov 4, 2024
6da5fbf
Merge branch 'main' into add-job-queue
9larsons Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const {combineNonTransactionalMigrations, createAddColumnMigration} = require('../../utils');

module.exports = combineNonTransactionalMigrations(
createAddColumnMigration('jobs', 'metadata', {
type: 'string',
maxlength: 2000,
nullable: true
}),
createAddColumnMigration('jobs', 'queue_entry', {
type: 'integer',
nullable: true,
unsigned: true
})
);
4 changes: 3 additions & 1 deletion ghost/core/core/server/data/schema/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,9 @@ module.exports = {
started_at: {type: 'dateTime', nullable: true},
finished_at: {type: 'dateTime', nullable: true},
created_at: {type: 'dateTime', nullable: false},
updated_at: {type: 'dateTime', nullable: true}
updated_at: {type: 'dateTime', nullable: true},
metadata: {type: 'string', maxlength: 2000, nullable: true},
queue_entry: {type: 'integer', nullable: true, unsigned: true}
9larsons marked this conversation as resolved.
Show resolved Hide resolved
},
redirects: {
id: {type: 'string', maxlength: 24, nullable: false, primary: true},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const logging = require('@tryghost/logging');
const JobManager = require('../../services/jobs');
const path = require('path');

class EmailAnalyticsServiceWrapper {
init() {
Expand All @@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper {
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');

const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
Expand Down Expand Up @@ -47,14 +49,28 @@ class EmailAnalyticsServiceWrapper {
providers: [
new MailgunProvider({config, settings})
],
queries
queries,
domainEvents
});

// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});

domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
const memberId = event.data.memberId;
await JobManager.addQueuedJob({
name: `update-member-email-analytics-${memberId}`,
metadata: {
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
data: {
memberId
}
}
});
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const queries = require('../../lib/queries');

/**
* Updates email analytics for a specific member
*
* @param {Object} options - The options object
* @param {string} options.memberId - The ID of the member to update analytics for
* @returns {Promise<Object>} The result of the aggregation query (1/0)
*/
module.exports = async function updateMemberEmailAnalytics({memberId}) {
const result = await queries.aggregateMemberStats(memberId);
return result;
};
5 changes: 3 additions & 2 deletions ghost/core/core/server/services/jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');

const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
Expand All @@ -24,7 +25,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +43,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
4 changes: 2 additions & 2 deletions ghost/core/core/server/services/mentions-jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true});
9larsons marked this conversation as resolved.
Show resolved Hide resolved

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
87 changes: 87 additions & 0 deletions ghost/core/test/integration/jobs/job-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const assert = require('assert/strict');
const path = require('path');
const configUtils = require('../../utils/configUtils');
const models = require('../../../core/server/models');
const testUtils = require('../../utils/');

// Helper function to wait for job completion
async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) {
return new Promise((resolve, reject) => {
const startTime = Date.now();
const intervalId = setInterval(async () => {
if (Date.now() - startTime >= maxWaitTimeMs) {
clearInterval(intervalId);
reject(new Error(`Job ${jobName} did not complete within ${maxWaitTimeMs}ms`));
}
const job = await models.Job.findOne({name: jobName});
if (!job) {
clearInterval(intervalId);
resolve();
}
}, checkIntervalMs);
});
}

describe('Job Queue', function () {
let jobService;
before(testUtils.setup('default')); // this generates the tables in the db
afterEach(async function () {
await configUtils.restore();
});

describe('enabled by config', function () {
beforeEach(async function () {
configUtils.set('services:jobs:queue:enabled', true);
jobService = require('../../../core/server/services/jobs/job-service');
});

it('should add and execute a job in the queue', async function () {
this.timeout(10000);
const job = {
name: `add-random-numbers-${Date.now()}`,
metadata: {
job: path.resolve(__dirname, './test-job.js'),
data: {}
}
};

// Add the job to the queue
const result = await jobService.addQueuedJob(job);
assert.ok(result);

// Wait for the job to complete
await waitForJobCompletion(job.name, 8000); // Increase wait time

// Check job status
const jobEntry = await models.Job.findOne({name: job.name});

// Verify that the job no longer exists in the queue
assert.equal(jobEntry, null);
});
});

describe('not enabled', function () {
beforeEach(async function () {
configUtils.set('services:jobs:queue:enabled', false);
jobService = require('../../../core/server/services/jobs/job-service');
});

it('should not add a job to the queue when disabled', async function () {
const job = {
name: `add-random-numbers-${Date.now()}`,
metadata: {
job: path.resolve(__dirname, './test-job.js'),
data: {}
}
};

// Attempt to add the job to the queue
const result = await jobService.addQueuedJob(job);
assert.equal(result, undefined);

// Verify that the job doesn't exist in the queue
const jobEntry = await models.Job.findOne({name: job.name});
assert.equal(jobEntry, null);
});
});
});
7 changes: 7 additions & 0 deletions ghost/core/test/integration/jobs/test-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = function testJob() {
const num1 = Math.floor(Math.random() * 100);
const num2 = Math.floor(Math.random() * 100);
const result = num1 + num2;

return result;
};
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const assert = require('assert/strict');
const http = require('http');
const path = require('path');

const models = require('../../../core/server/models');

models.init();

const testUtils = require('../../utils');
const jobService = require('../../../core/server/services/jobs/job-service');

const JOB_NAME = 'update-check';
Expand All @@ -14,6 +10,8 @@ const JOB_PATH = path.resolve(__dirname, '../../../core/server/run-update-check.
describe('Run Update Check', function () {
let mockUpdateServer;

before(testUtils.setup('default'));

afterEach(function () {
if (mockUpdateServer) {
mockUpdateServer.close();
Expand Down
2 changes: 1 addition & 1 deletion ghost/core/test/unit/server/data/schema/integrity.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const validateRouteSettings = require('../../../../../core/server/services/route
*/
describe('DB version integrity', function () {
// Only these variables should need updating
const currentSchemaHash = 'a4f016480ff73c6f52ee4c86482b45a7';
const currentSchemaHash = '1110f25f639c22135b9845c72f0be7ef';
const currentFixturesHash = '475f488105c390bb0018db90dce845f1';
const currentSettingsHash = '47a75e8898fab270174a0c905cb3e914';
const currentRoutesHash = '3d180d52c663d173a6be791ef411ed01';
Expand Down
4 changes: 3 additions & 1 deletion ghost/core/test/utils/fixture-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const knexMigrator = new KnexMigrator();
// Ghost Internals
const models = require('../../core/server/models');
const {fixtureManager} = require('../../core/server/data/schema/fixtures');
const emailAnalyticsService = require('../../core/server/services/email-analytics');
const permissions = require('../../core/server/services/permissions');
const settingsService = require('../../core/server/services/settings/settings-service');
const labsService = require('../../core/shared/labs');
Expand Down Expand Up @@ -630,6 +629,9 @@ const fixtures = {
},

insertEmailsAndRecipients: function insertEmailsAndRecipients(withFailed = false) {
// NOTE: This require results in the jobs service being loaded prematurely, which breaks any tests relevant to it.
// This MUST be done in here and not at the top of the file to prevent that from happening as test setup is being performed.
const emailAnalyticsService = require('../../core/server/services/email-analytics');
return sequence(_.cloneDeep(DataGenerator.forKnex.emails).map(email => () => {
return models.Email.add(email, context.internal);
})).then(function () {
Expand Down
1 change: 1 addition & 0 deletions ghost/core/test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const _ = require('lodash');

// Ghost Internals
const models = require('../../core/server/models');
models.init();

// Other Test Utilities
const e2eUtils = require('./e2e-utils');
Expand Down
12 changes: 10 additions & 2 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const EventProcessingResult = require('./EventProcessingResult');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');

/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
Expand Down Expand Up @@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService {
* @param {object} dependencies.queries
* @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers
* @param {import('@tryghost/domain-events')} dependencies.domainEvents
*/
constructor({config, settings, queries, eventProcessor, providers}) {
constructor({config, settings, queries, eventProcessor, providers, domainEvents}) {
this.config = config;
this.settings = settings;
this.queries = queries;
this.eventProcessor = eventProcessor;
this.providers = providers;
this.domainEvents = domainEvents;
}

getStatus() {
Expand Down Expand Up @@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
if (this.config?.get('services:jobs:queue:enabled')) {
// With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
} else {
await this.aggregateMemberStats(memberId);
}
}
endTime = Date.now() - startTime;
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`);
Expand Down
Loading
Loading