diff --git a/apps/nestjs-backend/src/features/attachments/attachments-crop.job.ts b/apps/nestjs-backend/src/features/attachments/attachments-crop.job.ts new file mode 100644 index 0000000000..cd9690b639 --- /dev/null +++ b/apps/nestjs-backend/src/features/attachments/attachments-crop.job.ts @@ -0,0 +1,22 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import { Queue } from 'bullmq'; + +export interface IRecordImageJob { + bucket: string; + token: string; + path: string; + mimetype: string; + height?: number | null; +} + +export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue'; + +@Injectable() +export class AttachmentsCropJob { + constructor(@InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue) {} + + addAttachmentCropImage(data: IRecordImageJob) { + return this.queue.add('attachment_crop_image', data); + } +} diff --git a/apps/nestjs-backend/src/features/attachments/attachments-crop.module.ts b/apps/nestjs-backend/src/features/attachments/attachments-crop.module.ts index cf06048cbb..949c829d2d 100644 --- a/apps/nestjs-backend/src/features/attachments/attachments-crop.module.ts +++ b/apps/nestjs-backend/src/features/attachments/attachments-crop.module.ts @@ -1,14 +1,19 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../event-emitter/event-job/event-job.module'; -import { - ATTACHMENTS_CROP_QUEUE, - AttachmentsCropQueueProcessor, -} from './attachments-crop.processor'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../utils/queue'; +import { ATTACHMENTS_CROP_QUEUE, AttachmentsCropJob } from './attachments-crop.job'; +import { AttachmentsCropQueueProcessor } from './attachments-crop.processor'; import { AttachmentsStorageModule } from './attachments-storage.module'; @Module({ - providers: [AttachmentsCropQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImageCrop, + providers: [AttachmentsCropQueueProcessor], + }), + AttachmentsCropJob, + ], imports: [EventJobModule.registerQueue(ATTACHMENTS_CROP_QUEUE), AttachmentsStorageModule], - exports: [AttachmentsCropQueueProcessor], + exports: [AttachmentsCropJob], }) export class AttachmentsCropModule {} diff --git a/apps/nestjs-backend/src/features/attachments/attachments-crop.processor.ts b/apps/nestjs-backend/src/features/attachments/attachments-crop.processor.ts index bb72fa383e..f746f98aaf 100644 --- a/apps/nestjs-backend/src/features/attachments/attachments-crop.processor.ts +++ b/apps/nestjs-backend/src/features/attachments/attachments-crop.processor.ts @@ -1,21 +1,12 @@ -import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { PrismaService } from '@teable/db-main-prisma'; -import { Queue } from 'bullmq'; import type { Job } from 'bullmq'; import { EventEmitterService } from '../../event-emitter/event-emitter.service'; import { Events } from '../../event-emitter/events'; import { AttachmentsStorageService } from '../attachments/attachments-storage.service'; - -interface IRecordImageJob { - bucket: string; - token: string; - path: string; - mimetype: string; - height?: number | null; -} - -export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue'; +import type { IRecordImageJob } from './attachments-crop.job'; +import { ATTACHMENTS_CROP_QUEUE } from './attachments-crop.job'; @Injectable() @Processor(ATTACHMENTS_CROP_QUEUE) @@ -25,8 +16,7 @@ export class AttachmentsCropQueueProcessor extends WorkerHost { constructor( private readonly prismaService: PrismaService, private readonly attachmentsStorageService: AttachmentsStorageService, - private readonly eventEmitterService: EventEmitterService, - @InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue + private readonly eventEmitterService: EventEmitterService ) { super(); } diff --git a/apps/nestjs-backend/src/features/attachments/attachments.service.ts b/apps/nestjs-backend/src/features/attachments/attachments.service.ts index fcb9bcf659..806cbc18a2 100644 --- a/apps/nestjs-backend/src/features/attachments/attachments.service.ts +++ b/apps/nestjs-backend/src/features/attachments/attachments.service.ts @@ -27,7 +27,7 @@ import { ThresholdConfig, IThresholdConfig } from '../../configs/threshold.confi import type { IClsStore } from '../../types/cls'; import { FileUtils } from '../../utils'; import { second } from '../../utils/second'; -import { AttachmentsCropQueueProcessor } from './attachments-crop.processor'; +import { AttachmentsCropJob } from './attachments-crop.job'; import { AttachmentsStorageService } from './attachments-storage.service'; import StorageAdapter from './plugins/adapter'; import type { LocalStorage } from './plugins/local'; @@ -42,7 +42,7 @@ export class AttachmentsService { private readonly cls: ClsService, private readonly cacheService: CacheService, private readonly attachmentsStorageService: AttachmentsStorageService, - private readonly attachmentsCropQueueProcessor: AttachmentsCropQueueProcessor, + private readonly attachmentsCropJob: AttachmentsCropJob, @StorageConfig() readonly storageConfig: IStorageConfig, @ThresholdConfig() readonly thresholdConfig: IThresholdConfig, @InjectStorageAdapter() readonly storageAdapter: StorageAdapter @@ -170,7 +170,7 @@ export class AttachmentsService { path: true, }, }); - await this.attachmentsCropQueueProcessor.queue.add('attachment_crop_image', { + await this.attachmentsCropJob.addAttachmentCropImage({ token: attachment.token, path: attachment.path, mimetype: attachment.mimetype, diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.job.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.job.ts new file mode 100644 index 0000000000..ffe320d94d --- /dev/null +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.job.ts @@ -0,0 +1,18 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import { Queue } from 'bullmq'; + +export interface IBaseImportAttachmentsCsvJob { + path: string; + userId: string; +} + +export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue'; + +@Injectable() +export class BaseImportAttachmentsCsvJob { + constructor( + @InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE) + public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.module.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.module.ts index e74a3283f6..297382db6e 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.module.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.module.ts @@ -1,14 +1,21 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; import { - BaseImportAttachmentsCsvQueueProcessor, BASE_IMPORT_ATTACHMENTS_CSV_QUEUE, -} from './base-import-attachments-csv.processor'; - + BaseImportAttachmentsCsvJob, +} from './base-import-attachments-csv.job'; +import { BaseImportAttachmentsCsvQueueProcessor } from './base-import-attachments-csv.processor'; @Module({ - providers: [BaseImportAttachmentsCsvQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [BaseImportAttachmentsCsvQueueProcessor], + }), + BaseImportAttachmentsCsvJob, + ], imports: [EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule], - exports: [BaseImportAttachmentsCsvQueueProcessor], + exports: [BaseImportAttachmentsCsvJob], }) export class BaseImportAttachmentsCsvModule {} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.processor.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.processor.ts index eed9cb932f..7f3c6ffa80 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.processor.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.processor.ts @@ -1,22 +1,16 @@ -import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import type { Attachments } from '@teable/db-main-prisma'; import { PrismaService } from '@teable/db-main-prisma'; import { UploadType } from '@teable/openapi'; import type { Job } from 'bullmq'; -import { Queue } from 'bullmq'; import * as csvParser from 'csv-parser'; import * as unzipper from 'unzipper'; import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { BatchProcessor } from '../BatchProcessor.class'; - -interface IBaseImportAttachmentsCsvJob { - path: string; - userId: string; -} - -export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue'; +import type { IBaseImportAttachmentsCsvJob } from './base-import-attachments-csv.job'; +import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job'; @Injectable() @Processor(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE) @@ -27,9 +21,7 @@ export class BaseImportAttachmentsCsvQueueProcessor extends WorkerHost { constructor( private readonly prismaService: PrismaService, - @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE) - public readonly queue: Queue + @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter ) { super(); } diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.job.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.job.ts new file mode 100644 index 0000000000..e1cb6e2cbf --- /dev/null +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.job.ts @@ -0,0 +1,17 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import { Queue } from 'bullmq'; + +export interface IBaseImportJob { + path: string; + userId: string; +} + +export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue'; + +@Injectable() +export class BaseImportAttachmentsJob { + constructor( + @InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.module.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.module.ts index 6a5d4098b8..7e03400b75 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.module.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.module.ts @@ -1,23 +1,28 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; +import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job'; import { BaseImportAttachmentsCsvModule } from './base-import-attachments-csv.module'; -import { - BaseImportAttachmentsCsvQueueProcessor, - BASE_IMPORT_ATTACHMENTS_CSV_QUEUE, -} from './base-import-attachments-csv.processor'; import { BASE_IMPORT_ATTACHMENTS_QUEUE, - BaseImportAttachmentsQueueProcessor, -} from './base-import-attachments.processor'; + BaseImportAttachmentsJob, +} from './base-import-attachments.job'; +import { BaseImportAttachmentsQueueProcessor } from './base-import-attachments.processor'; @Module({ - providers: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [BaseImportAttachmentsQueueProcessor], + }), + BaseImportAttachmentsJob, + ], imports: [ EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_QUEUE), EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule, BaseImportAttachmentsCsvModule, ], - exports: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor], + exports: [BaseImportAttachmentsJob], }) export class BaseImportAttachmentsModule {} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.processor.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.processor.ts index e934a89c73..a3b9516a47 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.processor.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.processor.ts @@ -1,24 +1,19 @@ /* eslint-disable sonarjs/no-duplicate-string */ import { PassThrough } from 'stream'; -import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { PrismaService } from '@teable/db-main-prisma'; import { UploadType } from '@teable/openapi'; -import { Queue, Job } from 'bullmq'; +import { Job } from 'bullmq'; import * as unzipper from 'unzipper'; import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE, - BaseImportAttachmentsCsvQueueProcessor, -} from './base-import-attachments-csv.processor'; - -interface IBaseImportJob { - path: string; - userId: string; -} - -export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue'; + BaseImportAttachmentsCsvJob, +} from './base-import-attachments-csv.job'; +import type { IBaseImportJob } from './base-import-attachments.job'; +import { BASE_IMPORT_ATTACHMENTS_QUEUE } from './base-import-attachments.job'; @Injectable() @Processor(BASE_IMPORT_ATTACHMENTS_QUEUE) @@ -28,9 +23,8 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost { constructor( private readonly prismaService: PrismaService, - private readonly baseImportAttachmentsCsvQueueProcessor: BaseImportAttachmentsCsvQueueProcessor, - @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue + private readonly baseImportAttachmentsCsvJob: BaseImportAttachmentsCsvJob, + @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter ) { super(); } @@ -234,7 +228,7 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost { @OnWorkerEvent('completed') async onCompleted(job: Job) { const { path, userId } = job.data; - this.baseImportAttachmentsCsvQueueProcessor.queue.add( + this.baseImportAttachmentsCsvJob.queue.add( BASE_IMPORT_ATTACHMENTS_CSV_QUEUE, { path, diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.job.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.job.ts new file mode 100644 index 0000000000..acf6d081d5 --- /dev/null +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.job.ts @@ -0,0 +1,23 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import type { IBaseJson } from '@teable/openapi'; +import { Queue } from 'bullmq'; +export interface IBaseImportCsvJob { + path: string; + userId: string; + tableIdMap: Record; + fieldIdMap: Record; + viewIdMap: Record; + fkMap: Record; + structure: IBaseJson; +} + +export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue'; + +@Injectable() +export class BaseImportCsvJob { + constructor( + @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.module.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.module.ts index 1deba2c79b..6d553ebda2 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.module.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.module.ts @@ -1,18 +1,26 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; -import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.processor'; -import { BASE_IMPORT_CSV_QUEUE, BaseImportCsvQueueProcessor } from './base-import-csv.processor'; +import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job'; +import { BASE_IMPORT_CSV_QUEUE, BaseImportCsvJob } from './base-import-csv.job'; +import { BaseImportCsvQueueProcessor } from './base-import-csv.processor'; import { BaseImportJunctionCsvModule } from './base-import-junction-csv.module'; @Module({ - providers: [BaseImportCsvQueueProcessor], + providers: [ + BaseImportCsvJob, + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [BaseImportCsvQueueProcessor], + }), + ], imports: [ EventJobModule.registerQueue(BASE_IMPORT_CSV_QUEUE), EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule, BaseImportJunctionCsvModule, ], - exports: [BaseImportCsvQueueProcessor], + exports: [BaseImportCsvJob], }) export class BaseImportCsvModule {} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts index c1dafcd163..377bdfc0f3 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts @@ -1,12 +1,11 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import type { IAttachmentCellValue } from '@teable/core'; import { FieldType, generateAttachmentId } from '@teable/core'; import { PrismaService } from '@teable/db-main-prisma'; -import type { IBaseJson } from '@teable/openapi'; import { UploadType } from '@teable/openapi'; -import { Queue, Job } from 'bullmq'; +import { Job } from 'bullmq'; import * as csvParser from 'csv-parser'; import { Knex } from 'knex'; import { InjectModel } from 'nest-knexjs'; @@ -17,18 +16,9 @@ import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { BatchProcessor } from '../BatchProcessor.class'; import { EXCLUDE_SYSTEM_FIELDS } from '../constant'; -import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor'; -interface IBaseImportCsvJob { - path: string; - userId: string; - tableIdMap: Record; - fieldIdMap: Record; - viewIdMap: Record; - fkMap: Record; - structure: IBaseJson; -} - -export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue'; +import type { IBaseImportCsvJob } from './base-import-csv.job'; +import { BASE_IMPORT_CSV_QUEUE } from './base-import-csv.job'; +import { BaseImportJunctionCsvJob } from './base-import-junction.job'; @Injectable() @Processor(BASE_IMPORT_CSV_QUEUE) @@ -39,10 +29,9 @@ export class BaseImportCsvQueueProcessor extends WorkerHost { constructor( private readonly prismaService: PrismaService, - private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor, + private readonly baseImportJunctionCsvJob: BaseImportJunctionCsvJob, @InjectModel('CUSTOM_KNEX') private readonly knex: Knex, @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue, @InjectDbProvider() private readonly dbProvider: IDbProvider ) { super(); @@ -360,7 +349,7 @@ export class BaseImportCsvQueueProcessor extends WorkerHost { @OnWorkerEvent('completed') async onCompleted(job: Job) { const { fieldIdMap, path, structure, userId } = job.data; - await this.baseImportJunctionCsvQueueProcessor.queue.add( + await this.baseImportJunctionCsvJob.queue.add( 'import_base_junction_csv', { fieldIdMap, diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction-csv.module.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction-csv.module.ts index 788f793c34..a110381372 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction-csv.module.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction-csv.module.ts @@ -1,14 +1,22 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; import { - BaseImportJunctionCsvQueueProcessor, BASE_IMPORT_JUNCTION_CSV_QUEUE, -} from './base-import-junction.processor'; + BaseImportJunctionCsvJob, +} from './base-import-junction.job'; +import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor'; @Module({ - providers: [BaseImportJunctionCsvQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [BaseImportJunctionCsvQueueProcessor], + }), + BaseImportJunctionCsvJob, + ], imports: [EventJobModule.registerQueue(BASE_IMPORT_JUNCTION_CSV_QUEUE), StorageModule], - exports: [BaseImportJunctionCsvQueueProcessor], + exports: [BaseImportJunctionCsvJob], }) export class BaseImportJunctionCsvModule {} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.job.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.job.ts new file mode 100644 index 0000000000..9684d14336 --- /dev/null +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.job.ts @@ -0,0 +1,21 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import type { IBaseJson } from '@teable/openapi'; +import { Queue } from 'bullmq'; + +export interface IBaseImportJunctionCsvJob { + path: string; + fieldIdMap: Record; + structure: IBaseJson; +} + +export const BASE_IMPORT_JUNCTION_CSV_QUEUE = 'base-import-junction-csv-queue'; + +@Injectable() +export class BaseImportJunctionCsvJob { + constructor( + @InjectQueue(BASE_IMPORT_JUNCTION_CSV_QUEUE) + public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.processor.ts b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.processor.ts index 3df78b9fb3..f7a87ec6c6 100644 --- a/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.processor.ts +++ b/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.processor.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { PrismaClientKnownRequestError, @@ -11,7 +11,6 @@ import { PrismaService } from '@teable/db-main-prisma'; import type { IBaseJson } from '@teable/openapi'; import { UploadType } from '@teable/openapi'; import type { Job } from 'bullmq'; -import { Queue } from 'bullmq'; import * as csvParser from 'csv-parser'; import { Knex } from 'knex'; import { InjectModel } from 'nest-knexjs'; @@ -22,14 +21,8 @@ import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { createFieldInstanceByRaw } from '../../field/model/factory'; import { BatchProcessor } from '../BatchProcessor.class'; - -interface IBaseImportJunctionCsvJob { - path: string; - fieldIdMap: Record; - structure: IBaseJson; -} - -export const BASE_IMPORT_JUNCTION_CSV_QUEUE = 'base-import-junction-csv-queue'; +import type { IBaseImportJunctionCsvJob } from './base-import-junction.job'; +import { BASE_IMPORT_JUNCTION_CSV_QUEUE } from './base-import-junction.job'; @Injectable() @Processor(BASE_IMPORT_JUNCTION_CSV_QUEUE) @@ -41,8 +34,6 @@ export class BaseImportJunctionCsvQueueProcessor extends WorkerHost { private readonly prismaService: PrismaService, @InjectModel('CUSTOM_KNEX') private readonly knex: Knex, @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(BASE_IMPORT_JUNCTION_CSV_QUEUE) - public readonly queue: Queue, @InjectDbProvider() private readonly dbProvider: IDbProvider ) { super(); diff --git a/apps/nestjs-backend/src/features/base/base-import.service.ts b/apps/nestjs-backend/src/features/base/base-import.service.ts index 92fb831e97..30753be49e 100644 --- a/apps/nestjs-backend/src/features/base/base-import.service.ts +++ b/apps/nestjs-backend/src/features/base/base-import.service.ts @@ -33,8 +33,8 @@ import { InjectStorageAdapter } from '../attachments/plugins/storage'; import { FieldDuplicateService } from '../field/field-duplicate/field-duplicate.service'; import { TableService } from '../table/table.service'; import { ViewOpenApiService } from '../view/open-api/view-open-api.service'; -import { BaseImportAttachmentsQueueProcessor } from './base-import-processor/base-import-attachments.processor'; -import { BaseImportCsvQueueProcessor } from './base-import-processor/base-import-csv.processor'; +import { BaseImportAttachmentsJob } from './base-import-processor/base-import-attachments.job'; +import { BaseImportCsvJob } from './base-import-processor/base-import-csv.job'; import { replaceStringByMap } from './utils'; @Injectable() @@ -47,8 +47,8 @@ export class BaseImportService { private readonly tableService: TableService, private readonly fieldDuplicateService: FieldDuplicateService, private readonly viewOpenApiService: ViewOpenApiService, - private readonly baseImportAttachmentsQueueProcessor: BaseImportAttachmentsQueueProcessor, - private readonly baseImportCsvQueueProcessor: BaseImportCsvQueueProcessor, + private readonly baseImportAttachmentsJob: BaseImportAttachmentsJob, + private readonly baseImportCsvJob: BaseImportCsvJob, @InjectModel('CUSTOM_KNEX') private readonly knex: Knex, @InjectDbProvider() private readonly dbProvider: IDbProvider, @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, @@ -186,7 +186,7 @@ export class BaseImportService { private async uploadAttachments(path: string) { const userId = this.cls.get('user.id'); - await this.baseImportAttachmentsQueueProcessor.queue.add( + await this.baseImportAttachmentsJob.queue.add( 'import_base_attachments', { path, @@ -207,7 +207,7 @@ export class BaseImportService { structure: IBaseJson ) { const userId = this.cls.get('user.id'); - await this.baseImportCsvQueueProcessor.queue.add( + await this.baseImportCsvJob.queue.add( 'base_import_csv', { path, diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.job.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.job.ts new file mode 100644 index 0000000000..f6fc1d4a14 --- /dev/null +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.job.ts @@ -0,0 +1,37 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import type { FieldType } from '@teable/core'; +import type { IImportOptionRo, IImportColumn } from '@teable/openapi'; +import { Queue } from 'bullmq'; + +export interface ITableImportChunkJob { + baseId: string; + table: { + id: string; + name: string; + }; + userId: string; + importerParams: Pick & { + maxRowCount?: number; + }; + options: { + skipFirstNLines: number; + sheetKey: string; + notification: boolean; + }; + recordsCal: { + columnInfo?: IImportColumn[]; + fields: { id: string; type: FieldType }[]; + sourceColumnMap?: Record; + }; +} + +export const TABLE_IMPORT_CSV_CHUNK_QUEUE = 'import-table-csv-chunk-queue'; +export const TABLE_IMPORT_CSV_CHUNK_QUEUE_CONCURRENCY = 6; + +@Injectable() +export class ImportTableCsvChunkJob { + constructor( + @InjectQueue(TABLE_IMPORT_CSV_CHUNK_QUEUE) public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.module.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.module.ts index a675fe24bc..1b09d1e9e3 100644 --- a/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.module.ts +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.module.ts @@ -2,26 +2,31 @@ import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; import { ShareDbModule } from '../../../share-db/share-db.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; import { NotificationModule } from '../../notification/notification.module'; import { RecordOpenApiModule } from '../../record/open-api/record-open-api.module'; -import { - ImportTableCsvChunkQueueProcessor, - TABLE_IMPORT_CSV_CHUNK_QUEUE, -} from './import-csv-chunk.processor'; -import { ImportTableCsvQueueProcessor, TABLE_IMPORT_CSV_QUEUE } from './import-csv.processor'; +import { ImportTableCsvChunkJob, TABLE_IMPORT_CSV_CHUNK_QUEUE } from './import-csv-chunk.job'; +import { ImportTableCsvChunkQueueProcessor } from './import-csv-chunk.processor'; +import { ImportCsvModule } from './import-csv.module'; @Module({ - providers: [ImportTableCsvChunkQueueProcessor, ImportTableCsvQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [ImportTableCsvChunkQueueProcessor], + }), + ImportTableCsvChunkJob, + ], imports: [ EventJobModule.registerQueue(TABLE_IMPORT_CSV_CHUNK_QUEUE), - EventJobModule.registerQueue(TABLE_IMPORT_CSV_QUEUE), ShareDbModule, RecordOpenApiModule, NotificationModule, StorageModule, EventEmitterModule, + ImportCsvModule, ], - exports: [ImportTableCsvChunkQueueProcessor, ImportTableCsvQueueProcessor], + exports: [ImportTableCsvChunkJob], }) export class ImportCsvChunkModule {} diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.processor.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.processor.ts index e32e92c06c..afb3d26301 100644 --- a/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.processor.ts +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.processor.ts @@ -1,19 +1,21 @@ /* eslint-disable @typescript-eslint/naming-convention */ import { Readable } from 'stream'; import { Worker } from 'worker_threads'; -import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; -import type { FieldType } from '@teable/core'; import { getRandomString } from '@teable/core'; import { UploadType } from '@teable/openapi'; -import type { IImportOptionRo, IImportColumn } from '@teable/openapi'; -import { Job, Queue } from 'bullmq'; +import { Job } from 'bullmq'; import Papa from 'papaparse'; -import { EventEmitterService } from '../../../event-emitter/event-emitter.service'; import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { NotificationService } from '../../notification/notification.service'; -import { ImportTableCsvQueueProcessor, TABLE_IMPORT_CSV_QUEUE } from './import-csv.processor'; +import type { ITableImportChunkJob } from './import-csv-chunk.job'; +import { + TABLE_IMPORT_CSV_CHUNK_QUEUE, + TABLE_IMPORT_CSV_CHUNK_QUEUE_CONCURRENCY, +} from './import-csv-chunk.job'; +import { ImportTableCsvJob, TABLE_IMPORT_CSV_QUEUE } from './import-csv.job'; import { getWorkerPath, importerFactory } from './import.class'; class ImportError extends Error { @@ -26,31 +28,6 @@ class ImportError extends Error { } } -interface ITableImportChunkJob { - baseId: string; - table: { - id: string; - name: string; - }; - userId: string; - importerParams: Pick & { - maxRowCount?: number; - }; - options: { - skipFirstNLines: number; - sheetKey: string; - notification: boolean; - }; - recordsCal: { - columnInfo?: IImportColumn[]; - fields: { id: string; type: FieldType }[]; - sourceColumnMap?: Record; - }; -} - -export const TABLE_IMPORT_CSV_CHUNK_QUEUE = 'import-table-csv-chunk-queue'; -export const TABLE_IMPORT_CSV_CHUNK_QUEUE_CONCURRENCY = 6; - @Injectable() @Processor(TABLE_IMPORT_CSV_CHUNK_QUEUE, { concurrency: TABLE_IMPORT_CSV_CHUNK_QUEUE_CONCURRENCY, @@ -62,10 +39,8 @@ export class ImportTableCsvChunkQueueProcessor extends WorkerHost { constructor( private readonly notificationService: NotificationService, - private readonly eventEmitterService: EventEmitterService, - private readonly importTableCsvQueueProcessor: ImportTableCsvQueueProcessor, - @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(TABLE_IMPORT_CSV_CHUNK_QUEUE) public readonly queue: Queue + private readonly importTableCsvJob: ImportTableCsvJob, + @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter ) { super(); } @@ -227,9 +202,9 @@ export class ImportTableCsvChunkQueueProcessor extends WorkerHost { } ); - const chunkJobId = this.importTableCsvQueueProcessor.getChunkImportJobId(jobId, range); + const chunkJobId = this.importTableCsvJob.getChunkImportJobId(jobId, range); - await this.importTableCsvQueueProcessor.queue.add( + await this.importTableCsvJob.queue.add( TABLE_IMPORT_CSV_QUEUE, { baseId, @@ -265,8 +240,8 @@ export class ImportTableCsvChunkQueueProcessor extends WorkerHost { this.logger.error(`import data to ${table.id} chunk data job failed, range: [${range}]`); - const allJobs = (await this.queue.getJobs(['waiting', 'active'])).filter((job) => - job.id?.startsWith(jobId) + const allJobs = (await this.importTableCsvJob.queue.getJobs(['waiting', 'active'])).filter( + (job) => job.id?.startsWith(jobId) ); for (const relatedJob of allJobs) { @@ -277,10 +252,7 @@ export class ImportTableCsvChunkQueueProcessor extends WorkerHost { } } - const localPresence = this.importTableCsvQueueProcessor.createImportPresence( - table.id, - 'status' - ); - this.importTableCsvQueueProcessor.setImportStatus(localPresence, true); + const localPresence = this.importTableCsvJob.createImportPresence(table.id, 'status'); + this.importTableCsvJob.setImportStatus(localPresence, true); } } diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv.job.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv.job.ts new file mode 100644 index 0000000000..a4686a00ca --- /dev/null +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv.job.ts @@ -0,0 +1,80 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable, Logger } from '@nestjs/common'; +import { + getActionTriggerChannel, + getRandomString, + getTableImportChannel, + type FieldType, +} from '@teable/core'; +import { type IImportColumn } from '@teable/openapi'; +import { Queue } from 'bullmq'; +import type { LocalPresence } from 'sharedb/lib/client'; +import { ShareDbService } from '../../../share-db/share-db.service'; + +export interface ITableImportCsvJob { + baseId: string; + userId: string; + path: string; + columnInfo?: IImportColumn[]; + fields: { id: string; type: FieldType }[]; + sourceColumnMap?: Record; + table: { id: string; name: string }; + range: [number, number]; + notification?: boolean; + lastChunk?: boolean; + parentJobId: string; +} + +export const TABLE_IMPORT_CSV_QUEUE = 'import-table-csv-queue'; + +@Injectable() +export class ImportTableCsvJob { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private presences: LocalPresence[] = []; + private logger = new Logger(ImportTableCsvJob.name); + + constructor( + private readonly shareDbService: ShareDbService, + @InjectQueue(TABLE_IMPORT_CSV_QUEUE) public readonly queue: Queue + ) {} + + public getChunkImportJobIdPrefix(parentId: string) { + return `${parentId}_import_${getRandomString(6)}`; + } + + public getChunkImportJobId(jobId: string, range: [number, number]) { + const prefix = this.getChunkImportJobIdPrefix(jobId); + return `${prefix}_[${range[0]},${range[1]}]`; + } + + setImportStatus(presence: LocalPresence, loading: boolean) { + presence.submit( + { + loading, + }, + (error) => { + error && this.logger.error(error); + } + ); + } + + createImportPresence(tableId: string, type: 'rowCount' | 'status' = 'status') { + const channel = + type === 'rowCount' ? getActionTriggerChannel(tableId) : getTableImportChannel(tableId); + const existPresence = this.presences.find(({ presence }) => { + return presence.channel === channel; + }); + if (existPresence) { + return existPresence; + } + const presence = this.shareDbService.connect().getPresence(channel); + const localPresence = presence.create(channel); + this.presences.push(localPresence); + return localPresence; + } + + deleteImportPresence(presenceId: string) { + this.presences = this.presences.filter((presence) => presence.presenceId !== presenceId); + } +} diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv.module.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv.module.ts index c77adab4c0..b6a2b24423 100644 --- a/apps/nestjs-backend/src/features/import/open-api/import-csv.module.ts +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv.module.ts @@ -2,13 +2,21 @@ import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; import { ShareDbModule } from '../../../share-db/share-db.module'; +import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue'; import { StorageModule } from '../../attachments/plugins/storage.module'; import { NotificationModule } from '../../notification/notification.module'; import { RecordOpenApiModule } from '../../record/open-api/record-open-api.module'; -import { ImportTableCsvQueueProcessor, TABLE_IMPORT_CSV_QUEUE } from './import-csv.processor'; +import { ImportTableCsvJob, TABLE_IMPORT_CSV_QUEUE } from './import-csv.job'; +import { ImportTableCsvQueueProcessor } from './import-csv.processor'; @Module({ - providers: [ImportTableCsvQueueProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + consumer: QueueConsumerType.ImportExport, + providers: [ImportTableCsvQueueProcessor], + }), + ImportTableCsvJob, + ], imports: [ EventJobModule.registerQueue(TABLE_IMPORT_CSV_QUEUE), ShareDbModule, @@ -17,6 +25,6 @@ import { ImportTableCsvQueueProcessor, TABLE_IMPORT_CSV_QUEUE } from './import-c StorageModule, EventEmitterModule, ], - exports: [ImportTableCsvQueueProcessor], + exports: [ImportTableCsvJob], }) export class ImportCsvModule {} diff --git a/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts b/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts index b95fd8437c..a37f8908ac 100644 --- a/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts +++ b/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts @@ -1,22 +1,15 @@ /* eslint-disable @typescript-eslint/naming-convention */ import { join } from 'path'; -import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; -import { - FieldKeyType, - FieldType, - getActionTriggerChannel, - getRandomString, - getTableImportChannel, -} from '@teable/core'; +import { FieldKeyType, FieldType } from '@teable/core'; import { PrismaService } from '@teable/db-main-prisma'; -import { UploadType, type IImportColumn } from '@teable/openapi'; -import { Job, Queue } from 'bullmq'; +import { UploadType } from '@teable/openapi'; +import { Job } from 'bullmq'; import { toString } from 'lodash'; import { ClsService } from 'nestjs-cls'; import Papa from 'papaparse'; import type { CreateOp } from 'sharedb'; -import type { LocalPresence } from 'sharedb/lib/client'; import { EventEmitterService } from '../../../event-emitter/event-emitter.service'; import { Events } from '../../../event-emitter/events'; import { ShareDbService } from '../../../share-db/share-db.service'; @@ -25,32 +18,16 @@ import StorageAdapter from '../../attachments/plugins/adapter'; import { InjectStorageAdapter } from '../../attachments/plugins/storage'; import { NotificationService } from '../../notification/notification.service'; import { RecordOpenApiService } from '../../record/open-api/record-open-api.service'; +import type { ITableImportCsvJob } from './import-csv.job'; +import { ImportTableCsvJob, TABLE_IMPORT_CSV_QUEUE } from './import-csv.job'; import { parseBoolean } from './import.class'; -interface ITableImportCsvJob { - baseId: string; - userId: string; - path: string; - columnInfo?: IImportColumn[]; - fields: { id: string; type: FieldType }[]; - sourceColumnMap?: Record; - table: { id: string; name: string }; - range: [number, number]; - notification?: boolean; - lastChunk?: boolean; - parentJobId: string; -} - -export const TABLE_IMPORT_CSV_QUEUE = 'import-table-csv-queue'; - @Injectable() @Processor(TABLE_IMPORT_CSV_QUEUE) export class ImportTableCsvQueueProcessor extends WorkerHost { public static readonly JOB_ID_PREFIX = 'import-table-csv'; private logger = new Logger(ImportTableCsvQueueProcessor.name); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - private presences: LocalPresence[] = []; constructor( private readonly recordOpenApiService: RecordOpenApiService, @@ -60,15 +37,15 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { private readonly cls: ClsService, private readonly prismaService: PrismaService, @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter, - @InjectQueue(TABLE_IMPORT_CSV_QUEUE) public readonly queue: Queue + private readonly importTableCsvJob: ImportTableCsvJob ) { super(); } public async process(job: Job) { const { table, notification, baseId, userId, lastChunk, sourceColumnMap, range } = job.data; - const localPresence = this.createImportPresence(table.id, 'status'); - this.setImportStatus(localPresence, true); + const localPresence = this.importTableCsvJob.createImportPresence(table.id, 'status'); + this.importTableCsvJob.setImportStatus(localPresence, true); try { await this.handleImportChunkCsv(job); if (lastChunk) { @@ -85,11 +62,9 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { tableId: table.id, }); - this.setImportStatus(localPresence, false); + this.importTableCsvJob.setImportStatus(localPresence, false); localPresence.destroy(); - this.presences = this.presences.filter( - (presence) => presence.presenceId !== localPresence.presenceId - ); + this.importTableCsvJob.deleteImportPresence(localPresence.presenceId); const dir = StorageAdapter.getDir(UploadType.Import); const fullPath = join(dir, job.data.parentJobId); @@ -114,8 +89,8 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { } private async cleanRelativeTask(parentJobId: string) { - const allJobs = (await this.queue.getJobs(['waiting', 'active'])).filter((job) => - job.id?.startsWith(parentJobId) + const allJobs = (await this.importTableCsvJob.queue.getJobs(['waiting', 'active'])).filter( + (job) => job.id?.startsWith(parentJobId) ); for (const relatedJob of allJobs) { @@ -195,7 +170,7 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { } private updateRowCount(tableId: string) { - const localPresence = this.createImportPresence(tableId, 'rowCount'); + const localPresence = this.importTableCsvJob.createImportPresence(tableId, 'rowCount'); localPresence.submit([{ actionKey: 'addRecord' }], (error) => { error && this.logger.error(error); }); @@ -223,41 +198,6 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { }); } - setImportStatus(presence: LocalPresence, loading: boolean) { - presence.submit( - { - loading, - }, - (error) => { - error && this.logger.error(error); - } - ); - } - - createImportPresence(tableId: string, type: 'rowCount' | 'status' = 'status') { - const channel = - type === 'rowCount' ? getActionTriggerChannel(tableId) : getTableImportChannel(tableId); - const existPresence = this.presences.find(({ presence }) => { - return presence.channel === channel; - }); - if (existPresence) { - return existPresence; - } - const presence = this.shareDbService.connect().getPresence(channel); - const localPresence = presence.create(channel); - this.presences.push(localPresence); - return localPresence; - } - - public getChunkImportJobIdPrefix(parentId: string) { - return `${parentId}_import_${getRandomString(6)}`; - } - - public getChunkImportJobId(jobId: string, range: [number, number]) { - const prefix = this.getChunkImportJobIdPrefix(jobId); - return `${prefix}_[${range[0]},${range[1]}]`; - } - @OnWorkerEvent('active') onWorkerEvent(job: Job) { const { table, range } = job.data; @@ -273,8 +213,8 @@ export class ImportTableCsvQueueProcessor extends WorkerHost { const { table, range, parentJobId } = job.data; this.logger.error(`import data to ${table.id} job failed, range: [${range}]`); this.cleanRelativeTask(parentJobId); - const localPresence = this.createImportPresence(table.id, 'status'); - this.setImportStatus(localPresence, false); + const localPresence = this.importTableCsvJob.createImportPresence(table.id, 'status'); + this.importTableCsvJob.setImportStatus(localPresence, false); } @OnWorkerEvent('completed') diff --git a/apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts b/apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts index e827f42ab8..06a7a92c2f 100644 --- a/apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts +++ b/apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts @@ -10,18 +10,12 @@ import type { } from '@teable/openapi'; import { chunk, difference } from 'lodash'; import { ClsService } from 'nestjs-cls'; - -import { ShareDbService } from '../../../share-db/share-db.service'; import type { IClsStore } from '../../../types/cls'; import { FieldOpenApiService } from '../../field/open-api/field-open-api.service'; -import { NotificationService } from '../../notification/notification.service'; -import { RecordOpenApiService } from '../../record/open-api/record-open-api.service'; import { DEFAULT_VIEWS, DEFAULT_FIELDS } from '../../table/constant'; import { TableOpenApiService } from '../../table/open-api/table-open-api.service'; -import { - ImportTableCsvChunkQueueProcessor, - TABLE_IMPORT_CSV_CHUNK_QUEUE, -} from './import-csv-chunk.processor'; +import { ImportTableCsvChunkJob, TABLE_IMPORT_CSV_CHUNK_QUEUE } from './import-csv-chunk.job'; +import { ImportTableCsvChunkQueueProcessor } from './import-csv-chunk.processor'; import { importerFactory } from './import.class'; const maxFieldsLength = 500; @@ -34,10 +28,7 @@ export class ImportOpenApiService { private readonly tableOpenApiService: TableOpenApiService, private readonly cls: ClsService, private readonly prismaService: PrismaService, - private readonly recordOpenApiService: RecordOpenApiService, - private readonly notificationService: NotificationService, - private readonly shareDbService: ShareDbService, - private readonly importTableCsvChunkQueueProcessor: ImportTableCsvChunkQueueProcessor, + private readonly importTableCsvChunkJob: ImportTableCsvChunkJob, private readonly fieldOpenApiService: FieldOpenApiService ) {} @@ -103,7 +94,7 @@ export class ImportOpenApiService { const jobId = `${ImportTableCsvChunkQueueProcessor.JOB_ID_PREFIX}:${table.id}:${getRandomString(6)}`; if (importData && columns.length) { - await this.importTableCsvChunkQueueProcessor.queue.add( + await this.importTableCsvChunkJob.queue.add( `${TABLE_IMPORT_CSV_CHUNK_QUEUE}_job`, { baseId, @@ -220,7 +211,7 @@ export class ImportOpenApiService { const jobId = await this.generateChunkJobId(tableId); - await this.importTableCsvChunkQueueProcessor.queue.add( + await this.importTableCsvChunkJob.queue.add( `${TABLE_IMPORT_CSV_CHUNK_QUEUE}_job`, { baseId, diff --git a/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.job.ts b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.job.ts new file mode 100644 index 0000000000..0986c49e9a --- /dev/null +++ b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.job.ts @@ -0,0 +1,25 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import type { MailType } from '@teable/openapi'; +import { type Queue } from 'bullmq'; +import { type ISendMailOptions } from '../mail-helpers'; + +export const MAIL_SENDER_QUEUE = 'mailSenderQueue'; + +export type IMailSenderMergePayload = Omit & { + mailType: MailType; + to: string; +}; +export type INotifyMailMergeSendPayload = { to: string }; + +export interface IMailSenderMergeJob { + payload: IMailSenderMergePayload | INotifyMailMergeSendPayload; +} + +@Injectable() +export class MailSenderMergeJob { + constructor( + @InjectQueue(MAIL_SENDER_QUEUE) + public readonly queue: Queue + ) {} +} diff --git a/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.module.ts b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.module.ts index 637dcc219f..a89150f43e 100644 --- a/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.module.ts +++ b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common'; import { EventJobModule } from '../../../event-emitter/event-job/event-job.module'; +import { conditionalQueueProcessorProviders } from '../../../utils/queue'; import { SettingOpenApiModule } from '../../setting/open-api/setting-open-api.module'; import { MailSenderModule } from '../mail-sender.module'; -import { MAIL_SENDER_QUEUE, MailSenderMergeProcessor } from './mail-sender.merge.processor'; +import { MAIL_SENDER_QUEUE, MailSenderMergeJob } from './mail-sender.merge.job'; +import { MailSenderMergeProcessor } from './mail-sender.merge.processor'; @Module({ imports: [ @@ -10,7 +12,12 @@ import { MAIL_SENDER_QUEUE, MailSenderMergeProcessor } from './mail-sender.merge EventJobModule.registerQueue(MAIL_SENDER_QUEUE), SettingOpenApiModule, ], - providers: [MailSenderMergeProcessor], - exports: [MailSenderMergeProcessor], + providers: [ + ...conditionalQueueProcessorProviders({ + providers: [MailSenderMergeProcessor], + }), + MailSenderMergeJob, + ], + exports: [MailSenderMergeJob], }) export class MailSenderMergeModule {} diff --git a/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.processor.ts b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.processor.ts index 87e3f82d25..754d1382c3 100644 --- a/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.processor.ts +++ b/apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.processor.ts @@ -1,31 +1,26 @@ -import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq'; -import { Injectable, Logger } from '@nestjs/common'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { MailTransporterType, MailType } from '@teable/openapi'; -import { type Job, type Queue } from 'bullmq'; -import dayjs from 'dayjs'; +import { type Job } from 'bullmq'; import { isUndefined } from 'lodash'; import { CacheService } from '../../../cache/cache.service'; import type { ICacheStore } from '../../../cache/types'; import { Events } from '../../../event-emitter/events'; import { SettingOpenApiService } from '../../setting/open-api/setting-open-api.service'; -import { type ISendMailOptions } from '../mail-helpers'; import { MailSenderService } from '../mail-sender.service'; - -export const MAIL_SENDER_QUEUE = 'mailSenderQueue'; +import type { + IMailSenderMergeJob, + INotifyMailMergeSendPayload, + IMailSenderMergePayload, +} from './mail-sender.merge.job'; +import { MAIL_SENDER_QUEUE, MailSenderMergeJob } from './mail-sender.merge.job'; enum MailSenderJob { NotifyMailMerge = 'notifyMailMerge', NotifyMailMergeSend = 'notifyMailMergeSend', } -type IMailSenderMergePayload = Omit & { mailType: MailType; to: string }; -type INotifyMailMergeSendPayload = { to: string }; - -interface IMailSenderMergeJob { - payload: IMailSenderMergePayload | INotifyMailMergeSendPayload; -} - @Processor(MAIL_SENDER_QUEUE) @Injectable() export class MailSenderMergeProcessor extends WorkerHost { @@ -33,8 +28,7 @@ export class MailSenderMergeProcessor extends WorkerHost { private readonly mailSenderService: MailSenderService, private readonly cacheService: CacheService, private readonly settingOpenApiService: SettingOpenApiService, - @InjectQueue(MAIL_SENDER_QUEUE) - public readonly queue: Queue + private readonly mailSenderMergeJob: MailSenderMergeJob ) { super(); } @@ -64,7 +58,7 @@ export class MailSenderMergeProcessor extends WorkerHost { @OnEvent(Events.NOTIFY_MAIL_MERGE) async onNotifyMailMerge(event: { payload: IMailSenderMergePayload }) { - await this.queue.add(MailSenderJob.NotifyMailMerge, { + await this.mailSenderMergeJob.queue.add(MailSenderJob.NotifyMailMerge, { payload: event.payload, }); } @@ -74,7 +68,7 @@ export class MailSenderMergeProcessor extends WorkerHost { const list = await this.cacheService.get(`mail-sender:notify-mail-merge:${to}`); if (isUndefined(list)) { await this.cacheService.set(`mail-sender:notify-mail-merge:${to}`, [], 1000 * 60 * 5); // 5 minutes - await this.queue.add( + await this.mailSenderMergeJob.queue.add( MailSenderJob.NotifyMailMergeSend, { payload: { to }, diff --git a/apps/nestjs-backend/src/features/setting/open-api/admin-open-api.service.ts b/apps/nestjs-backend/src/features/setting/open-api/admin-open-api.service.ts index 3a74d1f632..664d89c592 100644 --- a/apps/nestjs-backend/src/features/setting/open-api/admin-open-api.service.ts +++ b/apps/nestjs-backend/src/features/setting/open-api/admin-open-api.service.ts @@ -13,7 +13,7 @@ import { Knex } from 'knex'; import { InjectModel } from 'nest-knexjs'; import { PerformanceCacheService } from '../../../performance-cache'; import { Timing } from '../../../utils/timing'; -import { AttachmentsCropQueueProcessor } from '../../attachments/attachments-crop.processor'; +import { AttachmentsCropJob } from '../../attachments/attachments-crop.job'; import StorageAdapter from '../../attachments/plugins/adapter'; @Injectable() @@ -22,7 +22,7 @@ export class AdminOpenApiService { constructor( private readonly prismaService: PrismaService, @InjectModel('CUSTOM_KNEX') private readonly knex: Knex, - private readonly attachmentsCropQueueProcessor: AttachmentsCropQueueProcessor, + private readonly attachmentsCropJob: AttachmentsCropJob, private readonly performanceCacheService: PerformanceCacheService ) {} @@ -68,7 +68,7 @@ export class AdminOpenApiService { break; } total += attachments.length; - await this.attachmentsCropQueueProcessor.queue.addBulk( + await this.attachmentsCropJob.queue.addBulk( attachments.map((attachment) => ({ name: 'admin_attachment_crop_image', data: { diff --git a/apps/nestjs-backend/src/share-db/share-db.service.ts b/apps/nestjs-backend/src/share-db/share-db.service.ts index ceccfff36f..0dd9e32706 100644 --- a/apps/nestjs-backend/src/share-db/share-db.service.ts +++ b/apps/nestjs-backend/src/share-db/share-db.service.ts @@ -98,7 +98,7 @@ export class ShareDbService extends ShareDBClass { if (!tableIds.length) { return; } - await this.prismaService.txClient().tableMeta.updateMany({ + await this.prismaService.tableMeta.updateMany({ where: { id: { in: tableIds } }, data: { lastModifiedTime: new Date().toISOString() }, }); diff --git a/apps/nestjs-backend/src/utils/queue.ts b/apps/nestjs-backend/src/utils/queue.ts new file mode 100644 index 0000000000..f5d617b546 --- /dev/null +++ b/apps/nestjs-backend/src/utils/queue.ts @@ -0,0 +1,49 @@ +import { Logger } from '@nestjs/common'; + +export enum QueueConsumerType { + Automation = 'automation', + ImportExport = 'import-export', + ImageCrop = 'image-crop', + Default = 'default', +} + +export function conditionalQueueProcessorProviders( + ...opts: { + consumer?: QueueConsumerType; + providers: T[]; + }[] +): T[] { + if (process.env.BACKEND_DISABLE_QUEUE_CONSUMER === 'true') { + return []; + } + const selectedConsumer = (process.env.BACKEND_QUEUE_CONSUMER?.split(',')?.filter((v) => + Object.values(QueueConsumerType).includes(v as QueueConsumerType) + ) || []) as QueueConsumerType[]; + + // If selected consumer is provided, return providers for the selected consumer + if (selectedConsumer.length > 0) { + const providers: T[] = []; + for (const opt of opts) { + const consumer = opt.consumer || QueueConsumerType.Default; + if (selectedConsumer.includes(consumer)) { + providers.push(...opt.providers); + } + } + providers.length > 0 && + Logger.log( + `Queue Consumer Providers (${selectedConsumer.join(', ')}): ${providers.map((p) => (typeof p === 'function' ? p.name : 'unknown')).join(', ')}` + ); + return providers; + } + + // If no selected consumer is provided, return providers for all consumers + const providers: T[] = []; + for (const opt of opts) { + providers.push(...opt.providers); + } + providers.length > 0 && + Logger.log( + `Queue Consumer Providers (ALL): ${providers.map((p) => (typeof p === 'function' ? p.name : 'unknown')).join(', ')}` + ); + return providers; +} diff --git a/packages/sdk/src/context/app/useConnection.tsx b/packages/sdk/src/context/app/useConnection.tsx index 5e19ec70c8..f108da6865 100644 --- a/packages/sdk/src/context/app/useConnection.tsx +++ b/packages/sdk/src/context/app/useConnection.tsx @@ -58,7 +58,7 @@ export const useConnection = (path?: string) => { }, [socket, updateRefreshTime]); useConnectionAutoManage(socket, updateShareDb, { - // 10 minutes, it will be closed when the user is leave the page for 1 hour + // 1 hour, it will be closed when the user is leave the page for 1 hour inactiveTimeout: 1000 * 60 * 60, // reconnect when the browser is back for 2 seconds reconnectDelay: 2000,