From 32c55b154425a6a3777bf250ca19f8197bff2e37 Mon Sep 17 00:00:00 2001 From: Hans Date: Sat, 17 Jun 2023 17:17:37 +0800 Subject: [PATCH] feat: support create multi publish task --- src/simple-mq.ts | 81 ++++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/src/simple-mq.ts b/src/simple-mq.ts index 9b3bedd..4808a33 100644 --- a/src/simple-mq.ts +++ b/src/simple-mq.ts @@ -1,8 +1,10 @@ import type { QoS } from 'mqtt-packet'; import type { - IPingreqPacket, IConnectPacket, IPublishPacket, + IPubackPacket, + IPubrecPacket, + IPubcompPacket, ISubscribePacket, IUnsubscribePacket, } from 'mqtt-packet'; @@ -24,7 +26,7 @@ interface Message { export interface MQClientSession { pending: { [id: string]: Message }; subs: { [rule: string]: QoS }; - retry?: ReturnType; + tasks: { [id: number]: ReturnType }; alive: boolean; } @@ -120,12 +122,18 @@ export class SimpleMQBroker extends EventEmitter { } id = packet.clientId; this.clients[id] = client; - session = this.sessions[id] ??= { subs: {}, alive: true, pending: {} }; + session = this.sessions[id] ??= { + subs: {}, + alive: true, + pending: {}, + tasks: {}, + }; session!.alive = true; if (packet.clean) { + Object.values(session!.tasks).forEach(clearInterval); + session!.tasks = {}; session!.subs = {}; session!.pending = {}; - session!.retry && clearTimeout(session!.retry!); } this.emit('connect', id); }); @@ -184,10 +192,27 @@ export class SimpleMQBroker extends EventEmitter { reasonCode: 0, }); }); - client.on('pingreq', (packet: IPingreqPacket) => { - client.pingresp({ - messageId: packet.messageId, - }); + client.on('pingreq', () => { + client.pingresp(); + }); + client.on('puback', (packet: IPubackPacket) => { + if (session?.tasks[packet.messageId!] !== undefined) { + clearInterval(session!.tasks[packet.messageId!]); + delete session!.tasks[packet.messageId!]; + } + }); + client.on('pubrec', (packet: IPubrecPacket) => { + if (session?.tasks[packet.messageId!] !== undefined) { + client?.pubrel({ + messageId: packet.messageId!, + }); + } + }); + client.on('pubcomp', (packet: IPubcompPacket) => { + if (session?.tasks[packet.messageId!] !== undefined) { + clearInterval(session!.tasks[packet.messageId!]); + delete session!.tasks[packet.messageId!]; + } }); } @@ -210,14 +235,14 @@ export class SimpleMQBroker extends EventEmitter { for (const rule in this.sessions[cId]!.subs) { if (matchRule(rule, topic)) { const qos = this.sessions[cId]!.subs[rule] ?? 0; - this.doTask(cId, id, topic, payload, qos); + this.createPubTask(cId, id, topic, payload, qos); break; } } } } - protected doTask( + protected createPubTask( cId: string, messageId: number, topic: string, @@ -225,34 +250,16 @@ export class SimpleMQBroker extends EventEmitter { qos: QoS, dup: boolean = false ) { - if (!this.sessions[cId]) return; - this.clients[cId]!.publish({ messageId, topic, payload, qos, dup }); - switch (qos) { - case 1: - this.clients[cId]!.once('puback', () => { - clearTimeout(this.sessions[cId]?.retry); - }); - this.sessions[cId]!.retry = setTimeout(() => { - this.clients[cId]?.removeAllListeners('puback'); - delete this.sessions[cId]?.retry; - this.doTask(cId, messageId, topic, payload, qos, true); - }, this.retryInterval); - break; - case 2: - this.clients[cId]!.once('pubrec', () => { - this.clients[cId]!.pubrel(); - }); - this.clients[cId]!.once('pubcomp', () => { - clearTimeout(this.sessions[cId]?.retry); - }); - this.sessions[cId]!.retry = setTimeout(() => { - this.clients[cId]?.removeAllListeners('pubrec'); - this.clients[cId]?.removeAllListeners('pubcomp'); - delete this.sessions[cId]?.retry; - this.doTask(cId, messageId, topic, payload, qos, true); - }, this.retryInterval); - break; + const session = this.sessions[cId]; + if (!session?.alive) return; + const pubPacket = { messageId, topic, payload, qos, dup }; + const client = this.clients[cId]; + if (qos > 0) { + session!.tasks[messageId] = setInterval(() => { + client?.publish(pubPacket); + }, this.retryInterval); } + client?.publish(pubPacket); } }