Skip to content

Commit ca0e490

Browse files
fix: improve channels
1 parent 057f2f3 commit ca0e490

File tree

4 files changed

+24
-33
lines changed

4 files changed

+24
-33
lines changed

src/channel/amqp.channel.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import * as amqp from 'amqplib';
44

55
export class AmqpChannel extends Channel<ExtensionAmqpChannelConfig> {
66
public connection?: any;
7-
public channel?: any;
87
public readonly config: ExtensionAmqpChannelConfig;
98

109
constructor(config: ExtensionAmqpChannelConfig) {
@@ -13,26 +12,16 @@ export class AmqpChannel extends Channel<ExtensionAmqpChannelConfig> {
1312
}
1413

1514
async init(): Promise<void> {
16-
if (this.connection && this.channel) {
15+
if (this.connection) {
1716
return Promise.resolve();
1817
}
1918

2019
this.connection = undefined;
21-
this.channel = undefined;
2220

2321
this.connection = await amqp.connect(this.config.connectionUri);
24-
this.channel = await this.connection.createChannel();
25-
26-
if (this.config.queue) {
27-
await this.channel.assertQueue(this.config.queue, { durable: true });
28-
}
2922
}
3023

3124
async onChannelDestroy(): Promise<void> {
32-
if (this.channel) {
33-
await this.channel.close();
34-
this.channel = undefined;
35-
}
3625
if (this.connection) {
3726
await this.connection.close();
3827
this.connection = undefined;

src/consumer/rabbitmq-messaging.consumer.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export class RabbitmqMessagingConsumer
1515
implements IMessagingConsumer<AmqpChannel>, OnModuleDestroy
1616
{
1717
private channel?: AmqpChannel = undefined;
18+
private amqpChannel: any;
1819

1920
constructor(private readonly rabbitMqMigrator: RabbitmqMigrator) {}
2021

@@ -26,7 +27,8 @@ export class RabbitmqMessagingConsumer
2627
this.channel = channel;
2728
await this.rabbitMqMigrator.run(channel);
2829

29-
const amqpChannel = channel.channel;
30+
const amqpChannel = await this.channel.connection.createChannel();
31+
this.amqpChannel = amqpChannel;
3032

3133
if (!amqpChannel) {
3234
throw new Error('AMQP channel not initialized');
@@ -63,15 +65,11 @@ export class RabbitmqMessagingConsumer
6365
errored: ConsumerDispatchedMessageError,
6466
channel: AmqpChannel,
6567
): Promise<void> {
66-
if (channel.config.deadLetterQueueFeature && channel.channel) {
68+
if (channel.config.deadLetterQueueFeature && this.amqpChannel) {
6769
const exchange = 'dead_letter.exchange';
6870
const routingKey = `${channel.config.queue}_dead_letter`;
6971

70-
await channel.channel.assertExchange(exchange, 'direct', {
71-
durable: true,
72-
});
73-
74-
channel.channel.publish(
72+
this.amqpChannel.publish(
7573
exchange,
7674
routingKey,
7775
Buffer.from(JSON.stringify(errored.dispatchedConsumerMessage.message)),

src/message-bus/amqp-message.bus.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@ import { ExchangeType } from '../channel/rmq-channel.config';
99

1010
@Injectable()
1111
export class AmqpMessageBus implements IMessageBus {
12+
public publisherChannel?: any;
13+
1214
constructor(private readonly amqpChannel: AmqpChannel) {}
1315

1416
async dispatch(message: RoutingMessage): Promise<object | void> {
1517
await this.amqpChannel.init();
18+
await this.initPublisherChannel();
19+
1620
if (
1721
message.messageOptions !== undefined &&
1822
!(message.messageOptions instanceof AmqpMessageOptions)
@@ -34,11 +38,7 @@ export class AmqpMessageBus implements IMessageBus {
3438

3539
const amqpMessage = messageBuilder.buildMessage();
3640

37-
if (!this.amqpChannel.channel) {
38-
throw new Error('AMQP channel not initialized. Did you call init()?');
39-
}
40-
41-
await this.amqpChannel.channel.publish(
41+
await this.publisherChannel.publish(
4242
amqpMessage.envelope.exchange,
4343
amqpMessage.envelope.routingKey,
4444
Buffer.from(JSON.stringify(amqpMessage.message)),
@@ -48,6 +48,12 @@ export class AmqpMessageBus implements IMessageBus {
4848
);
4949
}
5050

51+
async initPublisherChannel() {
52+
if (!this.publisherChannel && this.amqpChannel.connection) {
53+
this.publisherChannel = await this.amqpChannel.connection.createChannel();
54+
}
55+
}
56+
5157
private createMessageBuilderWhenUndefined(
5258
message: RoutingMessage,
5359
): AmqpMessageBuilder {

src/migrator/rabbitmq.migrator.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,34 @@ export class RabbitmqMigrator {
88
return;
99
}
1010

11-
if (!channel.channel) {
12-
throw new Error('AMQP channel not initialized. Did you call init()?');
13-
}
11+
const amqpChannel = await channel.connection.createChannel();
1412

15-
await channel.channel.assertExchange(
13+
await amqpChannel.assertExchange(
1614
channel.config.exchangeName,
1715
channel.config.exchangeType,
1816
{ durable: true },
1917
);
2018

21-
await channel.channel.assertQueue(channel.config.queue, {
19+
await amqpChannel.assertQueue(channel.config.queue, {
2220
durable: true,
2321
});
2422

2523
if (channel.config.deadLetterQueueFeature === true) {
2624
const dlxExchange = 'dead_letter.exchange';
2725
const dlq = `${channel.config.queue}_dead_letter`;
2826

29-
await channel.channel.assertExchange(dlxExchange, 'direct', {
27+
await amqpChannel.assertExchange(dlxExchange, 'direct', {
3028
durable: true,
3129
});
3230

33-
await channel.channel.assertQueue(dlq, { durable: true });
31+
await amqpChannel.assertQueue(dlq, { durable: true });
3432

35-
await channel.channel.bindQueue(dlq, dlxExchange, dlq);
33+
await amqpChannel.bindQueue(dlq, dlxExchange, dlq);
3634
}
3735

3836
// Bindings
3937
for (const bindingKey of channel.config.bindingKeys ?? []) {
40-
await channel.channel.bindQueue(
38+
await amqpChannel.bindQueue(
4139
channel.config.queue,
4240
channel.config.exchangeName,
4341
bindingKey,

0 commit comments

Comments
 (0)