Skip to content

Commit

Permalink
refactor(server): events
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasm91 committed Sep 27, 2024
1 parent 8bbcd5c commit 850228e
Show file tree
Hide file tree
Showing 28 changed files with 255 additions and 256 deletions.
3 changes: 1 addition & 2 deletions e2e/src/api/specs/asset.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ describe('/asset', () => {
let user2Assets: AssetMediaResponseDto[];
let locationAsset: AssetMediaResponseDto;
let ratingAsset: AssetMediaResponseDto;
let facesAsset: AssetMediaResponseDto;

const setupTests = async () => {
await utils.resetDatabase();
Expand Down Expand Up @@ -236,7 +235,7 @@ describe('/asset', () => {
await updateConfig({ systemConfigDto: config }, { headers: asBearerAuth(admin.accessToken) });

// asset faces
facesAsset = await utils.createAsset(admin.accessToken, {
const facesAsset = await utils.createAsset(admin.accessToken, {
assetData: {
filename: 'portrait.jpg',
bytes: await readFile(facesAssetFilepath),
Expand Down
15 changes: 0 additions & 15 deletions server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { BullModule } from '@nestjs/bullmq';
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE, ModuleRef } from '@nestjs/core';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
import { TypeOrmModule } from '@nestjs/typeorm';
import _ from 'lodash';
Expand Down Expand Up @@ -42,7 +41,6 @@ const imports = [
BullModule.registerQueue(...bullQueues),
ClsModule.forRoot(clsConfig),
ConfigModule.forRoot(immichAppConfig),
EventEmitterModule.forRoot(),
OpenTelemetryModule.forRoot(otelConfig),
TypeOrmModule.forRootAsync({
inject: [ModuleRef],
Expand Down Expand Up @@ -114,16 +112,3 @@ export class MicroservicesModule implements OnModuleInit, OnModuleDestroy {
providers: [...common, ...commands, SchedulerRegistry],
})
export class ImmichAdminModule {}

@Module({
imports: [
ConfigModule.forRoot(immichAppConfig),
EventEmitterModule.forRoot(),
TypeOrmModule.forRoot(databaseConfig),
TypeOrmModule.forFeature(entities),
OpenTelemetryModule.forRoot(otelConfig),
],
controllers: [...controllers],
providers: [...common, ...middleware, SchedulerRegistry],
})
export class AppTestModule {}
2 changes: 0 additions & 2 deletions server/src/bin/sync-sql.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env node
import { INestApplication } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { SchedulerRegistry } from '@nestjs/schedule';
import { Test } from '@nestjs/testing';
import { TypeOrmModule } from '@nestjs/typeorm';
Expand Down Expand Up @@ -85,7 +84,6 @@ class SqlGenerator {
logger: this.sqlLogger,
}),
TypeOrmModule.forFeature(entities),
EventEmitterModule.forRoot(),
OpenTelemetryModule.forRoot(otelConfig),
],
providers: [...repositories, AuthService, SchedulerRegistry],
Expand Down
17 changes: 6 additions & 11 deletions server/src/cores/system-config.core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';
import { load as loadYaml } from 'js-yaml';
import * as _ from 'lodash';
import { Subject } from 'rxjs';
import { SystemConfig, defaults } from 'src/config';
import { SystemConfigDto } from 'src/dtos/system-config.dto';
import { SystemMetadataKey } from 'src/enum';
Expand All @@ -24,8 +23,6 @@ export class SystemConfigCore {
private config: SystemConfig | null = null;
private lastUpdated: number | null = null;

config$ = new Subject<SystemConfig>();

private constructor(
private repository: ISystemMetadataRepository,
private logger: ILoggerRepository,
Expand All @@ -42,6 +39,11 @@ export class SystemConfigCore {
instance = null;
}

invalidateCache() {
this.config = null;
this.lastUpdated = null;
}

async getConfig({ withCache }: { withCache: boolean }): Promise<SystemConfig> {
if (!withCache || !this.config) {
const lastUpdated = this.lastUpdated;
Expand Down Expand Up @@ -74,14 +76,7 @@ export class SystemConfigCore {

await this.repository.set(SystemMetadataKey.SYSTEM_CONFIG, partialConfig);

const config = await this.getConfig({ withCache: false });
this.config$.next(config);
return config;
}

async refreshConfig() {
const newConfig = await this.getConfig({ withCache: false });
this.config$.next(newConfig);
return this.getConfig({ withCache: false });
}

isUsingConfigFile() {
Expand Down
15 changes: 6 additions & 9 deletions server/src/decorators.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { SetMetadata, applyDecorators } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces';
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { MetadataKey } from 'src/enum';
import { EmitEvent, ServerEvent } from 'src/interfaces/event.interface';
import { EmitEvent } from 'src/interfaces/event.interface';
import { setUnion } from 'src/utils/set';

// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
Expand Down Expand Up @@ -133,15 +131,14 @@ export interface GenerateSqlQueries {
/** Decorator to enable versioning/tracking of generated Sql */
export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options);

export const OnServerEvent = (event: ServerEvent, options?: OnEventOptions) =>
OnEvent(event, { suppressErrors: false, ...options });

export type EmitConfig = {
event: EmitEvent;
export type EventConfig = {
name: EmitEvent;
/** handle socket.io server events as well */
server?: boolean;
/** lower value has higher priority, defaults to 0 */
priority?: number;
};
export const OnEmit = (config: EmitConfig) => SetMetadata(MetadataKey.ON_EMIT_CONFIG, config);
export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config);

type LifecycleRelease = 'NEXT_RELEASE' | string;
type LifecycleMetadata = {
Expand Down
2 changes: 1 addition & 1 deletion server/src/enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export enum MetadataKey {
ADMIN_ROUTE = 'admin_route',
SHARED_ROUTE = 'shared_route',
API_KEY_SECURITY = 'api_key',
ON_EMIT_CONFIG = 'on_emit_config',
EVENT_CONFIG = 'event_config',
}

export enum RouteKey {
Expand Down
43 changes: 25 additions & 18 deletions server/src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.d

export const IEventRepository = 'IEventRepository';

type EmitEventMap = {
type EventMap = {
// app events
'app.bootstrap': ['api' | 'microservices'];
'app.shutdown': [];

// config events
'config.update': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
'config.update': [
{
newConfig: SystemConfig;
/** When the server starts, `oldConfig` is `undefined` */
oldConfig?: SystemConfig;
},
];
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];

// album events
Expand Down Expand Up @@ -43,12 +49,17 @@ type EmitEventMap = {

// user events
'user.signup': [{ notify: boolean; id: string; tempPassword?: string }];

// websocket events
'websocket.connect': [{ userId: string }];
};

export type EmitEvent = keyof EmitEventMap;
export type ServerEvents = 'config.update';

export type EmitEvent = keyof EventMap;
export type EmitHandler<T extends EmitEvent> = (...args: ArgsOf<T>) => Promise<void> | void;
export type ArgOf<T extends EmitEvent> = EmitEventMap[T][0];
export type ArgsOf<T extends EmitEvent> = EmitEventMap[T];
export type ArgOf<T extends EmitEvent> = EventMap[T][0];
export type ArgsOf<T extends EmitEvent> = EventMap[T];

export enum ClientEvent {
UPLOAD_SUCCESS = 'on_upload_success',
Expand Down Expand Up @@ -82,19 +93,15 @@ export interface ClientEventMap {
[ClientEvent.SESSION_DELETE]: string;
}

export enum ServerEvent {
CONFIG_UPDATE = 'config.update',
WEBSOCKET_CONNECT = 'websocket.connect',
}

export interface ServerEventMap {
[ServerEvent.CONFIG_UPDATE]: null;
[ServerEvent.WEBSOCKET_CONNECT]: { userId: string };
}
export type EventItem<T extends EmitEvent> = {
event: T;
handler: EmitHandler<T>;
server: boolean;
};

export interface IEventRepository {
on<T extends keyof EmitEventMap>(event: T, handler: EmitHandler<T>): void;
emit<T extends keyof EmitEventMap>(event: T, ...args: ArgsOf<T>): Promise<void>;
on<T extends keyof EventMap>(item: EventItem<T>): void;
emit<T extends keyof EventMap>(event: T, ...args: ArgsOf<T>): Promise<void>;

/**
* Send to connected clients for a specific user
Expand All @@ -105,7 +112,7 @@ export interface IEventRepository {
*/
clientBroadcast<E extends keyof ClientEventMap>(event: E, data: ClientEventMap[E]): void;
/**
* Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent`
* Send to all connected servers
*/
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]): boolean;
serverSend<T extends ServerEvents>(event: T, ...args: ArgsOf<T>): void;
}
48 changes: 26 additions & 22 deletions server/src/repositories/event.repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
OnGatewayConnection,
OnGatewayDisconnect,
Expand All @@ -13,16 +12,16 @@ import {
ArgsOf,
ClientEventMap,
EmitEvent,
EmitHandler,
EventItem,
IEventRepository,
ServerEvent,
ServerEventMap,
ServerEvents,
} from 'src/interfaces/event.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { AuthService } from 'src/services/auth.service';
import { Instrumentation } from 'src/utils/instrumentation';
import { handlePromiseError } from 'src/utils/misc';

type EmitHandlers = Partial<{ [T in EmitEvent]: EmitHandler<T>[] }>;
type EmitHandlers = Partial<{ [T in EmitEvent]: Array<EventItem<T>> }>;

@Instrumentation()
@WebSocketGateway({
Expand All @@ -39,7 +38,6 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect

constructor(
private moduleRef: ModuleRef,
private eventEmitter: EventEmitter2,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(EventRepository.name);
Expand All @@ -48,14 +46,10 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
afterInit(server: Server) {
this.logger.log('Initialized websocket server');

for (const event of Object.values(ServerEvent)) {
if (event === ServerEvent.WEBSOCKET_CONNECT) {
continue;
}

server.on(event, (data: unknown) => {
for (const event of ['config.update'] as const) {
server.on(event, (...args: ArgsOf<any>) => {
this.logger.debug(`Server event: ${event} (receive)`);
this.eventEmitter.emit(event, data);
handlePromiseError(this.onEvent({ name: event, args, server: true }), this.logger);
});
}
}
Expand All @@ -72,7 +66,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
if (auth.session) {
await client.join(auth.session.id);
}
this.serverSend(ServerEvent.WEBSOCKET_CONNECT, { userId: auth.user.id });
await this.onEvent({ name: 'websocket.connect', args: [{ userId: auth.user.id }], server: false });
} catch (error: Error | any) {
this.logger.error(`Websocket connection error: ${error}`, error?.stack);
client.emit('error', 'unauthorized');
Expand All @@ -85,18 +79,29 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
await client.leave(client.nsp.name);
}

on<T extends EmitEvent>(event: T, handler: EmitHandler<T>): void {
on<T extends EmitEvent>(item: EventItem<T>): void {
const event = item.event;

if (!this.emitHandlers[event]) {
this.emitHandlers[event] = [];
}

this.emitHandlers[event].push(handler);
this.emitHandlers[event].push(item);
}

async emit<T extends EmitEvent>(event: T, ...args: ArgsOf<T>): Promise<void> {
const handlers = this.emitHandlers[event] || [];
for (const handler of handlers) {
await handler(...args);
return this.onEvent({ name: event, args, server: false });
}

private async onEvent<T extends EmitEvent>(event: { name: T; args: ArgsOf<T>; server: boolean }): Promise<void> {
const handlers = this.emitHandlers[event.name] || [];
for (const { handler, server } of handlers) {
// exclude handlers that ignore server events
if (!server && event.server) {
continue;
}

await handler(...event.args);
}
}

Expand All @@ -108,9 +113,8 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
this.server?.emit(event, data);
}

serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]) {
serverSend<T extends ServerEvents>(event: T, ...args: ArgsOf<T>): void {
this.logger.debug(`Server event: ${event} (send)`);
this.server?.serverSideEmit(event, data);
return this.eventEmitter.emit(event, data);
this.server?.serverSideEmit(event, ...args);
}
}
4 changes: 2 additions & 2 deletions server/src/services/database.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { Duration } from 'luxon';
import semver from 'semver';
import { OnEmit } from 'src/decorators';
import { OnEvent } from 'src/decorators';
import { IConfigRepository } from 'src/interfaces/config.interface';
import {
DatabaseExtension,
Expand Down Expand Up @@ -74,7 +74,7 @@ export class DatabaseService {
this.logger.setContext(DatabaseService.name);
}

@OnEmit({ event: 'app.bootstrap', priority: -200 })
@OnEvent({ name: 'app.bootstrap', priority: -200 })
async onBootstrap() {
const version = await this.databaseRepository.getPostgresVersion();
const current = semver.coerce(version);
Expand Down
Loading

0 comments on commit 850228e

Please sign in to comment.