Skip to content

Commit

Permalink
chore: optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jun 17, 2023
1 parent dc80454 commit a04c8db
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions src/simple-mq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class SimpleMQBroker extends EventEmitter {
user?: string,
pass?: Buffer
) => Promise<boolean> | boolean;
protected nextId: number;

constructor(options: Options = {}) {
super();
Expand All @@ -68,6 +69,7 @@ export class SimpleMQBroker extends EventEmitter {
this.server.on('close', this.emit.bind(this, 'close'));
this.server.on('listening', this.emit.bind(this, 'listening'));
this.sessions = {};
this.nextId = 1000;
}

start(port?: number) {
Expand Down Expand Up @@ -118,14 +120,14 @@ export class SimpleMQBroker extends EventEmitter {
}
id = packet.clientId;
this.clients[id] = client;
session = this.sessions[id] ??= { subs: {}, alive: true, pending: {} };
session!.alive = true;
if (packet.clean) {
this.sessions[id] = { subs: {}, alive: true, pending: {} };
} else {
this.sessions[id] ??= { subs: {}, alive: true, pending: {} };
this.sessions[id]!.alive = true;
this.sessions[id]!.pending = {};
session!.subs = {};
session!.pending = {};
session!.retry && clearTimeout(session!.retry!);
}
session = this.sessions[id];
this.emit('connect', id);
});
client.on('publish', async (packet: IPublishPacket) => {
if (!session) return;
Expand All @@ -134,10 +136,10 @@ export class SimpleMQBroker extends EventEmitter {
this.processMessage(packet.messageId, packet.topic, packet.payload);
break;
case 1:
this.processMessage(packet.messageId, packet.topic, packet.payload);
client.puback({
messageId: packet.messageId,
});
this.processMessage(packet.messageId, packet.topic, packet.payload);
break;
case 2:
session!.pending[packet.messageId!] = {
Expand All @@ -154,11 +156,11 @@ export class SimpleMQBroker extends EventEmitter {
const { messageId } = packet;
if (!session?.pending[messageId]) return;
const message = session!.pending[messageId]!;
this.processMessage(messageId, message.topic, message.payload);
delete session?.pending[messageId];
client.pubcomp({
messageId,
});
this.processMessage(messageId, message.topic, message.payload);
});
client.on('subscribe', (packet: ISubscribePacket) => {
if (!session) return;
Expand Down Expand Up @@ -195,11 +197,20 @@ export class SimpleMQBroker extends EventEmitter {
payload: string | Buffer
) {
this.emit('message', topic, payload);
this.publish(messageId, topic, payload);
}

publish(
messageId: number | undefined,
topic: string,
payload: string | Buffer
) {
const id = messageId ?? this.nextId++;
for (const cId in this.sessions) {
for (const rule in this.sessions[cId]!.subs) {
if (matchRule(rule, topic)) {
const qos = this.sessions[cId]!.subs[rule] ?? 0;
this.doTask(cId, messageId ?? 0, topic, payload, qos);
this.doTask(cId, id, topic, payload, qos);
break;
}
}
Expand Down

0 comments on commit a04c8db

Please sign in to comment.