From a04c8dbb3cb04945e890d33f054f7db9e701f559 Mon Sep 17 00:00:00 2001 From: Hans Date: Sat, 17 Jun 2023 12:07:34 +0800 Subject: [PATCH] chore: optimize code --- src/simple-mq.ts | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/simple-mq.ts b/src/simple-mq.ts index 76427f3..9b3bedd 100644 --- a/src/simple-mq.ts +++ b/src/simple-mq.ts @@ -55,6 +55,7 @@ export class SimpleMQBroker extends EventEmitter { user?: string, pass?: Buffer ) => Promise | boolean; + protected nextId: number; constructor(options: Options = {}) { super(); @@ -68,6 +69,7 @@ export class SimpleMQBroker extends EventEmitter { this.server.on('close', this.emit.bind(this, 'close')); this.server.on('listening', this.emit.bind(this, 'listening')); this.sessions = {}; + this.nextId = 1000; } start(port?: number) { @@ -118,14 +120,14 @@ export class SimpleMQBroker extends EventEmitter { } id = packet.clientId; this.clients[id] = client; + session = this.sessions[id] ??= { subs: {}, alive: true, pending: {} }; + session!.alive = true; if (packet.clean) { - this.sessions[id] = { subs: {}, alive: true, pending: {} }; - } else { - this.sessions[id] ??= { subs: {}, alive: true, pending: {} }; - this.sessions[id]!.alive = true; - this.sessions[id]!.pending = {}; + session!.subs = {}; + session!.pending = {}; + session!.retry && clearTimeout(session!.retry!); } - session = this.sessions[id]; + this.emit('connect', id); }); client.on('publish', async (packet: IPublishPacket) => { if (!session) return; @@ -134,10 +136,10 @@ export class SimpleMQBroker extends EventEmitter { this.processMessage(packet.messageId, packet.topic, packet.payload); break; case 1: - this.processMessage(packet.messageId, packet.topic, packet.payload); client.puback({ messageId: packet.messageId, }); + this.processMessage(packet.messageId, packet.topic, packet.payload); break; case 2: session!.pending[packet.messageId!] = { @@ -154,11 +156,11 @@ export class SimpleMQBroker extends EventEmitter { const { messageId } = packet; if (!session?.pending[messageId]) return; const message = session!.pending[messageId]!; - this.processMessage(messageId, message.topic, message.payload); delete session?.pending[messageId]; client.pubcomp({ messageId, }); + this.processMessage(messageId, message.topic, message.payload); }); client.on('subscribe', (packet: ISubscribePacket) => { if (!session) return; @@ -195,11 +197,20 @@ export class SimpleMQBroker extends EventEmitter { payload: string | Buffer ) { this.emit('message', topic, payload); + this.publish(messageId, topic, payload); + } + + publish( + messageId: number | undefined, + topic: string, + payload: string | Buffer + ) { + const id = messageId ?? this.nextId++; for (const cId in this.sessions) { for (const rule in this.sessions[cId]!.subs) { if (matchRule(rule, topic)) { const qos = this.sessions[cId]!.subs[rule] ?? 0; - this.doTask(cId, messageId ?? 0, topic, payload, qos); + this.doTask(cId, id, topic, payload, qos); break; } }