Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added experimental background job queue #20985

Merged
merged 37 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fa3773a
Added metadata and queue_entry to jobs schema
9larsons Sep 11, 2024
0944f8e
Added and wired up the jobs queue into JobManager
9larsons Sep 12, 2024
cbfb1a2
Revert the schedule change for the email analytics job
9larsons Sep 12, 2024
d74d339
Updated metadata field to be a 2000 char string value
9larsons Sep 12, 2024
e3b68ec
Updated refs to jobManager.queue to be inlineQueue
9larsons Sep 12, 2024
f47448b
Refactored queue processor code to be more testable/readable
9larsons Sep 12, 2024
713dcc4
Removed db dependency
9larsons Sep 13, 2024
4b3b928
Added bulk of unit tests
9larsons Sep 16, 2024
85c28ea
Fixed linting
9larsons Sep 16, 2024
010369b
Fixed unit tests
9larsons Sep 16, 2024
62807b9
Fixed linting
9larsons Sep 16, 2024
e253203
Fixed db unit test for schema change
9larsons Sep 16, 2024
125f3d7
Add workerpool
9larsons Sep 17, 2024
7aeb7e0
Added and updated jobs integration tests
9larsons Sep 17, 2024
5508cfd
Fixed linting
9larsons Sep 17, 2024
c602c67
Attempt longer timeout for CI
9larsons Sep 18, 2024
2cd905f
add more debug logging
9larsons Sep 18, 2024
e5529c0
Add even more logging
9larsons Sep 18, 2024
aa8fc8b
closure
9larsons Sep 18, 2024
6694210
.
9larsons Sep 18, 2024
2fd8caf
.
9larsons Sep 18, 2024
22b76b0
.
9larsons Sep 18, 2024
1d77f5a
,..
9larsons Sep 18, 2024
d341de1
Updated integration tests and removed clogs
9larsons Sep 25, 2024
e5ea94f
Removed blank line
9larsons Sep 25, 2024
0a1d943
Removed clogs
9larsons Sep 25, 2024
ff7a699
Update email analytics require
9larsons Sep 25, 2024
06cc10c
Updated integration tests
9larsons Sep 26, 2024
55cdf52
Merge branch 'main' into add-job-queue
9larsons Sep 30, 2024
1adeee1
Merge branch 'main' into add-job-queue
9larsons Oct 7, 2024
635a6b8
Updated testutils call
9larsons Oct 7, 2024
48aef4e
Merge branch 'main' into add-job-queue
9larsons Oct 16, 2024
8790ae5
Fix merge
9larsons Oct 16, 2024
cad48b8
Merge branch 'main' into add-job-queue
9larsons Oct 31, 2024
c623c50
Added migration
9larsons Oct 31, 2024
89d54bc
Updated path for migration to account for new release
9larsons Nov 4, 2024
6da5fbf
Merge branch 'main' into add-job-queue
9larsons Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ghost/core/core/server/data/schema/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,9 @@ module.exports = {
started_at: {type: 'dateTime', nullable: true},
finished_at: {type: 'dateTime', nullable: true},
created_at: {type: 'dateTime', nullable: false},
updated_at: {type: 'dateTime', nullable: true}
updated_at: {type: 'dateTime', nullable: true},
metadata: {type: 'text', maxlength: 1000000000, fieldtype: 'long', nullable: true},
9larsons marked this conversation as resolved.
Show resolved Hide resolved
queue_entry: {type: 'integer', nullable: true, unsigned: true}
9larsons marked this conversation as resolved.
Show resolved Hide resolved
},
redirects: {
id: {type: 'string', maxlength: 24, nullable: false, primary: true},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const logging = require('@tryghost/logging');
const JobManager = require('../../services/jobs');
const path = require('path');

class EmailAnalyticsServiceWrapper {
init() {
Expand All @@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper {
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');

const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
Expand Down Expand Up @@ -47,14 +49,28 @@ class EmailAnalyticsServiceWrapper {
providers: [
new MailgunProvider({config, settings})
],
queries
queries,
domainEvents
});

// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});

domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
const memberId = event.data.memberId;
await JobManager.addQueuedJob({
name: `update-member-email-analytics-${memberId}`,
metadata: {
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
data: {
memberId
}
}
});
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const queries = require('../../lib/queries');

/**
* Updates email analytics for a specific member
*
* @param {Object} options - The options object
* @param {string} options.memberId - The ID of the member to update analytics for
* @returns {Promise<Object>} The result of the aggregation query (1/0)
*/
module.exports = async function updateMemberEmailAnalytics({memberId}) {
const result = await queries.aggregateMemberStats(memberId);
return result;
};
4 changes: 3 additions & 1 deletion ghost/core/core/server/services/jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const db = require('../../data/db');

const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
Expand Down Expand Up @@ -42,7 +44,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, db: db});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true});
9larsons marked this conversation as resolved.
Show resolved Hide resolved

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
12 changes: 10 additions & 2 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const EventProcessingResult = require('./EventProcessingResult');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');

/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
Expand Down Expand Up @@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService {
* @param {object} dependencies.queries
* @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers
* @param {import('@tryghost/domain-events')} dependencies.domainEvents
*/
constructor({config, settings, queries, eventProcessor, providers}) {
constructor({config, settings, queries, eventProcessor, providers, domainEvents}) {
this.config = config;
this.settings = settings;
this.queries = queries;
this.eventProcessor = eventProcessor;
this.providers = providers;
this.domainEvents = domainEvents;
}

getStatus() {
Expand Down Expand Up @@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
if (this.config.get('services:jobs:queue:enabled')) {
// With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
} else {
await this.aggregateMemberStats(memberId);
}
}
endTime = Date.now() - startTime;
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`);
Expand Down
84 changes: 65 additions & 19 deletions ghost/job-manager/lib/JobManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const logging = require('@tryghost/logging');
const isCronExpression = require('./is-cron-expression');
const assembleBreeJob = require('./assemble-bree-job');
const JobsRepository = require('./JobsRepository');
const JobQueueManager = require('./JobQueueManager');

const worker = async (task, callback) => {
try {
Expand All @@ -27,22 +28,36 @@ const ALL_STATUSES = {
queued: 'queued'
};

/**
* @typedef {Object} ScheduledJob
* @property {Function | string} job - Function or path to a module defining a job
* @property {string} [name] - Unique job name, if not provided takes function name or job script filename
* @property {string | Date} [at] - Date, cron or human readable schedule format
* @property {Object} [data] - Data to be passed into the job
* @property {boolean} [offloaded=true] - If true, creates an "offloaded" job running in a worker thread. If false, runs an "inline" job on the same event loop
*/
class JobManager {
#domainEvents;
#completionPromises = new Map();
#jobQueueManager = null;
#config;

/**
* @param {Object} options
* @param {Function} [options.errorHandler] - custom job error handler
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
* @param {Object} [options.domainEvents] - domain events emitter
* @param {Object} [options.config] - config
* @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
* @param {Object} [options.db] - the database object
*/
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) {
this.queue = fastq(this, worker, 3);
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, db}) {
this.inlineQueue = fastq(this, worker, 3);
9larsons marked this conversation as resolved.
Show resolved Hide resolved
this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this);
this.#domainEvents = domainEvents;
this.#config = config;

const combinedMessageHandler = workerMessageHandler
? ({name, message}) => {
Expand Down Expand Up @@ -72,7 +87,15 @@ class JobManager {
});

if (JobModel) {
this._jobsRepository = new JobsRepository({JobModel});
this._jobsRepository = new JobsRepository({JobModel, db});
}

// We have a duplicate job manager in Ghost core for the mentions job service that should be
// refactored to use the job queue when we're able.
if (!isDuplicate && this.#config.get('services:jobs:queue:enabled')) {
logging.info(`[JobManager] Initializing job queue based on config`);
this.#jobQueueManager = new JobQueueManager({JobModel, config, db});
this.#jobQueueManager.init();
}
}

Expand All @@ -94,6 +117,31 @@ class JobManager {
};
}

/**
* @typedef {Object} QueuedJob
* @property {string} name - The name or identifier of the job.
* @property {Object} metadata - Metadata associated with the job.
* @property {string} metadata.job - The absolute path to the job to execute.
* @property {Object} metadata.data - The data associated with the job.
*/

/**
* @method addQueuedJob
* @async
* @description Adds a new job to the job repository, which will be polled and executed by the job queue manager.
* @param {QueuedJob} job - The job to be added to the queue.
* @returns {Promise<Object>} The added job model.
*/
async addQueuedJob({name, metadata}) {
// Adding some extra security so we don't add jobs when the queue is disabled from callers.
if (this.#config.get('services:jobs:queue:enabled')) {
const model = await this.#jobQueueManager.addJob({name, metadata});
return model;
}
logging.warn('[JobManager] Job queue is disabled but job was attempted to be added. job: ', name);
return Promise.reject();
}

async _jobMessageHandler({name, message}) {
if (name) {
if (message === ALL_STATUSES.started) {
Expand Down Expand Up @@ -128,7 +176,7 @@ class JobManager {
this.#completionPromises.delete(name);
}

if (this.queue.length() <= 1) {
if (this.inlineQueue.length() <= 1) {
if (this.#completionPromises.has('all')) {
for (const listeners of this.#completionPromises.get('all')) {
listeners.resolve();
Expand Down Expand Up @@ -168,7 +216,7 @@ class JobManager {
this.#completionPromises.delete(jobMeta.name);
}

if (this.queue.length() <= 1) {
if (this.inlineQueue.length() <= 1) {
if (this.#completionPromises.has('all')) {
for (const listeners of this.#completionPromises.get('all')) {
listeners.reject(error);
Expand All @@ -181,7 +229,7 @@ class JobManager {

/**
* By default schedules an "offloaded" job. If `offloaded: true` parameter is set,
* puts an "inline" immediate job into the queue.
* puts an "inline" immediate job into the inlineQueue.
*
* @param {Object} GhostJob - job options
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
Expand All @@ -192,7 +240,7 @@ class JobManager {
*/
addJob({name, at, job, data, offloaded = true}) {
if (offloaded) {
logging.info('Adding offloaded job to the queue');
logging.info('Adding offloaded job to the inline job queue');
let schedule;

if (!name) {
Expand Down Expand Up @@ -229,9 +277,9 @@ class JobManager {
this.bree.add(breeJob);
return this.bree.start(name);
} else {
logging.info(`Adding one-off job to queue with current length = ${this.queue.length()} called '${name || 'anonymous'}'`);
logging.info(`Adding one-off job to inlineQueue with current length = ${this.inlineQueue.length()} called '${name || 'anonymous'}'`);

this.queue.push(async () => {
this.inlineQueue.push(async () => {
try {
// NOTE: setting the status here otherwise it is impossible to
// distinguish between states when the job fails immediately
Expand Down Expand Up @@ -325,13 +373,11 @@ class JobManager {
/**
* Awaits completion of the offloaded one-off job.
* CAUTION: it might take a long time to resolve!
* @param {String} name one-off job name
* @param {string} name one-off job name
* @returns resolves with a Job model at current state
*/
async awaitOneOffCompletion(name) {
const persistedJob = await this._jobsRepository.read({
name
});
const persistedJob = await this._jobsRepository.read(name);

if (!persistedJob || ![ALL_STATUSES.finished, ALL_STATUSES.failed].includes(persistedJob.get('status'))) {
// NOTE: can implement exponential backoff here if that's ever needed
Expand Down Expand Up @@ -366,7 +412,7 @@ class JobManager {
const name = 'all';

return new Promise((resolve, reject) => {
if (this.queue.idle()) {
if (this.inlineQueue.idle()) {
resolve();
return;
}
Expand All @@ -379,7 +425,7 @@ class JobManager {
}

/**
* Removes an "offloaded" job from scheduled jobs queue.
* Removes an "offloaded" job from scheduled jobs inlineQueue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).
* The method will throw an Error if job with provided name does not exist.
*
Expand All @@ -398,15 +444,15 @@ class JobManager {
async shutdown(options) {
await this.bree.stop();

if (this.queue.idle()) {
if (this.inlineQueue.idle()) {
return;
}

logging.warn('Waiting for busy job queue');
logging.warn('Waiting for busy job in inline job queue');

await pWaitFor(() => this.queue.idle() === true, options);
await pWaitFor(() => this.inlineQueue.idle() === true, options);

logging.warn('Job queue finished');
logging.warn('Inline job queue finished');
}
}

Expand Down
Loading
Loading