Skip to content

Commit

Permalink
feat: support pending multiple message
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jun 16, 2023
1 parent f767134 commit 6b00614
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions src/simple-mq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ export interface Options {
keepalive?: number;
}

interface Message {
topic: string;
payload: string | Buffer;
}

export interface MQClientSession {
storeMessage?: {
messageId?: number;
topic: string;
payload: string | Buffer;
};
pending: { [id: string]: Message };
subs: { [rule: string]: QoS };
retry?: ReturnType<typeof setTimeout>;
alive: boolean;
Expand Down Expand Up @@ -80,7 +81,6 @@ export class SimpleMQBroker extends EventEmitter {
protected handleClient(client: Client) {
let id: string | undefined;
let session: MQClientSession | undefined;
client.on('data', (packet) => console.log(packet));
client.on('close', () => {
if (id && session) {
delete this.clients[id!];
Expand All @@ -99,7 +99,9 @@ export class SimpleMQBroker extends EventEmitter {
client.setProtocolVersion(version);
const pass =
(await this.authenticate?.(packet.username, packet.password)) ?? true;
const sessionPresent = packet.clean ? false : !!this.sessions[packet.clientId];
const sessionPresent = packet.clean
? false
: !!this.sessions[packet.clientId];
const keepalive = packet.keepalive ?? this.keepalive;
if (pass) {
client.connack({
Expand All @@ -117,15 +119,16 @@ export class SimpleMQBroker extends EventEmitter {
id = packet.clientId;
this.clients[id] = client;
if (packet.clean) {
this.sessions[id] = { subs: {}, alive: true };
this.sessions[id] = { subs: {}, alive: true, pending: {} };
} else {
this.sessions[id] ??= { subs: {}, alive: true };
this.sessions[id] ??= { subs: {}, alive: true, pending: {} };
this.sessions[id]!.alive = true;
this.sessions[id]!.pending = {};
}
session = this.sessions[id];
});
client.on('publish', async (packet: IPublishPacket) => {
if (!session || session?.storeMessage) return;
if (!session) return;
switch (packet.qos) {
case 0:
this.processMessage(packet.messageId, packet.topic, packet.payload);
Expand All @@ -137,8 +140,7 @@ export class SimpleMQBroker extends EventEmitter {
});
break;
case 2:
session!.storeMessage = {
messageId: packet.messageId,
session!.pending[packet.messageId!] = {
topic: packet.topic,
payload: packet.payload,
};
Expand All @@ -149,15 +151,13 @@ export class SimpleMQBroker extends EventEmitter {
}
});
client.on('pubrel', (packet) => {
if (!session?.storeMessage) return;
this.processMessage(
session!.storeMessage!.messageId,
session!.storeMessage!.topic,
session!.storeMessage!.payload
);
delete session?.storeMessage;
const { messageId } = packet;
if (!session?.pending[messageId]) return;
const message = session!.pending[messageId]!;
this.processMessage(messageId, message.topic, message.payload);
delete session?.pending[messageId];
client.pubcomp({
messageId: packet.messageId,
messageId,
});
});
client.on('subscribe', (packet: ISubscribePacket) => {
Expand Down

0 comments on commit 6b00614

Please sign in to comment.