Skip to content

Commit

Permalink
Use poolId rather than workerId when locking/unlocking jobs (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Jun 11, 2024
2 parents 778d9e4 + 40a960e commit 572ebea
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 51 deletions.
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ Read more:

## Pending

- BREAKING: Jobs and queues are now `locked_by` their `WorkerPool`'s id rather
than the `workerId`. Be sure to upgrade
[Worker Pro](https://worker.graphile.org/docs/pro) at the same time if you're
using it!
- Fixes bug where CLI defaults override `graphile.config.js` settings (by
removing CLI defaults)

Expand Down
2 changes: 1 addition & 1 deletion __tests__/main.runTaskListOnce.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ test("runs jobs asynchronously", () =>
expect(q.job_count).toEqual(1);
expect(+q.locked_at).toBeGreaterThanOrEqual(+start);
expect(+q.locked_at).toBeLessThanOrEqual(+new Date());
expect(q.locked_by).toEqual(worker.workerId);
expect(q.locked_by).toEqual(worker.workerPool.id);
}

jobPromise!.resolve();
Expand Down
2 changes: 1 addition & 1 deletion __tests__/resetLockedAt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test("main will execute jobs as they come up, and exits cleanly", () =>
`\
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set
locked_by = 'some_worker_id',
locked_by = 'some_pool_id',
locked_at = now() - (
case payload->>'id'
when 'locked_recently' then interval '5 minutes'
Expand Down
20 changes: 10 additions & 10 deletions __tests__/workerUtils.cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,26 @@ test("cleanup with GC_JOB_QUEUES", () =>
});

const jobs: Job[] = [];
const WORKER_ID_1 = "worker1";
const WORKER_ID_2 = "worker2";
const WORKER_ID_3 = "worker3";
const POOL_ID_1 = "pool-1";
const POOL_ID_2 = "pool-2";
const POOL_ID_3 = "pool-3";
let a = 0;
const date = new Date();
const specs = [
[WORKER_ID_1, "test", "test_job1"],
[WORKER_ID_2, "test2", "test_job2"],
[WORKER_ID_3, "test3", "test_job3"],
[POOL_ID_1, "test", "test_job1"],
[POOL_ID_2, "test2", "test_job2"],
[POOL_ID_3, "test3", "test_job3"],
[null, null, "test_job4"],
] as const;
for (const [workerId, queueName, taskIdentifier] of specs) {
for (const [poolId, queueName, taskIdentifier] of specs) {
date.setMinutes(date.getMinutes() - 1);
const job = await utils.addJob(
taskIdentifier,
{ a: ++a },
{ queueName: queueName ?? undefined },
);
jobs.push(job);
if (workerId) {
if (poolId) {
await pgClient.query(
`\
with j as (
Expand All @@ -107,7 +107,7 @@ with j as (
where job_queues.id = j.job_queue_id
)
select * from j`,
[date.toISOString(), workerId, job.id],
[date.toISOString(), poolId, job.id],
);
}
}
Expand All @@ -121,7 +121,7 @@ select * from j`,
"test3",
]);

await utils.forceUnlockWorkers(["worker3"]);
await utils.forceUnlockWorkers([POOL_ID_3]);
const thirdJob = jobs[2]; // Belongs to queueName 'test3'
await utils.completeJobs([thirdJob.id]);
await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] });
Expand Down
28 changes: 14 additions & 14 deletions __tests__/workerUtils.forceUnlockWorkers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ test("unlocks jobs for the given workers, leaves others unaffected", () =>
});

const jobs: Job[] = [];
const WORKER_ID_1 = "worker1";
const WORKER_ID_2 = "worker2";
const WORKER_ID_3 = "worker3";
const POOL_ID_1 = "pool-1";
const POOL_ID_2 = "pool-2";
const POOL_ID_3 = "pool-3";
let a = 0;
const date = new Date();
const specs = [
[WORKER_ID_1, null],
[WORKER_ID_1, "test"],
[WORKER_ID_2, null],
[WORKER_ID_2, "test2"],
[WORKER_ID_2, "test3"],
[WORKER_ID_3, null],
[POOL_ID_1, null],
[POOL_ID_1, "test"],
[POOL_ID_2, null],
[POOL_ID_2, "test2"],
[POOL_ID_2, "test3"],
[POOL_ID_3, null],
[null, null],
[null, "test"],
[null, "test2"],
[null, "test3"],
] as const;
for (const [workerId, queueName] of specs) {
for (const [poolId, queueName] of specs) {
date.setMinutes(date.getMinutes() - 1);
const job = await utils.addJob(
"job3",
Expand All @@ -58,7 +58,7 @@ test("unlocks jobs for the given workers, leaves others unaffected", () =>
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set locked_at = $1, locked_by = $2
where id = $3`,
[workerId ? date.toISOString() : null, workerId, job.id],
[poolId ? date.toISOString() : null, poolId, job.id],
);
jobs.push(job);
}
Expand All @@ -69,7 +69,7 @@ set locked_at = jobs.locked_at, locked_by = jobs.locked_by
from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
where jobs.job_queue_id = job_queues.id;`,
);
await utils.forceUnlockWorkers([WORKER_ID_2, WORKER_ID_3]);
await utils.forceUnlockWorkers([POOL_ID_2, POOL_ID_3]);

const remaining = await getJobs(pgClient);
remaining.sort((a, z) => Number(a.id) - Number(z.id));
Expand All @@ -79,7 +79,7 @@ where jobs.job_queue_id = job_queues.id;`,
const job = jobs[i];
const updatedJob = remaining[i];
expect(updatedJob.id).toEqual(job.id);
if (spec[0] === WORKER_ID_2 || spec[0] === WORKER_ID_3) {
if (spec[0] === POOL_ID_2 || spec[0] === POOL_ID_3) {
expect(updatedJob.locked_by).toBeNull();
expect(updatedJob.locked_at).toBeNull();
} else if (spec[0]) {
Expand All @@ -97,7 +97,7 @@ where jobs.job_queue_id = job_queues.id;`,
expect(lockedQueues).toEqual([
expect.objectContaining({
queue_name: "test",
locked_by: WORKER_ID_1,
locked_by: POOL_ID_1,
}),
]);
}));
4 changes: 4 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1157,3 +1157,7 @@ export interface WorkerPluginContext {
hooks: AsyncHooks<GraphileConfig.WorkerHooks>;
resolvedPreset: ResolvedWorkerPreset;
}
/**
 * Signature of the job-fetching callback that is supplied to a worker via
 * `makeNewWorker`'s `getJob` parameter.
 *
 * @param poolId - The identifier the fetched job will be locked by. Workers
 *   pass their `WorkerPool`'s id here (not their own `workerId`), which is why
 *   the parameter is named `poolId`.
 * @param flagsToSkip - Forbidden flags forwarded to the underlying job query,
 *   or `null` when there are none.
 * @returns A promise for the fetched `Job`, or `undefined` (presumably when no
 *   eligible job is available — confirm against `sql/getJob`).
 */
export type GetJobFunction = (
  poolId: string,
  flagsToSkip: string[] | null,
) => Promise<Job | undefined>;
16 changes: 14 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from "./helpers";
import {
EnhancedWithPgClient,
GetJobFunction,
Job,
RunOnceOptions,
TaskList,
Expand All @@ -27,6 +28,7 @@ import {
import { Logger } from "./logger";
import SIGNALS, { Signal } from "./signals";
import { failJobs } from "./sql/failJob";
import { getJob as baseGetJob } from "./sql/getJob";
import { resetLockedAt } from "./sql/resetLockedAt";
import { makeNewWorker } from "./worker";

Expand Down Expand Up @@ -685,7 +687,7 @@ export function _runTaskList(
const cancelledJobs = await failJobs(
compiledSharedOptions,
withPgClient,
workerIds,
workerPool.id,
jobsToRelease,
message,
);
Expand Down Expand Up @@ -759,7 +761,7 @@ export function _runTaskList(
const cancelledJobs = await failJobs(
compiledSharedOptions,
withPgClient,
workerIds,
workerPool.id,
jobsInProgress,
message,
);
Expand Down Expand Up @@ -831,6 +833,15 @@ export function _runTaskList(
`You must not set workerId when concurrency > 1; each worker must have a unique identifier`,
);
}
// Job fetcher shared by every worker created in the loop below. It binds the
// pool-wide options, database client wrapper and task list so that a worker
// only has to supply the locking identifier and the flags to skip.
// The first argument is the WorkerPool's id (workers call this with
// `workerPool.id`), hence `poolId` rather than `workerId`.
const getJob: GetJobFunction = async (poolId, flagsToSkip) => {
  return baseGetJob(
    compiledSharedOptions,
    withPgClient,
    tasks,
    poolId,
    flagsToSkip,
  );
};
for (let i = 0; i < concurrency; i++) {
const worker = makeNewWorker(compiledSharedOptions, {
tasks,
Expand All @@ -840,6 +851,7 @@ export function _runTaskList(
workerPool,
autostart,
workerId,
getJob,
});
workerPool._workers.push(worker);
const remove = () => {
Expand Down
4 changes: 2 additions & 2 deletions src/sql/completeJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { CompiledSharedOptions } from "../lib";
export async function completeJob(
compiledSharedOptions: CompiledSharedOptions,
withPgClient: EnhancedWithPgClient,
workerId: string,
poolId: string,
job: DbJob,
): Promise<void> {
const {
Expand All @@ -29,7 +29,7 @@ update ${escapedWorkerSchema}._private_job_queues as job_queues
set locked_by = null, locked_at = null
from j
where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`,
values: [job.id, workerId],
values: [job.id, poolId],
name: !preparedStatements
? undefined
: `complete_job_q/${workerSchema}`,
Expand Down
14 changes: 7 additions & 7 deletions src/sql/failJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { CompiledSharedOptions } from "../lib";
export async function failJob(
compiledSharedOptions: CompiledSharedOptions,
withPgClient: EnhancedWithPgClient,
workerId: string,
poolId: string,
job: DbJob,
message: string,
replacementPayload: undefined | unknown[],
Expand Down Expand Up @@ -40,7 +40,7 @@ where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`,
values: [
job.id,
message,
workerId,
poolId,
replacementPayload != null
? JSON.stringify(replacementPayload)
: null,
Expand All @@ -63,7 +63,7 @@ where id = $1::bigint and locked_by = $3::text;`,
values: [
job.id,
message,
workerId,
poolId,
replacementPayload != null
? JSON.stringify(replacementPayload)
: null,
Expand All @@ -77,7 +77,7 @@ where id = $1::bigint and locked_by = $3::text;`,
export async function failJobs(
compiledSharedOptions: CompiledSharedOptions,
withPgClient: EnhancedWithPgClient,
workerIds: string[],
poolId: string,
jobs: DbJob[],
message: string,
): Promise<DbJob[]> {
Expand All @@ -100,16 +100,16 @@ last_error = $2::text,
run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'),
locked_by = null,
locked_at = null
where id = any($1::int[]) and locked_by = any($3::text[])
where id = any($1::int[]) and locked_by = $3::text
returning *
), queues as (
update ${escapedWorkerSchema}._private_job_queues as job_queues
set locked_by = null, locked_at = null
from j
where job_queues.id = j.job_queue_id and job_queues.locked_by = any($3::text[])
where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text
)
select * from j;`,
values: [jobs.map((job) => job.id), message, workerIds],
values: [jobs.map((job) => job.id), message, poolId],
name: !preparedStatements ? undefined : `fail_jobs/${workerSchema}`,
}),
);
Expand Down
4 changes: 2 additions & 2 deletions src/sql/getJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export async function getJob(
compiledSharedOptions: CompiledSharedOptions,
withPgClient: EnhancedWithPgClient,
tasks: TaskList,
workerId: string,
poolId: string,
flagsToSkip: string[] | null,
): Promise<Job | undefined> {
const {
Expand Down Expand Up @@ -172,7 +172,7 @@ with j as (
// TODO: breaking change; change this to more optimal:
// `RETURNING id, job_queue_id, task_id, payload`,
const values = [
workerId,
poolId,
taskDetails.taskIds,
...(hasFlags ? [flagsToSkip!] : []),
...(useNodeTime ? [new Date().toISOString()] : []),
Expand Down
21 changes: 11 additions & 10 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import deferred from "./deferred";
import { makeJobHelpers } from "./helpers";
import {
EnhancedWithPgClient,
GetJobFunction,
Job,
PromiseOrDirect,
TaskList,
Expand All @@ -15,7 +16,6 @@ import {
import { CompiledSharedOptions } from "./lib";
import { completeJob } from "./sql/completeJob";
import { failJob } from "./sql/failJob";
import { getJob } from "./sql/getJob";

export function makeNewWorker(
compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>,
Expand All @@ -27,6 +27,7 @@ export function makeNewWorker(
workerPool: WorkerPool;
autostart?: boolean;
workerId?: string;
getJob: GetJobFunction;
},
): Worker {
const {
Expand All @@ -37,6 +38,7 @@ export function makeNewWorker(
workerPool,
autostart = true,
workerId = `worker-${randomBytes(9).toString("hex")}`,
getJob,
} = params;
const {
events,
Expand Down Expand Up @@ -167,13 +169,7 @@ export function makeNewWorker(
}

events.emit("worker:getJob:start", { worker });
const jobRow = await getJob(
compiledSharedOptions,
withPgClient,
tasks,
workerId,
flagsToSkip,
);
const jobRow = await getJob(workerPool.id, flagsToSkip);

// `doNext` cannot be executed concurrently, so we know this is safe.
// eslint-disable-next-line require-atomic-updates
Expand Down Expand Up @@ -343,7 +339,7 @@ export function makeNewWorker(
await failJob(
compiledSharedOptions,
withPgClient,
workerId,
workerPool.id,
job,
message,
// "Batch jobs": copy through only the unsuccessful parts of the payload
Expand Down Expand Up @@ -372,7 +368,12 @@ export function makeNewWorker(
);
}

await completeJob(compiledSharedOptions, withPgClient, workerId, job);
await completeJob(
compiledSharedOptions,
withPgClient,
workerPool.id,
job,
);
}
events.emit("job:complete", { worker, job, error: err });
} catch (fatalError) {
Expand Down
2 changes: 1 addition & 1 deletion website/docs/jobs-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ performance issues!
- `updated_at` - when the job was last updated
- `key` - the `job_key` of the job, if any
- `locked_at` - when the job was locked, if locked
- `locked_by` - the worker id that the job was locked by, if locked
- `locked_by` - the WorkerPool id that the job was locked by, if locked
- `revision` - the revision number of the job, bumped each time the record is
updated
- `flags` - the [forbidden flags](/docs/forbidden-flags) associated with this
Expand Down
Loading

0 comments on commit 572ebea

Please sign in to comment.