From 8944a322fbb156a2a392076815724bcff78665a1 Mon Sep 17 00:00:00 2001 From: Jonathan Jogenfors Date: Tue, 10 Dec 2024 15:39:30 +0100 Subject: [PATCH] wip batch imports --- e2e/src/api/specs/library.e2e-spec.ts | 195 ++++++++- e2e/src/utils.ts | 15 +- server/src/interfaces/asset.interface.ts | 4 +- server/src/interfaces/job.interface.ts | 16 +- server/src/interfaces/library.interface.ts | 7 + server/src/repositories/asset.repository.ts | 79 ++-- server/src/services/library.service.ts | 402 ++++++++++-------- server/src/utils/asset.util.ts | 1 - server/src/utils/misc.spec.ts | 2 +- server/src/utils/misc.ts | 70 +-- .../repositories/asset.repository.mock.ts | 2 + 11 files changed, 535 insertions(+), 258 deletions(-) diff --git a/e2e/src/api/specs/library.e2e-spec.ts b/e2e/src/api/specs/library.e2e-spec.ts index 9c7796d1584c17..1959a230ad231a 100644 --- a/e2e/src/api/specs/library.e2e-spec.ts +++ b/e2e/src/api/specs/library.e2e-spec.ts @@ -421,7 +421,7 @@ describe('/libraries', () => { const { status } = await request(app) .post(`/libraries/${library.id}/scan`) .set('Authorization', `Bearer ${admin.accessToken}`) - .send({ refreshModifiedFiles: true }); + .send(); expect(status).toBe(204); await utils.waitForQueueFinish(admin.accessToken, 'library'); @@ -453,7 +453,7 @@ describe('/libraries', () => { const { status } = await request(app) .post(`/libraries/${library.id}/scan`) .set('Authorization', `Bearer ${admin.accessToken}`) - .send({ refreshModifiedFiles: true }); + .send(); expect(status).toBe(204); await utils.waitForQueueFinish(admin.accessToken, 'library'); @@ -577,7 +577,7 @@ describe('/libraries', () => { ]); }); - it('should not trash an online asset', async () => { + it('should not set an asset offline if its file exists, is in an import path, and not covered by an exclusion pattern', async () => { const library = await utils.createLibrary(admin.accessToken, { ownerId: admin.userId, importPaths: [`${testAssetDirInternal}/temp`], @@ -601,6 +601,195 @@ describe('/libraries', () => { expect(assets).toEqual(assetsBefore); }); + + it('should set an offline asset to online if its file exists, is in an import path, and not covered by an exclusion pattern', async () => { + utils.createImageFile(`${testAssetDir}/temp/offline/offline.png`); + + const library = await utils.createLibrary(admin.accessToken, { + ownerId: admin.userId, + importPaths: [`${testAssetDirInternal}/temp/offline`], + }); + + await scan(admin.accessToken, library.id); + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id }); + + utils.renameImageFile(`${testAssetDir}/temp/offline/offline.png`, `${testAssetDir}/temp/offline.png`); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const offlineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + expect(offlineAsset.isTrashed).toBe(true); + expect(offlineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(offlineAsset.isOffline).toBe(true); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id, withDeleted: true }); + expect(assets.count).toBe(1); + } + + utils.renameImageFile(`${testAssetDir}/temp/offline.png`, `${testAssetDir}/temp/offline/offline.png`); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const backOnlineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + + expect(backOnlineAsset.isTrashed).toBe(false); + expect(backOnlineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(backOnlineAsset.isOffline).toBe(false); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id }); + expect(assets.count).toBe(1); + } + }); + + it('should not set an offline asset to online if its file exists, is not covered by an exclusion pattern, but is outside of all import paths', async () => { + utils.createImageFile(`${testAssetDir}/temp/offline/offline.png`); + + const library = await utils.createLibrary(admin.accessToken, { + ownerId: admin.userId, + importPaths: [`${testAssetDirInternal}/temp/offline`], + }); + + await scan(admin.accessToken, library.id); + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id }); + + utils.renameImageFile(`${testAssetDir}/temp/offline/offline.png`, `${testAssetDir}/temp/offline.png`); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id, withDeleted: true }); + expect(assets.count).toBe(1); + } + + const offlineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + + expect(offlineAsset.isTrashed).toBe(true); + expect(offlineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(offlineAsset.isOffline).toBe(true); + + utils.renameImageFile(`${testAssetDir}/temp/offline.png`, `${testAssetDir}/temp/offline/offline.png`); + + utils.createDirectory(`${testAssetDir}/temp/another-path/`); + + await utils.updateLibrary(admin.accessToken, library.id, { + importPaths: [`${testAssetDirInternal}/temp/another-path`], + }); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const stillOfflineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + + expect(stillOfflineAsset.isTrashed).toBe(true); + expect(stillOfflineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(stillOfflineAsset.isOffline).toBe(true); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id, withDeleted: true }); + expect(assets.count).toBe(1); + } + + utils.removeDirectory(`${testAssetDir}/temp/another-path/`); + }); + + it('should not set an offline asset to online if its file exists, is in an import path, but is covered by an exclusion pattern', async () => { + utils.createImageFile(`${testAssetDir}/temp/offline/offline.png`); + + const library = await utils.createLibrary(admin.accessToken, { + ownerId: admin.userId, + importPaths: [`${testAssetDirInternal}/temp/offline`], + }); + + await scan(admin.accessToken, library.id); + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id }); + + utils.renameImageFile(`${testAssetDir}/temp/offline/offline.png`, `${testAssetDir}/temp/offline.png`); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id, withDeleted: true }); + expect(assets.count).toBe(1); + } + + const offlineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + + expect(offlineAsset.isTrashed).toBe(true); + expect(offlineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(offlineAsset.isOffline).toBe(true); + + utils.renameImageFile(`${testAssetDir}/temp/offline.png`, `${testAssetDir}/temp/offline/offline.png`); + + await utils.updateLibrary(admin.accessToken, library.id, { exclusionPatterns: ['**/offline/**'] }); + + { + const { status } = await request(app) + .post(`/libraries/${library.id}/scan`) + .set('Authorization', `Bearer ${admin.accessToken}`) + .send(); + expect(status).toBe(204); + } + + await utils.waitForQueueFinish(admin.accessToken, 'library'); + + const stillOfflineAsset = await utils.getAssetInfo(admin.accessToken, assets.items[0].id); + + expect(stillOfflineAsset.isTrashed).toBe(true); + expect(stillOfflineAsset.originalPath).toBe(`${testAssetDirInternal}/temp/offline/offline.png`); + expect(stillOfflineAsset.isOffline).toBe(true); + + { + const { assets } = await utils.searchAssets(admin.accessToken, { libraryId: library.id, withDeleted: true }); + expect(assets.count).toBe(1); + } + }); }); describe('POST /libraries/:id/validate', () => { diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index 14225ff063038c..b00c3c0b6d30dc 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -10,6 +10,7 @@ import { Permission, PersonCreateDto, SharedLinkCreateDto, + UpdateLibraryDto, UserAdminCreateDto, UserPreferencesUpdateDto, ValidateLibraryDto, @@ -35,6 +36,7 @@ import { updateAlbumUser, updateAssets, updateConfig, + updateLibrary, updateMyPreferences, upsertTags, validate, @@ -42,7 +44,7 @@ import { import { BrowserContext } from '@playwright/test'; import { exec, spawn } from 'node:child_process'; import { createHash } from 'node:crypto'; -import { existsSync, mkdirSync, rmSync, writeFileSync } from 'node:fs'; +import { existsSync, mkdirSync, renameSync, rmSync, writeFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import path, { dirname } from 'node:path'; import { setTimeout as setAsyncTimeout } from 'node:timers/promises'; @@ -392,6 +394,14 @@ export const utils = { rmSync(path); }, + renameImageFile: (oldPath: string, newPath: string) => { + if (!existsSync(oldPath)) { + return; + } + + renameSync(oldPath, newPath); + }, + removeDirectory: (path: string) => { if (!existsSync(path)) { return; @@ -444,6 +454,9 @@ export const utils = { createLibrary: (accessToken: string, dto: CreateLibraryDto) => createLibrary({ createLibraryDto: dto }, { headers: asBearerAuth(accessToken) }), + updateLibrary: (accessToken: string, id: string, dto: UpdateLibraryDto) => + updateLibrary({ id, updateLibraryDto: dto }, { headers: asBearerAuth(accessToken) }), + validateLibrary: (accessToken: string, id: string, dto: ValidateLibraryDto) => validate({ id, validateLibraryDto: dto }, { headers: asBearerAuth(accessToken) }), diff --git a/server/src/interfaces/asset.interface.ts b/server/src/interfaces/asset.interface.ts index add4a18ae36314..b77f41a9e45b9f 100644 --- a/server/src/interfaces/asset.interface.ts +++ b/server/src/interfaces/asset.interface.ts @@ -151,6 +151,7 @@ export const IAssetRepository = 'IAssetRepository'; export interface IAssetRepository { create(asset: AssetCreate): Promise; + createAll(assets: AssetCreate[]): Promise; getByIds( ids: string[], relations?: FindOptionsRelations, @@ -193,5 +194,6 @@ export interface IAssetRepository { getChangedDeltaSync(options: AssetDeltaSyncOptions): Promise; upsertFile(file: UpsertFileOptions): Promise; upsertFiles(files: UpsertFileOptions[]): Promise; - updateOffline(pagination: PaginationOptions, library: LibraryEntity): Paginated; + updateOffline(library: LibraryEntity): Promise; + getNewPaths(libraryId: string, paths: string[]): Promise; } diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index 1822a609c2ff8a..86693a949e674c 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -84,7 +84,7 @@ export enum JobName { // library management LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files', LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets', - LIBRARY_SYNC_FILE = 'library-sync-file', + LIBRARY_SYNC_FILES = 'library-sync-files', LIBRARY_SYNC_ASSETS = 'library-sync-assets', LIBRARY_DELETE = 'library-delete', LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all', @@ -143,12 +143,18 @@ export interface IAssetDeleteJob extends IEntityJob { deleteOnDisk: boolean; } -export interface ILibraryFileJob extends IEntityJob { +export interface ILibraryFileJob { + libraryId: string; ownerId: string; assetPath: string; } -export interface IBulkEntityJob extends IBaseJob { +export interface ILibraryBulkIdsJob { + libraryId: string; + assetIds: string[]; +} + +export interface IBulkEntityJob { ids: string[]; } @@ -284,10 +290,10 @@ export type JobItem = | { name: JobName.ASSET_DELETION_CHECK; data?: IBaseJob } // Library Management - | { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob } + | { name: JobName.LIBRARY_SYNC_FILES; data: ILibraryFileJob } | { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob } - | { name: JobName.LIBRARY_SYNC_ASSETS; data: IBulkEntityJob } + | { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryBulkIdsJob } | { name: JobName.LIBRARY_DELETE; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob } | { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob } diff --git a/server/src/interfaces/library.interface.ts b/server/src/interfaces/library.interface.ts index d8f1a1303116e5..803cf1bc4ed310 100644 --- a/server/src/interfaces/library.interface.ts +++ b/server/src/interfaces/library.interface.ts @@ -1,8 +1,15 @@ +import { ADDED_IN_PREFIX } from 'src/constants'; import { LibraryStatsResponseDto } from 'src/dtos/library.dto'; import { LibraryEntity } from 'src/entities/library.entity'; export const ILibraryRepository = 'ILibraryRepository'; +export enum AssetSyncResult { + DO_NOTHING, + UPDATE, + OFFLINE, +} + export interface ILibraryRepository { getAll(withDeleted?: boolean): Promise; getAllDeleted(): Promise; diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index e9c54b90b843f4..24ccb3ff17c0c7 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -1,6 +1,5 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import picomatch from 'picomatch'; import { Chunked, ChunkedArray, DummyValue, GenerateSql } from 'src/decorators'; import { AssetFileEntity } from 'src/entities/asset-files.entity'; import { AssetJobStatusEntity } from 'src/entities/asset-job-status.entity'; @@ -79,6 +78,10 @@ export class AssetRepository implements IAssetRepository { return this.repository.save(asset); } + createAll(assets: AssetCreate[]): Promise { + return this.repository.save(assets); + } + @GenerateSql({ params: [[DummyValue.UUID], { day: 1, month: 1 }] }) getByDayOfYear(ownerIds: string[], { day, month }: MonthDay): Promise { return this.repository @@ -716,39 +719,47 @@ export class AssetRepository implements IAssetRepository { await this.fileRepository.upsert(files, { conflictPaths: ['assetId', 'type'] }); } - updateOffline(pagination: PaginationOptions, library: LibraryEntity): Paginated { - return this.dataSource.manager.transaction(async (transactionalEntityManager) => - transactionalEntityManager.query( - ` - WITH updated_rows AS ( - UPDATE assets - SET "isOffline" = $1, "deletedAt" = $2 - WHERE "isOffline" = $3 - AND ( - "originalPath" NOT SIMILAR TO $4 - OR "originalPath" SIMILAR TO $5 - ) - RETURNING id - ) - SELECT * - FROM assets - WHERE id NOT IN (SELECT id FROM updated_rows) - AND "libraryId" = $6 - AND ($7 OR "deletedAt" IS NULL) - LIMIT $8 OFFSET $9; - `, - [ - true, // $1 - is_offline = true - new Date(), // $2 - deleted_at = current timestamp - false, // $3 - is_offline = false - library.importPaths.map((importPath) => `${importPath}%`).join('|'), // $4 - importPartMatcher pattern - library.exclusionPatterns.map(globToSqlPattern).join('|'), // $5 - exclusionPatternMatcher pattern - library.id, // $6 - libraryId matches job.id - true, // $7 - withDeleted flag - pagination.take, // $8 - LIMIT - pagination.skip, // $9 - OFFSET - ], - ), + updateOffline(library: LibraryEntity): Promise { + const paths = library.importPaths.map((importPath) => `${importPath}%`).join('|'); + const exclusions = library.exclusionPatterns.map((pattern) => globToSqlPattern(pattern)).join('|'); + return this.repository + .createQueryBuilder() + .update() + .set({ + isOffline: true, + deletedAt: new Date(), + }) + .where({ isOffline: false }) + .andWhere({ libraryId: library.id }) + .andWhere( + new Brackets((qb) => { + qb.where('originalPath NOT SIMILAR TO :paths', { + paths, + }).orWhere('originalPath SIMILAR TO :exclusions', { + exclusions, + }); + }), + ) + .execute(); + } + + async getNewPaths(libraryId: string, paths: string[]): Promise { + const rawSql = ` + WITH unnested_paths AS ( + SELECT unnest($1::text[]) AS path + ) + SELECT unnested_paths.path AS path + FROM unnested_paths + WHERE not exists( + SELECT 1 + FROM assets + WHERE "originalPath" = unnested_paths.path AND + "libraryId" = $2 ); + `; + + return this.repository + .query(rawSql, [paths, libraryId]) + .then((result) => result.map((row: { path: string }) => row.path)); } } diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index febcf2418643a4..35895c192ff0b3 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -2,7 +2,6 @@ import { BadRequestException, Injectable } from '@nestjs/common'; import { R_OK } from 'node:constants'; import path, { basename, isAbsolute, parse } from 'node:path'; import picomatch from 'picomatch'; -import parseLib from 'picomatch/lib/parse'; import { StorageCore } from 'src/cores/storage.core'; import { OnEvent, OnJob } from 'src/decorators'; import { @@ -17,10 +16,11 @@ import { } from 'src/dtos/library.dto'; import { AssetEntity } from 'src/entities/asset.entity'; import { LibraryEntity } from 'src/entities/library.entity'; -import { AssetType, ImmichWorker } from 'src/enum'; +import { AssetStatus, AssetType, ImmichWorker } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; import { JobName, JobOf, JOBS_LIBRARY_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; +import { AssetSyncResult } from 'src/interfaces/library.interface'; import { BaseService } from 'src/services/base.service'; import { mimeTypes } from 'src/utils/mime-types'; import { handlePromiseError } from 'src/utils/misc'; @@ -99,6 +99,15 @@ export class LibraryService extends BaseService { let _resolve: () => void; const ready$ = new Promise((resolve) => (_resolve = resolve)); + const handler = async (event: string, path: string) => { + if (matcher(path)) { + this.logger.debug(`File ${event} event received for ${path} in library ${library.id}}`); + await this.syncFiles(library, [path]); + } else { + this.logger.verbose(`Ignoring file ${event} event for ${path} in library ${library.id}`); + } + }; + this.watchers[id] = this.storageRepository.watch( library.importPaths, { @@ -108,58 +117,13 @@ export class LibraryService extends BaseService { { onReady: () => _resolve(), onAdd: (path) => { - const handler = async () => { - this.logger.debug(`File add event received for ${path} in library ${library.id}}`); - if (matcher(path)) { - const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path); - if (asset) { - await this.jobRepository.queue({ - name: JobName.LIBRARY_SYNC_ASSETS, - data: { - ids: [asset.id], - }, - }); - } - if (matcher(path)) { - await this.syncFiles(library, [path]); - } - } - }; - return handlePromiseError(handler(), this.logger); + return handlePromiseError(handler('add', path), this.logger); }, onChange: (path) => { - const handler = async () => { - this.logger.debug(`Detected file change for ${path} in library ${library.id}`); - const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path); - if (asset) { - await this.jobRepository.queue({ - name: JobName.LIBRARY_SYNC_ASSETS, - data: { - ids: [asset.id], - }, - }); - } - if (matcher(path)) { - // Note: if the changed file was not previously imported, it will be imported now. - await this.syncFiles(library, [path]); - } - }; - return handlePromiseError(handler(), this.logger); + return handlePromiseError(handler('change', path), this.logger); }, onUnlink: (path) => { - const handler = async () => { - this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`); - const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path); - if (asset) { - await this.jobRepository.queue({ - name: JobName.LIBRARY_SYNC_ASSETS, - data: { - ids: [asset.id], - }, - }); - } - }; - return handlePromiseError(handler(), this.logger); + return handlePromiseError(handler('delete', path), this.logger); }, onError: (error) => { this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`); @@ -244,17 +208,15 @@ export class LibraryService extends BaseService { return mapLibrary(library); } + @OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY }) private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) { - await this.jobRepository.queueAll( - assetPaths.map((assetPath) => ({ - name: JobName.LIBRARY_SYNC_FILE, - data: { - id, - assetPath, - ownerId, - }, - })), - ); + const assets = assetPaths.map((assetPath) => this.prepareAsset(assetPath, ownerId, id)); + + const assetIds = await this.assetRepository.createAll(assets).then((assets) => assets.map((asset) => asset.id)); + + await this.queuePostSyncJobs(assetIds); + + return JobStatus.SUCCESS; } private async validateImportPath(importPath: string): Promise { @@ -368,94 +330,62 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - @OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY }) - async handleSyncFile(job: JobOf): Promise { - // Only needs to handle new assets - const assetPath = path.normalize(job.assetPath); - - let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, assetPath); - if (asset) { - return JobStatus.SKIPPED; - } - - let stat; - try { - stat = await this.storageRepository.stat(assetPath); - } catch (error: any) { - if (error.code === 'ENOENT') { - this.logger.error(`File not found: ${assetPath}`); - return JobStatus.SKIPPED; - } - this.logger.error(`Error reading file: ${assetPath}. Error: ${error}`); - return JobStatus.FAILED; - } - - this.logger.log(`Importing new library asset: ${assetPath}`); - - const library = await this.libraryRepository.get(job.id, true); - if (!library || library.deletedAt) { - this.logger.error('Cannot import asset into deleted library'); - return JobStatus.FAILED; - } - - // TODO: device asset id is deprecated, remove it - const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, ''); - - const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`); + private prepareAsset(assetPath: string, ownerId: string, libraryId: string) { + const normalizedPath = path.normalize(assetPath); - // TODO: doesn't xmp replace the file extension? Will need investigation - let sidecarPath: string | null = null; - if (await this.storageRepository.checkFileExists(`${assetPath}.xmp`, R_OK)) { - sidecarPath = `${assetPath}.xmp`; - } - - const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE; - - const mtime = stat.mtime; + const now = new Date(); - asset = await this.assetRepository.create({ - ownerId: job.ownerId, - libraryId: job.id, - checksum: pathHash, - originalPath: assetPath, - deviceAssetId, + return { + ownerId: ownerId, + libraryId: libraryId, + checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`), + originalPath: normalizedPath, + // TODO: device asset id is deprecated, remove it + deviceAssetId: `${basename(normalizedPath)}`.replaceAll(/\s+/g, ''), deviceId: 'Library Import', - fileCreatedAt: mtime, - fileModifiedAt: mtime, - localDateTime: mtime, - type: assetType, - originalFileName: parse(assetPath).base, - - sidecarPath, + fileCreatedAt: now, + fileModifiedAt: now, + localDateTime: now, + type: mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE, + originalFileName: parse(normalizedPath).base, isExternal: true, - }); - - await this.queuePostSyncJobs(asset); - - return JobStatus.SUCCESS; + }; } - async queuePostSyncJobs(asset: AssetEntity) { - this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`); + async queuePostSyncJobs(assetIds: string[]) { + await this.jobRepository.queueAll( + assetIds.map((assetId) => ({ + name: JobName.METADATA_EXTRACTION, + data: { id: assetId, source: 'upload' }, + })), + ); - await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id, source: 'upload' } }); + await this.jobRepository.queueAll( + assetIds.map((assetId) => ({ + name: JobName.SIDECAR_DISCOVERY, + data: { id: assetId, source: 'upload' }, + })), + ); } async queueScan(id: string) { await this.findOrFail(id); + this.logger.log(`Starting to scan library ${id}`); + await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_FILES, data: { id, }, }); + await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, data: { id } }); } @OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_ALL, queue: QueueName.LIBRARY }) async handleQueueSyncAll(): Promise { - this.logger.debug(`Refreshing all external libraries`); + this.logger.log(`Initiating scan of all external libraries`); await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_CLEANUP, data: {} }); @@ -481,52 +411,110 @@ export class LibraryService extends BaseService { @OnJob({ name: JobName.LIBRARY_SYNC_ASSETS, queue: QueueName.LIBRARY }) async handleSyncAssets(job: JobOf): Promise { - for (const id of job.ids) { - await this.handleSyncAsset(id); + const assets = await this.assetRepository.getByIds(job.assetIds); + + const assetIdsToOffline: string[] = []; + const assetIdsToUpdate: string[] = []; + + for (const asset of assets) { + const action = await this.handleSyncAsset(asset); + switch (action) { + case AssetSyncResult.OFFLINE: + assetIdsToOffline.push(asset.id); + break; + case AssetSyncResult.UPDATE: + assetIdsToUpdate.push(asset.id); + break; + } + } + + if (assetIdsToOffline.length) { + await this.assetRepository.updateAll(assetIdsToOffline, { isOffline: true, deletedAt: new Date() }); + this.logger.log( + `Originals are missing for ${assetIdsToOffline.length} asset(s) in library ${job.libraryId}, marked offline`, + ); + } + + if (assetIdsToUpdate.length) { + //TODO: When we have asset status, we need to leave deletedAt as is when status is trashed + await this.assetRepository.updateAll(assetIdsToUpdate, { + isOffline: false, + deletedAt: null, + }); + + this.logger.log( + `Found ${assetIdsToOffline.length} asset(s) with modified files for library ${job.libraryId}, queuing refresh...`, + ); + + await this.queuePostSyncJobs(assetIdsToUpdate); + } + + const remainingCount = assets.length - assetIdsToOffline.length - assetIdsToUpdate.length; + if (remainingCount > 0) { + this.logger.log(`${remainingCount} asset(s) are unchanged in library ${job.libraryId}, no action required`); } return JobStatus.SUCCESS; } - private async handleSyncAsset(id: string): Promise { - const asset = await this.assetRepository.getById(id); + private async checkOfflineAsset(asset: AssetEntity) { + if (!asset.libraryId) { + return false; + } + + const library = await this.libraryRepository.get(asset.libraryId); + if (!library) { + return false; + } + + const isInImportPath = library.importPaths.find((path) => asset.originalPath.startsWith(path)); + if (!isInImportPath) { + return false; + } + + const isExcluded = library.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern)); + if (isExcluded) { + return false; + } + + return true; + } + + private async handleSyncAsset(asset: AssetEntity): Promise { if (!asset) { - return JobStatus.SKIPPED; + return AssetSyncResult.DO_NOTHING; } let stat; try { stat = await this.storageRepository.stat(asset.originalPath); } catch { - await (async (explanation: string) => { - if (!asset.isOffline) { - this.logger.debug(`${explanation}, moving to trash: ${asset.originalPath}`); - await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() }); - } - })('Asset is no longer on disk or is inaccessible because of permissions'); - return JobStatus.SUCCESS; + if (asset.isOffline) { + return AssetSyncResult.DO_NOTHING; + } + + this.logger.debug( + `Asset is no longer on disk or is inaccessible because of permissions, moving to trash: ${asset.originalPath}`, + ); + return AssetSyncResult.OFFLINE; } const mtime = stat.mtime; const isAssetModified = mtime.toISOString() !== asset.fileModifiedAt.toISOString(); + let shouldAssetGoOnline = false; - if (asset.isOffline || isAssetModified) { - this.logger.debug(`Asset was offline or modified, updating asset record ${asset.originalPath}`); - //TODO: When we have asset status, we need to leave deletedAt as is when status is trashed - await this.assetRepository.updateAll([asset.id], { - isOffline: false, - deletedAt: null, - fileCreatedAt: mtime, - fileModifiedAt: mtime, - originalFileName: parse(asset.originalPath).base, - }); + if (asset.isOffline && asset.status != AssetStatus.DELETED) { + // Only perform the expensive check if the asset is offline + shouldAssetGoOnline = await this.checkOfflineAsset(asset); } - if (isAssetModified) { - this.logger.debug(`Asset was modified, queuing metadata extraction for: ${asset.originalPath}`); - await this.queuePostSyncJobs(asset); + if (shouldAssetGoOnline || isAssetModified) { + this.logger.debug(`Asset was offline or modified, updating asset record ${asset.originalPath}`); + + return AssetSyncResult.UPDATE; } - return JobStatus.SUCCESS; + + return AssetSyncResult.DO_NOTHING; } @OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_FILES, queue: QueueName.LIBRARY }) @@ -537,7 +525,7 @@ export class LibraryService extends BaseService { return JobStatus.SKIPPED; } - this.logger.log(`Refreshing library ${library.id} for new assets`); + this.logger.log(`Crawling import paths for library ${library.id}...`); const validImportPaths: string[] = []; @@ -552,28 +540,53 @@ export class LibraryService extends BaseService { if (validImportPaths.length === 0) { this.logger.warn(`No valid import paths found for library ${library.id}`); + + return JobStatus.SKIPPED; } - const assetsOnDisk = this.storageRepository.walk({ + const pathsOnDisk = this.storageRepository.walk({ pathsToCrawl: validImportPaths, includeHidden: false, exclusionPatterns: library.exclusionPatterns, take: JOBS_LIBRARY_PAGINATION_SIZE, }); - let count = 0; + let importCount = 0; + let crawlCount = 0; - for await (const assetBatch of assetsOnDisk) { - count += assetBatch.length; - this.logger.debug(`Discovered ${count} asset(s) on disk for library ${library.id}...`); - await this.syncFiles(library, assetBatch); - this.logger.verbose(`Queued scan of ${assetBatch.length} crawled asset(s) in library ${library.id}...`); + this.logger.log(`Starting crawl of filesystem for ${library.id}...`); + + for await (const pathBatch of pathsOnDisk) { + crawlCount += pathBatch.length; + this.logger.log( + `Crawled ${pathBatch.length} file(s) for library ${library.id}, in total ${crawlCount} file(s) crawled so far`, + ); + const newPaths = await this.assetRepository.getNewPaths(library.id, pathBatch); + if (newPaths.length > 0) { + importCount += newPaths.length; + await this.syncFiles(library, newPaths); + if (newPaths.length < pathBatch.length) { + this.logger.debug( + `Current crawl batch: ${newPaths.length} of ${pathBatch.length} file(s) are new, queued import for library ${library.id}...`, + ); + } else { + this.logger.debug( + `Current crawl batch: ${newPaths.length} new file(s), queued import for library ${library.id}...`, + ); + } + } else { + this.logger.debug(`Current crawl batch: ${pathBatch.length} asset(s) already in library ${library.id}`); + } } - if (count > 0) { - this.logger.debug(`Finished queueing scan of ${count} assets on disk for library ${library.id}`); - } else if (validImportPaths.length > 0) { - this.logger.debug(`No non-excluded assets found in any import path for library ${library.id}`); + if (importCount > 0 && importCount === crawlCount) { + this.logger.log(`Finished crawling and queueing ${crawlCount} file(s) for import for library ${library.id}`); + } else if (importCount > 0) { + this.logger.log( + `Finished crawling ${crawlCount} file(s) of which ${importCount} are queued for import for library ${library.id}`, + ); + } else { + this.logger.debug(`Finished crawling, no files found for library ${library.id}`); } await this.libraryRepository.update({ id: job.id, refreshedAt: new Date() }); @@ -588,33 +601,58 @@ export class LibraryService extends BaseService { return JobStatus.SKIPPED; } - this.logger.log(`Checking assets in library ${library.id} against import path and exclusion patterns`); + const assetCount = (await this.getStatistics(library.id)).total; - const onlineAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) => - this.assetRepository.updateOffline(pagination, library), + this.logger.log( + `Scanning library ${library.id} for assets outside of import paths and/or matching an exclusion pattern...`, ); + const offlineResult = await this.assetRepository.updateOffline(library); - this.logger.log(`Scanning library ${library.id} for removed assets`); + const affectedAssetCount = offlineResult.affected; + if (affectedAssetCount === undefined) { + this.logger.error(`Unknown error occurred when updating offline status in ${library.id}`); + return JobStatus.FAILED; + } - let assetCount = 0; - for await (const assets of onlineAssets) { - if (!assets) { - console.log('No assets found'); - } else { - console.log(assets[0]); - assetCount += assets.length; - this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`); + if (affectedAssetCount === assetCount) { + this.logger.log( + `All ${assetCount} asset(s) in ${library.id} are outside of import paths and/or match an exclusion pattern, marked as offline`, + ); + } else if (affectedAssetCount !== assetCount && affectedAssetCount > 0) { + this.logger.log( + `${offlineResult.affected} asset(s) out of ${assetCount} were marked offline due to import paths and/or exclusion patterns for library ${library.id}`, + ); + } else { + this.logger.log( + `All ${assetCount} asset(s) in library ${library.id} were in an import path and none matched an exclusion pattern`, + ); + } - for (const asset of assets) { - await this.handleSyncAsset(asset.id); - } + this.logger.log(`Scanning library ${library.id} for assets missing from disk...`); - this.logger.debug(`Checked ${assets.length} asset(s) in library ${library.id}...`); - } + const existingAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) => + this.assetRepository.getAll(pagination, { libraryId: job.id, withDeleted: true }), + ); + + let currentAssetCount = 0; + for await (const assets of existingAssets) { + currentAssetCount += assets.length; + + await this.jobRepository.queue({ + name: JobName.LIBRARY_SYNC_ASSETS, + data: { + libraryId: library.id, + assetIds: assets.map((asset) => asset.id), + }, + }); + + this.logger.log( + `Queued check of ${assets.length} existing asset(s) in library ${library.id}, ${currentAssetCount} of ${assetCount} queued in total`, + ); } - if (assetCount) { - this.logger.log(`Finished check of ${assetCount} assets for library ${library.id}`); + if (currentAssetCount) { + this.logger.log(`Finished queuing ${currentAssetCount} file checks for library ${library.id}`); } return JobStatus.SUCCESS; diff --git a/server/src/utils/asset.util.ts b/server/src/utils/asset.util.ts index 02e1ced7bae7ee..f8bed5485f8b1e 100644 --- a/server/src/utils/asset.util.ts +++ b/server/src/utils/asset.util.ts @@ -1,5 +1,4 @@ import { BadRequestException } from '@nestjs/common'; -import picomatch from 'picomatch'; import { StorageCore } from 'src/cores/storage.core'; import { BulkIdErrorReason, BulkIdResponseDto } from 'src/dtos/asset-ids.response.dto'; import { UploadFieldName } from 'src/dtos/asset-media.dto'; diff --git a/server/src/utils/misc.spec.ts b/server/src/utils/misc.spec.ts index 8ede66df4059e1..87ab6d4399bbf5 100644 --- a/server/src/utils/misc.spec.ts +++ b/server/src/utils/misc.spec.ts @@ -59,7 +59,7 @@ describe('globToSqlPattern', () => { ['**/*.tif', '%/%.tif'], ['**/*.jp?', '%/%.jp_'], ['**/@eaDir/**', '%/@eaDir/%'], - ['**/._*', '%/.\\_%'], + ['**/._*', `%/.\\_%`], ]; it.each(testCases)('should convert %s to %s', (input, expected) => { diff --git a/server/src/utils/misc.ts b/server/src/utils/misc.ts index f2ce76b8760df7..3543cf20b02f5b 100644 --- a/server/src/utils/misc.ts +++ b/server/src/utils/misc.ts @@ -266,39 +266,49 @@ export const useSwagger = (app: INestApplication, { write }: { write: boolean }) } }; -export const globToSqlPattern = (glob: string) => { - const tokens = picomatch.parse(glob).tokens; +const convertTokenToSqlPattern = (token: any): string => { + if (typeof token === 'string') { + return token; + } - const convertTokenToSqlPattern = (token: any): string => { - if (typeof token === 'string') { - return token; + switch (token.type) { + case 'slash': { + return '/'; } - - switch (token.type) { - case 'slash': - return '/'; - case 'text': - return token.value; - case 'globstar': - case 'star': - return '%'; - case 'underscore': - return '\\_'; - case 'qmark': - return '_'; - case 'dot': - return '.'; - case 'bracket': - return `[${token.value}]`; - case 'negate': - return `[^${token.value}]`; - case 'brace': - const options = token.value.split(','); - return `(${options.join('|')})`; - default: - return ''; + case 'text': { + return token.value; } - }; + case 'globstar': + case 'star': { + return '%'; + } + case 'underscore': { + return String.raw`\_`; + } + case 'qmark': { + return '_'; + } + case 'dot': { + return '.'; + } + case 'bracket': { + return `[${token.value}]`; + } + case 'negate': { + return `[^${token.value}]`; + } + case 'brace': { + const options = token.value.split(','); + return `(${options.join('|')})`; + } + default: { + return ''; + } + } +}; + +export const globToSqlPattern = (glob: string) => { + const tokens = picomatch.parse(glob).tokens; let result = ''; for (const token of tokens) { diff --git a/server/test/repositories/asset.repository.mock.ts b/server/test/repositories/asset.repository.mock.ts index 928a7956c5f0c6..07020c48c2c7e2 100644 --- a/server/test/repositories/asset.repository.mock.ts +++ b/server/test/repositories/asset.repository.mock.ts @@ -38,5 +38,7 @@ export const newAssetRepositoryMock = (): Mocked => { getDuplicates: vitest.fn(), upsertFile: vitest.fn(), upsertFiles: vitest.fn(), + updateOffline: vitest.fn(), + getNewPaths: vitest.fn(), }; };