Skip to content

Commit

Permalink
fix: fix message process
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Aug 28, 2023
1 parent c958096 commit 1dd7e56
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions src/simple-mq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ export class SimpleMQBroker extends EventEmitter {
if (!session) return;
switch (packet.qos) {
case 0:
this.processMessage(packet.messageId, packet.topic, packet.payload);
this.processMessage(packet);
break;
case 1:
client.puback({
messageId: packet.messageId,
});
this.processMessage(packet.messageId, packet.topic, packet.payload);
this.processMessage(packet);
break;
case 2:
session!.pending[packet.messageId!] = {
Expand All @@ -179,7 +179,10 @@ export class SimpleMQBroker extends EventEmitter {
client.pubcomp({
messageId,
});
this.processMessage(messageId, message.topic, message.payload);
this.processMessage({
...message,
messageId,
});
});
client.on('subscribe', (packet: ISubscribePacket) => {
if (!session) return;
Expand Down Expand Up @@ -225,42 +228,39 @@ export class SimpleMQBroker extends EventEmitter {
client.pingresp();
});
client.on('puback', (packet: IPubackPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
clearInterval(session!.tasks[packet.messageId!]);
delete session!.tasks[packet.messageId!];
this.emit('messageSent', id, packet.messageId);
const messageId = packet.messageId ? Number(packet.messageId) : -1;
if (session?.tasks[messageId] !== undefined) {
clearInterval(session!.tasks[messageId]);
delete session!.tasks[messageId];
this.emit('messageSend', id, messageId);
}
});
client.on('pubrec', (packet: IPubrecPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
client?.pubrel({
messageId: packet.messageId!,
});
const messageId = packet.messageId ? Number(packet.messageId) : -1;
if (session?.tasks[messageId] !== undefined) {
client?.pubrel({ messageId });
}
});
client.on('pubcomp', (packet: IPubcompPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
clearInterval(session!.tasks[packet.messageId!]);
delete session!.tasks[packet.messageId!];
this.emit('messageSent', id, packet.messageId);
const messageId = packet.messageId ? Number(packet.messageId) : -1;
if (session?.tasks[messageId] !== undefined) {
clearInterval(session!.tasks[messageId]);
delete session!.tasks[messageId];
this.emit('messageSend', id, messageId);
}
});
}

protected processMessage(
messageId: number | undefined,
topic: string,
payload: string | Buffer
) {
this.emit('message', topic, payload);
this.publish(topic, payload, messageId);
protected processMessage(packet: Partial<IPublishPacket>) {
this.emit('message', packet.topic, packet.payload);
this.publish(packet.topic!, packet.payload!, packet.messageId);
}

publish(
topic: string,
payload: string | Buffer,
messageId: number | undefined
) {
): number {
const id = messageId ?? this.nextId++;
for (const cId in this.sessions) {
for (const rule in this.sessions[cId]!.subs) {
Expand All @@ -271,6 +271,7 @@ export class SimpleMQBroker extends EventEmitter {
}
}
}
return id;
}

protected createPubTask(
Expand Down

0 comments on commit 1dd7e56

Please sign in to comment.