From f980219fe4c922f5d4d43645a959181330610ffd Mon Sep 17 00:00:00 2001 From: Jonathan Jogenfors Date: Fri, 13 Dec 2024 00:03:08 +0100 Subject: [PATCH] wip batch imports --- e2e/test-assets | 2 +- server/src/interfaces/asset.interface.ts | 1 + server/src/interfaces/job.interface.ts | 4 +- server/src/repositories/asset.repository.ts | 4 + server/src/services/library.service.ts | 111 ++++++-------------- 5 files changed, 43 insertions(+), 79 deletions(-) diff --git a/e2e/test-assets b/e2e/test-assets index 99544a200412d5..c4a0575c3e89a7 160000 --- a/e2e/test-assets +++ b/e2e/test-assets @@ -1 +1 @@ -Subproject commit 99544a200412d553103cc7b8f1a28f339c7cffd9 +Subproject commit c4a0575c3e89a755b951ae6d91e7307cd34c606f diff --git a/server/src/interfaces/asset.interface.ts b/server/src/interfaces/asset.interface.ts index 7e3994a21d7bcc..b77f41a9e45b9f 100644 --- a/server/src/interfaces/asset.interface.ts +++ b/server/src/interfaces/asset.interface.ts @@ -151,6 +151,7 @@ export const IAssetRepository = 'IAssetRepository'; export interface IAssetRepository { create(asset: AssetCreate): Promise; + createAll(assets: AssetCreate[]): Promise; getByIds( ids: string[], relations?: FindOptionsRelations, diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index c55b2aefe0bea8..86693a949e674c 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -84,7 +84,7 @@ export enum JobName { // library management LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files', LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets', - LIBRARY_SYNC_FILE = 'library-sync-file', + LIBRARY_SYNC_FILES = 'library-sync-files', LIBRARY_SYNC_ASSETS = 'library-sync-assets', LIBRARY_DELETE = 'library-delete', LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all', @@ -290,7 +290,7 @@ export type JobItem = | { name: JobName.ASSET_DELETION_CHECK; data?: IBaseJob } // Library Management - | { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob } + | { name: JobName.LIBRARY_SYNC_FILES; data: ILibraryFileJob } | { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob } | { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryBulkIdsJob } diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 821b66b4fd4e1b..24ccb3ff17c0c7 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -78,6 +78,10 @@ export class AssetRepository implements IAssetRepository { return this.repository.save(asset); } + createAll(assets: AssetCreate[]): Promise { + return this.repository.save(assets); + } + @GenerateSql({ params: [[DummyValue.UUID], { day: 1, month: 1 }] }) getByDayOfYear(ownerIds: string[], { day, month }: MonthDay): Promise { return this.repository diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index ae2d4b94fd055a..35895c192ff0b3 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -208,17 +208,15 @@ export class LibraryService extends BaseService { return mapLibrary(library); } + @OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY }) private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) { - await this.jobRepository.queueAll( - assetPaths.map((assetPath) => ({ - name: JobName.LIBRARY_SYNC_FILE, - data: { - libraryId: id, - assetPath, - ownerId, - }, - })), - ); + const assets = assetPaths.map((assetPath) => this.prepareAsset(assetPath, ownerId, id)); + + const assetIds = await this.assetRepository.createAll(assets).then((assets) => assets.map((asset) => asset.id)); + + await this.queuePostSyncJobs(assetIds); + + return JobStatus.SUCCESS; } private async validateImportPath(importPath: string): Promise { @@ -332,72 +330,26 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - @OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY }) - async handleSyncFile(job: JobOf): Promise { - /* For performance reasons, we don't check if the asset is already imported. - This is instead handled by a previous step in the scan process. - In the edge case of an asset being imported between that check - and this function call, the database constraint will prevent duplicates. - */ - - const assetPath = path.normalize(job.assetPath); - - // TODO: we can replace this get call with an exists call - /* let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath); - if (asset) { - return await this.handleSyncAssets({ libraryId: job.libraryId, assetIds: [asset.id] }); - } */ - - this.logger.log(`Importing new asset ${assetPath} into library ${job.libraryId}`); - - let stat; - try { - stat = await this.storageRepository.stat(assetPath); - } catch (error: any) { - if (error.code === 'ENOENT') { - this.logger.error(`File not found: ${assetPath}`); - return JobStatus.SKIPPED; - } - this.logger.error(`Error reading file: ${assetPath}. Error: ${error}`); - return JobStatus.FAILED; - } - - // TODO: device asset id is deprecated, remove it - const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, ''); - - const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`); - - // TODO: doesn't xmp replace the file extension? Will need investigation - let sidecarPath: string | null = null; - if (await this.storageRepository.checkFileExists(`${assetPath}.xmp`, R_OK)) { - sidecarPath = `${assetPath}.xmp`; - } - - const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE; + private prepareAsset(assetPath: string, ownerId: string, libraryId: string) { + const normalizedPath = path.normalize(assetPath); - const mtime = stat.mtime; + const now = new Date(); - const asset = await this.assetRepository.create({ - ownerId: job.ownerId, - libraryId: job.libraryId, - checksum: pathHash, - originalPath: assetPath, - deviceAssetId, + return { + ownerId: ownerId, + libraryId: libraryId, + checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`), + originalPath: normalizedPath, + // TODO: device asset id is deprecated, remove it + deviceAssetId: `${basename(normalizedPath)}`.replaceAll(/\s+/g, ''), deviceId: 'Library Import', - fileCreatedAt: mtime, - fileModifiedAt: mtime, - localDateTime: mtime, - type: assetType, - originalFileName: parse(assetPath).base, - sidecarPath, + fileCreatedAt: now, + fileModifiedAt: now, + localDateTime: now, + type: mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE, + originalFileName: parse(normalizedPath).base, isExternal: true, - }); - - this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`); - - await this.queuePostSyncJobs([asset.id]); - - return JobStatus.SUCCESS; + }; } async queuePostSyncJobs(assetIds: string[]) { @@ -407,6 +359,13 @@ export class LibraryService extends BaseService { data: { id: assetId, source: 'upload' }, })), ); + + await this.jobRepository.queueAll( + assetIds.map((assetId) => ({ + name: JobName.SIDECAR_DISCOVERY, + data: { id: assetId, source: 'upload' }, + })), + ); } async queueScan(id: string) { @@ -607,21 +566,21 @@ export class LibraryService extends BaseService { importCount += newPaths.length; await this.syncFiles(library, newPaths); if (newPaths.length < pathBatch.length) { - this.logger.log( + this.logger.debug( `Current crawl batch: ${newPaths.length} of ${pathBatch.length} file(s) are new, queued import for library ${library.id}...`, ); } else { - this.logger.log( + this.logger.debug( `Current crawl batch: ${newPaths.length} new file(s), queued import for library ${library.id}...`, ); } } else { - this.logger.log(`Current crawl batch: ${pathBatch.length} asset(s) already in library ${library.id}`); + this.logger.debug(`Current crawl batch: ${pathBatch.length} asset(s) already in library ${library.id}`); } } if (importCount > 0 && importCount === crawlCount) { - this.logger.log(`Finished crawling and queueing ${crawlCount} files for import for library ${library.id}`); + this.logger.log(`Finished crawling and queueing ${crawlCount} file(s) for import for library ${library.id}`); } else if (importCount > 0) { this.logger.log( `Finished crawling ${crawlCount} file(s) of which ${importCount} are queued for import for library ${library.id}`,