Skip to content

Commit

Permalink
fix: fix problem close connection amqp with connection listener
Browse files Browse the repository at this point in the history
  • Loading branch information
meysamhadeli committed Dec 10, 2023
1 parent df8f0d2 commit 9ddc950
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/building-blocks/rabbitmq/rabbitmq-connection.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ export interface RabbitmqOptions {
password: string;
exchange: string;
}
export interface IRabbitMQConnection {
export interface IRabbitmqConnection {
getChannel(): Promise<amqp.Channel>;
closeChanel(): Promise<void>;
closeConnection(): Promise<void>;
}
export declare class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
export declare class RabbitmqConnection implements OnModuleInit, IRabbitmqConnection {
private connection;
private channel;
onModuleInit(): Promise<void>;
Expand Down
11 changes: 11 additions & 0 deletions src/building-blocks/rabbitmq/rabbitmq-connection.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq-connection.js.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions src/building-blocks/rabbitmq/rabbitmq-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface RabbitmqOptions {
exchange: string;
}

export interface IRabbitMQConnection {
export interface IRabbitmqConnection {
getChannel(): Promise<amqp.Channel>;

closeChanel(): Promise<void>;
Expand All @@ -20,7 +20,7 @@ export interface IRabbitMQConnection {
}

@Injectable()
export class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
export class RabbitmqConnection implements OnModuleInit, IRabbitmqConnection {
private connection: amqp.Connection = null;
private channel: amqp.Channel = null;

Expand Down Expand Up @@ -48,6 +48,13 @@ export class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
}
);
}

this.channel.on("error", async (error): Promise<void> => {
Logger.error(`Error occurred on channel: ${error}`);
await this.closeChanel();
await this.getChannel();
});

return this.channel;
} catch (error) {
Logger.error('Failed to get channel!');
Expand All @@ -68,6 +75,7 @@ export class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
async closeConnection(): Promise<void> {
try {
if (this.connection) {
await this.closeChanel();
await this.connection.close();
Logger.log('Connection closed successfully');
}
Expand All @@ -76,7 +84,9 @@ export class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
}
}


private async initializeConnection(): Promise<void> {

try {
if (!this.connection || this.connection == undefined) {
await asyncRetry(
Expand All @@ -98,6 +108,12 @@ export class RabbitmqConnection implements OnModuleInit, IRabbitMQConnection {
maxTimeout: configs.retry.maxTimeout
}
);

this.connection.on("error", async (error): Promise<void> => {
Logger.error(`Error occurred on connection: ${error}`);
await this.closeConnection();
await this.initializeConnection();
});
}
} catch (error) {
throw new Error('Rabbitmq connection failed!');
Expand Down
2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq-subscriber.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type handlerFunc<T> = (queue: string, message: T) => void;
export interface IRabbitmqConsumer {
isConsumed<T>(message: T): Promise<boolean>;
}
export declare class RabbitmqSubscriber<T> implements OnModuleInit, IRabbitmqConsumer {
export declare class RabbitmqConsumer<T> implements OnModuleInit, IRabbitmqConsumer {
private readonly rabbitMQConnection;
private readonly openTelemetryTracer;
private readonly type;
Expand Down
10 changes: 5 additions & 5 deletions src/building-blocks/rabbitmq/rabbitmq-subscriber.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq-subscriber.js.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface IRabbitmqConsumer {
}

@Injectable()
export class RabbitmqSubscriber<T> implements OnModuleInit, IRabbitmqConsumer {
export class RabbitmqConsumer<T> implements OnModuleInit, IRabbitmqConsumer {
constructor(
private readonly rabbitMQConnection: RabbitmqConnection,
private readonly openTelemetryTracer: OpenTelemetryTracer,
Expand Down
2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq.module.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/building-blocks/rabbitmq/rabbitmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { OpenTelemetryModule } from '../openTelemetry/open-telemetry.module';
RabbitmqPublisher,
{
provide: 'IRabbitmqConnection',
useClass: RabbitmqConnection
useClass: RabbitmqConnection,
},
{
provide: 'IRabbitmqPublisher',
Expand Down
2 changes: 1 addition & 1 deletion src/building-blocks/tsconfig.tsbuildinfo

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/identity/test/shared/fixtures/integration-test.fixture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import {AuthModule} from "../../../src/auth/auth.module";
import {RouterModule} from "@nestjs/core";
import {JwtStrategy} from "building-blocks/passport/jwt.strategy";
import {JwtGuard} from "building-blocks/passport/jwt.guard";
import {IRabbitmqConnection} from "building-blocks/rabbitmq/rabbitmq-connection";

export class Fixture {
userRepository: IUserRepository;
authRepository: IAuthRepository;
postgresContainer: StartedTestContainer;
rabbitmqContainer: StartedTestContainer;
rabbitmqConsumer: IRabbitmqConsumer;
rabbitmqConnection: IRabbitmqConnection;
rabbitmqPublisher: IRabbitmqPublisher;
commandBus: CommandBus;
queryBus: QueryBus;
Expand Down Expand Up @@ -89,6 +91,7 @@ export class IntegrationTestFixture {
this.fixture.authRepository = module.get<IAuthRepository>('IAuthRepository');

this.fixture.rabbitmqPublisher = module.get<IRabbitmqPublisher>('IRabbitmqPublisher');
this.fixture.rabbitmqConnection = module.get<IRabbitmqConnection>('IRabbitmqConnection');

this.fixture.commandBus = module.get<CommandBus>(CommandBus);
this.fixture.queryBus = module.get<QueryBus>(QueryBus);
Expand All @@ -97,6 +100,7 @@ export class IntegrationTestFixture {
}

public async cleanUp() {

await this.fixture.rabbitmqContainer.stop();
await this.fixture.postgresContainer.stop();

Expand Down

0 comments on commit 9ddc950

Please sign in to comment.