Skip to content

Commit

Permalink
feat: support create multi publish task
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jun 17, 2023
1 parent 6b839e8 commit 32c55b1
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions src/simple-mq.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { QoS } from 'mqtt-packet';
import type {
IPingreqPacket,
IConnectPacket,
IPublishPacket,
IPubackPacket,
IPubrecPacket,
IPubcompPacket,
ISubscribePacket,
IUnsubscribePacket,
} from 'mqtt-packet';
Expand All @@ -24,7 +26,7 @@ interface Message {
export interface MQClientSession {
pending: { [id: string]: Message };
subs: { [rule: string]: QoS };
retry?: ReturnType<typeof setTimeout>;
tasks: { [id: number]: ReturnType<typeof setInterval> };
alive: boolean;
}

Expand Down Expand Up @@ -120,12 +122,18 @@ export class SimpleMQBroker extends EventEmitter {
}
id = packet.clientId;
this.clients[id] = client;
session = this.sessions[id] ??= { subs: {}, alive: true, pending: {} };
session = this.sessions[id] ??= {
subs: {},
alive: true,
pending: {},
tasks: {},
};
session!.alive = true;
if (packet.clean) {
Object.values(session!.tasks).forEach(clearInterval);
session!.tasks = {};
session!.subs = {};
session!.pending = {};
session!.retry && clearTimeout(session!.retry!);
}
this.emit('connect', id);
});
Expand Down Expand Up @@ -184,10 +192,27 @@ export class SimpleMQBroker extends EventEmitter {
reasonCode: 0,
});
});
client.on('pingreq', (packet: IPingreqPacket) => {
client.pingresp({
messageId: packet.messageId,
});
client.on('pingreq', () => {
client.pingresp();
});
client.on('puback', (packet: IPubackPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
clearInterval(session!.tasks[packet.messageId!]);
delete session!.tasks[packet.messageId!];
}
});
client.on('pubrec', (packet: IPubrecPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
client?.pubrel({
messageId: packet.messageId!,
});
}
});
client.on('pubcomp', (packet: IPubcompPacket) => {
if (session?.tasks[packet.messageId!] !== undefined) {
clearInterval(session!.tasks[packet.messageId!]);
delete session!.tasks[packet.messageId!];
}
});
}

Expand All @@ -210,49 +235,31 @@ export class SimpleMQBroker extends EventEmitter {
for (const rule in this.sessions[cId]!.subs) {
if (matchRule(rule, topic)) {
const qos = this.sessions[cId]!.subs[rule] ?? 0;
this.doTask(cId, id, topic, payload, qos);
this.createPubTask(cId, id, topic, payload, qos);
break;
}
}
}
}

protected doTask(
protected createPubTask(
cId: string,
messageId: number,
topic: string,
payload: string | Buffer,
qos: QoS,
dup: boolean = false
) {
if (!this.sessions[cId]) return;
this.clients[cId]!.publish({ messageId, topic, payload, qos, dup });
switch (qos) {
case 1:
this.clients[cId]!.once('puback', () => {
clearTimeout(this.sessions[cId]?.retry);
});
this.sessions[cId]!.retry = setTimeout(() => {
this.clients[cId]?.removeAllListeners('puback');
delete this.sessions[cId]?.retry;
this.doTask(cId, messageId, topic, payload, qos, true);
}, this.retryInterval);
break;
case 2:
this.clients[cId]!.once('pubrec', () => {
this.clients[cId]!.pubrel();
});
this.clients[cId]!.once('pubcomp', () => {
clearTimeout(this.sessions[cId]?.retry);
});
this.sessions[cId]!.retry = setTimeout(() => {
this.clients[cId]?.removeAllListeners('pubrec');
this.clients[cId]?.removeAllListeners('pubcomp');
delete this.sessions[cId]?.retry;
this.doTask(cId, messageId, topic, payload, qos, true);
}, this.retryInterval);
break;
const session = this.sessions[cId];
if (!session?.alive) return;
const pubPacket = { messageId, topic, payload, qos, dup };
const client = this.clients[cId];
if (qos > 0) {
session!.tasks[messageId] = setInterval(() => {
client?.publish(pubPacket);
}, this.retryInterval);
}
client?.publish(pubPacket);
}
}

Expand Down

0 comments on commit 32c55b1

Please sign in to comment.