From 4c9f5a5c29cf63ccb4df866148aeb433354deceb Mon Sep 17 00:00:00 2001 From: Alko89 Date: Mon, 14 Aug 2023 17:01:59 +0200 Subject: [PATCH 1/3] feat: add metrics api endpoint --- example.ts | 11 ++++++--- packages/api/package.json | 2 +- packages/api/src/handlers/metrics.ts | 30 ++++++++++++++++++++++++ packages/api/src/queueAdapters/base.ts | 4 ++++ packages/api/src/queueAdapters/bull.ts | 6 ++++- packages/api/src/queueAdapters/bullMQ.ts | 6 ++++- packages/api/src/routes.ts | 6 +++++ packages/api/typings/app.ts | 10 ++++++++ yarn.lock | 13 ++++++++++ 9 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 packages/api/src/handlers/metrics.ts diff --git a/example.ts b/example.ts index eb4ff5f1..9446adbf 100644 --- a/example.ts +++ b/example.ts @@ -1,6 +1,6 @@ import * as Bull from 'bull'; import Queue3 from 'bull'; -import { Queue as QueueMQ, Worker } from 'bullmq'; +import { MetricsTime, Queue as QueueMQ, Worker } from 'bullmq'; import express from 'express'; import { BullMQAdapter } from '@bull-board/api/src/queueAdapters/bullMQ'; import { BullAdapter } from '@bull-board/api/src/queueAdapters/bull'; @@ -15,7 +15,7 @@ const redisOptions = { const sleep = (t: number) => new Promise((resolve) => setTimeout(resolve, t * 1000)); -const createQueue3 = (name: string) => new Queue3(name, { redis: redisOptions }); +const createQueue3 = (name: string) => new Queue3(name, { redis: redisOptions, metrics: { maxDataPoints: MetricsTime.ONE_WEEK } }); const createQueueMQ = (name: string) => new QueueMQ(name, { connection: redisOptions }); function setupBullProcessor(bullQueue: Bull.Queue) { @@ -45,7 +45,12 @@ async function setupBullMQProcessor(queueName: string) { return { jobId: `This is the return value of job (${job.id})` }; }, - { connection: redisOptions } + { + connection: redisOptions, + metrics: { + maxDataPoints: MetricsTime.ONE_WEEK, + }, + } ); } diff --git a/packages/api/package.json b/packages/api/package.json index 23c6adc1..c2338a92 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -33,7 +33,7 @@ "devDependencies": { "@types/redis-info": "^3.0.0", "@types/supertest": "^2.0.12", - "bull": "^4.10.4", + "bull": "^4.11.3", "bullmq": "^4.6.0", "ioredis": "^5.3.2", "supertest": "^6.3.3" diff --git a/packages/api/src/handlers/metrics.ts b/packages/api/src/handlers/metrics.ts new file mode 100644 index 00000000..fad8bf47 --- /dev/null +++ b/packages/api/src/handlers/metrics.ts @@ -0,0 +1,30 @@ +import { BaseAdapter } from '../queueAdapters/base'; +import { + BullBoardRequest, + ControllerHandlerReturnType, +} from '../../typings/app'; +import { queueProvider } from '../providers/queue'; + +async function getMetrics( + req: BullBoardRequest, + queue: BaseAdapter +): Promise { + const type = req.query?.type || 'completed'; + const metrics = await queue.getMetrics(type); + + if (metrics) { + return { + status: 200, + body: metrics, + }; + } else { + return { + status: 404, + body: { error: 'Metrics not available' }, + }; + } +} + +export const metricsHandler = queueProvider(getMetrics, { + skipReadOnlyModeCheck: true, +}); diff --git a/packages/api/src/queueAdapters/base.ts b/packages/api/src/queueAdapters/base.ts index 3382b34a..b9cc82d6 100644 --- a/packages/api/src/queueAdapters/base.ts +++ b/packages/api/src/queueAdapters/base.ts @@ -2,9 +2,11 @@ import { FormatterField, JobCleanStatus, JobCounts, + JobRetryStatus, JobStatus, QueueAdapterOptions, QueueJob, + QueueMetrics, } from '../../typings/app'; export abstract class BaseAdapter { @@ -56,6 +58,8 @@ export abstract class BaseAdapter { public abstract getJobLogs(id: string): Promise; + public abstract getMetrics(type: JobRetryStatus, start?: number, end?: number): Promise; + public abstract getName(): string; public abstract getRedisInfo(): Promise; diff --git a/packages/api/src/queueAdapters/bull.ts b/packages/api/src/queueAdapters/bull.ts index cdcd97b6..ed8b78d8 100644 --- a/packages/api/src/queueAdapters/bull.ts +++ b/packages/api/src/queueAdapters/bull.ts @@ -1,5 +1,5 @@ import { Job, Queue } from 'bull'; -import { JobCleanStatus, JobCounts, JobStatus, QueueAdapterOptions } from '../../typings/app'; +import { JobCleanStatus, JobCounts, JobRetryStatus, JobStatus, QueueAdapterOptions } from '../../typings/app'; import { STATUSES } from '../constants/statuses'; import { BaseAdapter } from './base'; @@ -49,6 +49,10 @@ export class BullAdapter extends BaseAdapter { public getJobLogs(id: string): Promise { return this.queue.getJobLogs(id).then(({ logs }) => logs); } + + public getMetrics(type: JobRetryStatus, start?: number, end?: number) { + return this.queue.getMetrics(type, start, end); + } public isPaused(): Promise { return this.queue.isPaused(); diff --git a/packages/api/src/queueAdapters/bullMQ.ts b/packages/api/src/queueAdapters/bullMQ.ts index 3eff7e2d..dacac442 100644 --- a/packages/api/src/queueAdapters/bullMQ.ts +++ b/packages/api/src/queueAdapters/bullMQ.ts @@ -1,5 +1,5 @@ import { Job, Queue } from 'bullmq'; -import { JobCleanStatus, JobCounts, JobStatus, QueueAdapterOptions } from '../../typings/app'; +import { JobCleanStatus, JobCounts, JobRetryStatus, JobStatus, QueueAdapterOptions } from '../../typings/app'; import { STATUSES } from '../constants/statuses'; import { BaseAdapter } from './base'; @@ -37,6 +37,10 @@ export class BullMQAdapter extends BaseAdapter { return this.queue.getJobLogs(id).then(({ logs }) => logs); } + public getMetrics(type: JobRetryStatus, start?: number, end?: number) { + return this.queue.getMetrics(type, start, end); + } + public isPaused(): Promise { return this.queue.isPaused(); } diff --git a/packages/api/src/routes.ts b/packages/api/src/routes.ts index 421b7745..f4a5c02f 100644 --- a/packages/api/src/routes.ts +++ b/packages/api/src/routes.ts @@ -13,6 +13,7 @@ import { resumeQueueHandler } from './handlers/resumeQueue'; import { retryAllHandler } from './handlers/retryAll'; import { retryJobHandler } from './handlers/retryJob'; import { promoteAllHandler } from './handlers/promoteAll'; +import { metricsHandler } from './handlers/metrics'; export const appRoutes: AppRouteDefs = { entryPoint: { @@ -78,5 +79,10 @@ export const appRoutes: AppRouteDefs = { route: '/api/queues/:queueName/:jobId/promote', handler: promoteJobHandler, }, + { + method: 'get', + route: '/api/metrics/:queueName', + handler: metricsHandler, + }, ], }; diff --git a/packages/api/typings/app.ts b/packages/api/typings/app.ts index 358e9744..e913dec9 100644 --- a/packages/api/typings/app.ts +++ b/packages/api/typings/app.ts @@ -37,6 +37,16 @@ export interface QueueJob { getState(): Promise; } +export interface QueueMetrics { + meta: { + count: number; + prevTS: number; + prevCount: number; + }; + data: number[]; + count: number; +} + export interface QueueJobJson { // add properties as needed from real Bull/BullMQ jobs id?: string | undefined | number | null; diff --git a/yarn.lock b/yarn.lock index 32582a73..939ff52a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5180,6 +5180,19 @@ bull@^4.10.4: semver "^7.5.2" uuid "^8.3.0" +bull@^4.11.3: + version "4.11.3" + resolved "https://registry.yarnpkg.com/bull/-/bull-4.11.3.tgz#466214c8d2aa6ac4fc3133c3616d31a37af4c099" + integrity sha512-DhS0XtiAuejkAY08iGOdDK35eex/yGNoezlWqGJTu9FqWFF/oBjUhpsusE9SXiI4culyDbOoFs+l3ar0VXhFqQ== + dependencies: + cron-parser "^4.2.1" + get-port "^5.1.1" + ioredis "^5.3.2" + lodash "^4.17.21" + msgpackr "^1.5.2" + semver "^7.5.2" + uuid "^8.3.0" + bullmq@^4.6.0: version "4.7.0" resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-4.7.0.tgz#dd2d26219142cf76ab578386a0db75c4757f2380" From 2111e3a5fc7173d9cacfd8db4e6aa4c450d76036 Mon Sep 17 00:00:00 2001 From: Alko89 Date: Tue, 15 Aug 2023 16:17:42 +0200 Subject: [PATCH 2/3] fix: don't use BullMQ type for Bull metric --- example.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example.ts b/example.ts index 9446adbf..5a9789a4 100644 --- a/example.ts +++ b/example.ts @@ -15,7 +15,7 @@ const redisOptions = { const sleep = (t: number) => new Promise((resolve) => setTimeout(resolve, t * 1000)); -const createQueue3 = (name: string) => new Queue3(name, { redis: redisOptions, metrics: { maxDataPoints: MetricsTime.ONE_WEEK } }); +const createQueue3 = (name: string) => new Queue3(name, { redis: redisOptions, metrics: { maxDataPoints: 10080 } }); const createQueueMQ = (name: string) => new QueueMQ(name, { connection: redisOptions }); function setupBullProcessor(bullQueue: Bull.Queue) { From 66f0b568a9d0d7ee6c6ed1b1327d56aa2052443c Mon Sep 17 00:00:00 2001 From: Alko89 Date: Tue, 15 Aug 2023 16:50:28 +0200 Subject: [PATCH 3/3] fix: add metrics type --- packages/api/src/queueAdapters/base.ts | 4 ++-- packages/api/src/queueAdapters/bull.ts | 4 ++-- packages/api/src/queueAdapters/bullMQ.ts | 4 ++-- packages/api/typings/app.ts | 2 ++ 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/api/src/queueAdapters/base.ts b/packages/api/src/queueAdapters/base.ts index b9cc82d6..21b0e652 100644 --- a/packages/api/src/queueAdapters/base.ts +++ b/packages/api/src/queueAdapters/base.ts @@ -2,8 +2,8 @@ import { FormatterField, JobCleanStatus, JobCounts, - JobRetryStatus, JobStatus, + MetricsType, QueueAdapterOptions, QueueJob, QueueMetrics, @@ -58,7 +58,7 @@ export abstract class BaseAdapter { public abstract getJobLogs(id: string): Promise; - public abstract getMetrics(type: JobRetryStatus, start?: number, end?: number): Promise; + public abstract getMetrics(type: MetricsType, start?: number, end?: number): Promise; public abstract getName(): string; diff --git a/packages/api/src/queueAdapters/bull.ts b/packages/api/src/queueAdapters/bull.ts index ed8b78d8..6ebcf0b2 100644 --- a/packages/api/src/queueAdapters/bull.ts +++ b/packages/api/src/queueAdapters/bull.ts @@ -1,5 +1,5 @@ import { Job, Queue } from 'bull'; -import { JobCleanStatus, JobCounts, JobRetryStatus, JobStatus, QueueAdapterOptions } from '../../typings/app'; +import { JobCleanStatus, JobCounts, JobStatus, MetricsType, QueueAdapterOptions } from '../../typings/app'; import { STATUSES } from '../constants/statuses'; import { BaseAdapter } from './base'; @@ -50,7 +50,7 @@ export class BullAdapter extends BaseAdapter { return this.queue.getJobLogs(id).then(({ logs }) => logs); } - public getMetrics(type: JobRetryStatus, start?: number, end?: number) { + public getMetrics(type: MetricsType, start?: number, end?: number) { return this.queue.getMetrics(type, start, end); } diff --git a/packages/api/src/queueAdapters/bullMQ.ts b/packages/api/src/queueAdapters/bullMQ.ts index dacac442..c66c3c1c 100644 --- a/packages/api/src/queueAdapters/bullMQ.ts +++ b/packages/api/src/queueAdapters/bullMQ.ts @@ -1,5 +1,5 @@ import { Job, Queue } from 'bullmq'; -import { JobCleanStatus, JobCounts, JobRetryStatus, JobStatus, QueueAdapterOptions } from '../../typings/app'; +import { JobCleanStatus, JobCounts, JobStatus, MetricsType, QueueAdapterOptions } from '../../typings/app'; import { STATUSES } from '../constants/statuses'; import { BaseAdapter } from './base'; @@ -37,7 +37,7 @@ export class BullMQAdapter extends BaseAdapter { return this.queue.getJobLogs(id).then(({ logs }) => logs); } - public getMetrics(type: JobRetryStatus, start?: number, end?: number) { + public getMetrics(type: MetricsType, start?: number, end?: number) { return this.queue.getMetrics(type, start, end); } diff --git a/packages/api/typings/app.ts b/packages/api/typings/app.ts index e913dec9..16b2951b 100644 --- a/packages/api/typings/app.ts +++ b/packages/api/typings/app.ts @@ -12,6 +12,8 @@ export type JobStatus = keyof Omit; export type JobCounts = Record; +export type MetricsType = 'completed' | 'failed' + export interface QueueAdapterOptions { readOnlyMode: boolean; allowRetries: boolean;