Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
e4193c5
feat: add delivery receipt confirmation buffering
MartinCupela Sep 9, 2025
34bacd9
Merge branch 'master' into feat/delivery_receipts
rafaelmf3 Sep 15, 2025
e4a140b
fix: initialize read state with delivery receipt data
MartinCupela Sep 15, 2025
3ab61a5
refactor: rename event type 'notification.channel_delivered' to 'mess…
MartinCupela Sep 17, 2025
02aa87a
refactor: remove client_id and connection_id from MarkDeliveredOptions
MartinCupela Sep 17, 2025
029aaf8
feat: cap the number of reported delivery candidates to 100
MartinCupela Sep 18, 2025
decfe01
fix: prevent override of channel delivered info on message.new event …
MartinCupela Sep 18, 2025
76fa3c6
feat: add OwnMessageReceiptsTracker for performant read state retrieval
MartinCupela Sep 26, 2025
7c15340
feat: add methods usersWhoseLastReadIs & usersWhoseLastDeliveredIs to…
MartinCupela Sep 26, 2025
f92441c
chore: update OwnMessageReceiptsTracker doc string
MartinCupela Sep 26, 2025
1ede2a6
feat: report delivery when read events are enabled and privacy settin…
MartinCupela Oct 7, 2025
b201fd7
Merge branch 'master' into feat/delivery_receipts
MartinCupela Oct 7, 2025
468bf90
style: remove unnecessary comments
MartinCupela Oct 7, 2025
9a891cf
fix: prevent overriding read data on various message delivery events
MartinCupela Oct 7, 2025
eabae00
refactor: rename OwnMessageReceiptsTracker to MessageReceiptsTracker
MartinCupela Oct 8, 2025
065e908
feat: adjust MessageReceiptsTracker state on notification.mark_unread…
MartinCupela Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { ChannelState } from './channel_state';
import { MessageComposer } from './messageComposer';
import { MessageReceiptsTracker } from './messageDelivery';
import {
generateChannelTempCid,
logChatPromiseExecution,
Expand Down Expand Up @@ -74,7 +76,6 @@ import type {
} from './types';
import type { Role } from './permissions';
import type { CustomChannelData } from './custom_types';
import { MessageComposer } from './messageComposer';

/**
* Channel - The Channel class manages it's own state.
Expand Down Expand Up @@ -110,6 +111,7 @@ export class Channel {
disconnected: boolean;
push_preferences?: PushPreference;
public readonly messageComposer: MessageComposer;
public readonly messageReceiptsTracker: MessageReceiptsTracker;

/**
* constructor - Create a channel
Expand Down Expand Up @@ -158,6 +160,13 @@ export class Channel {
client: this._client,
compositionContext: this,
});

this.messageReceiptsTracker = new MessageReceiptsTracker({
locateMessage: (timestampMs) => {
const msg = this.state.findMessageByTimestamp(timestampMs);
return msg && { timestampMs, msgId: msg.id };
},
});
}

/**
Expand Down Expand Up @@ -1131,16 +1140,26 @@ export class Channel {
}

/**
* markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled
* markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled. Syncs the message delivery report candidates local state.
*
* @param {MarkReadOptions} data
* @return {Promise<EventAPIResponse | null>} Description
*/
async markRead(data: MarkReadOptions = {}) {
return await this.getClient().messageDeliveryReporter.markRead(this, data);
}

/**
* markReadRequest - Send the mark read event for this user, only works if the `read_events` setting is enabled
*
* @param {MarkReadOptions} data
* @return {Promise<EventAPIResponse | null>} Description
*/
async markAsReadRequest(data: MarkReadOptions = {}) {
this._checkInitialized();

if (!this.getConfig()?.read_events && !this.getClient()._isUsingServerAuth()) {
return Promise.resolve(null);
return null;
}

return await this.getClient().post<EventAPIResponse>(this._channelURL() + '/read', {
Expand Down Expand Up @@ -1554,6 +1573,7 @@ export class Channel {
{ method: 'upsertChannels' },
);

this.getClient().syncDeliveredCandidates([this]);
return state;
}

Expand Down Expand Up @@ -1874,18 +1894,50 @@ export class Channel {
break;
case 'message.read':
if (event.user?.id && event.created_at) {
const previousReadState = channelState.read[event.user.id];
channelState.read[event.user.id] = {
// in case we already have delivery information
...previousReadState,
last_read: new Date(event.created_at),
last_read_message_id: event.last_read_message_id,
user: event.user,
unread_messages: 0,
};
this.messageReceiptsTracker.onMessageRead({
user: event.user,
readAt: event.created_at,
lastReadMessageId: event.last_read_message_id,
});
const client = this.getClient();

if (event.user?.id === this.getClient().user?.id) {
const isOwnEvent = event.user?.id === client.user?.id;

if (isOwnEvent) {
channelState.unreadCount = 0;
client.syncDeliveredCandidates([this]);
}
}
break;
case 'message.delivered':
// todo: update also on thread
if (event.user?.id && event.created_at) {
const previousReadState = channelState.read[event.user.id];
channelState.read[event.user.id] = {
...previousReadState,
last_delivered_at: event.last_delivered_at
? new Date(event.last_delivered_at)
: undefined,
last_delivered_message_id: event.last_delivered_message_id,
user: event.user,
};

this.messageReceiptsTracker.onMessageDelivered({
user: event.user,
deliveredAt: event.created_at,
lastDeliveredMessageId: event.last_delivered_message_id,
});
}
break;
case 'user.watching.start':
case 'user.updated':
if (event.user?.id) {
Expand Down Expand Up @@ -1921,8 +1973,9 @@ export class Channel {
break;
case 'message.new':
if (event.message) {
const client = this.getClient();
/* if message belongs to current user, always assume timestamp is changed to filter it out and add again to avoid duplication */
const ownMessage = event.user?.id === this.getClient().user?.id;
const ownMessage = event.user?.id === client.user?.id;
const isThreadMessage =
event.message.parent_id && !event.message.show_in_channel;

Expand All @@ -1947,6 +2000,8 @@ export class Channel {
last_read: new Date(event.created_at as string),
user: event.user,
unread_messages: 0,
last_delivered_at: new Date(event.created_at as string),
last_delivered_message_id: event.message.id,
};
} else {
channelState.read[userId].unread_messages += 1;
Expand All @@ -1957,6 +2012,8 @@ export class Channel {
if (this._countMessageAsUnread(event.message)) {
channelState.unreadCount = channelState.unreadCount + 1;
}

client.syncDeliveredCandidates([this]);
}
break;
case 'message.updated':
Expand Down Expand Up @@ -2057,11 +2114,13 @@ export class Channel {
break;
case 'notification.mark_unread': {
const ownMessage = event.user?.id === this.getClient().user?.id;
if (!(ownMessage && event.user)) break;
if (!ownMessage || !event.user) break;

const unreadCount = event.unread_messages ?? 0;

const currentState = channelState.read[event.user.id];
channelState.read[event.user.id] = {
// keep the message delivery info
...currentState,
first_unread_message_id: event.first_unread_message_id,
last_read: new Date(event.last_read_at as string),
last_read_message_id: event.last_read_message_id,
Expand All @@ -2070,6 +2129,11 @@ export class Channel {
};

channelState.unreadCount = unreadCount;
this.messageReceiptsTracker.onNotificationMarkUnread({
user: event.user,
lastReadAt: event.last_read_at,
lastReadMessageId: event.last_read_message_id,
});
break;
}
case 'channel.updated':
Expand Down Expand Up @@ -2286,6 +2350,8 @@ export class Channel {
this.state.unreadCount = this.state.read[read.user.id].unread_messages;
}
}

this.messageReceiptsTracker.ingestInitial(state.read);
}

return {
Expand Down
161 changes: 147 additions & 14 deletions src/channel_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@ type ChannelReadStatus = Record<
}
>;

const messageSetBounds = (
a: LocalMessage[] | MessageResponse[],
b: LocalMessage[] | MessageResponse[],
) => ({
newestMessageA: new Date(a[0]?.created_at ?? 0),
oldestMessageA: new Date(a.slice(-1)[0]?.created_at ?? 0),
newestMessageB: new Date(b[0]?.created_at ?? 0),
oldestMessageB: new Date(b.slice(-1)[0]?.created_at ?? 0),
});

const aContainsOrEqualsB = (a: LocalMessage[], b: LocalMessage[]) => {
const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } =
messageSetBounds(a, b);
return newestMessageA >= newestMessageB && oldestMessageB >= oldestMessageA;
};

const aOverlapsB = (a: LocalMessage[], b: LocalMessage[]) => {
const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } =
messageSetBounds(a, b);
return (
oldestMessageA < oldestMessageB &&
oldestMessageB < newestMessageA &&
newestMessageA < newestMessageB
);
};

const messageSetsOverlapByTimestamp = (a: LocalMessage[], b: LocalMessage[]) =>
aContainsOrEqualsB(a, b) ||
aContainsOrEqualsB(b, a) ||
aOverlapsB(a, b) ||
aOverlapsB(b, a);

/**
* ChannelState - A container class for the channel state.
*/
Expand Down Expand Up @@ -867,6 +899,41 @@ export class ChannelState {
return this.messageSets[messageSetIndex].messages.find((m) => m.id === messageId);
}

findMessageByTimestamp(
timestampMs: number,
parentMessageId?: string,
exactTsMatch: boolean = false,
): LocalMessage | null {
if (
(parentMessageId && !this.threads[parentMessageId]) ||
this.messageSets.length === 0
)
return null;
const setIndex = this.findMessageSetByOldestTimestamp(timestampMs);
const targetMsgSet = this.messageSets[setIndex]?.messages;
if (!targetMsgSet?.length) return null;
const firstMsgTimestamp = targetMsgSet[0].created_at.getTime();
const lastMsgTimestamp = targetMsgSet.slice(-1)[0].created_at.getTime();
const isOutOfBound =
timestampMs < firstMsgTimestamp || lastMsgTimestamp < timestampMs;
if (isOutOfBound && exactTsMatch) return null;

let msgIndex = 0,
hi = targetMsgSet.length - 1;
while (msgIndex < hi) {
const mid = (msgIndex + hi) >>> 1;
if (timestampMs <= targetMsgSet[mid].created_at.getTime()) hi = mid;
else msgIndex = mid + 1;
}

const foundMessage = targetMsgSet[msgIndex];
return !exactTsMatch
? foundMessage
: foundMessage.created_at.getTime() === timestampMs
? foundMessage
: null;
}

private switchToMessageSet(index: number) {
const currentMessages = this.messageSets.find((s) => s.isCurrent);
if (!currentMessages) {
Expand All @@ -889,46 +956,112 @@ export class ChannelState {
);
}

/**
* Identifies the set index into which a message set would pertain if its first item's creation date corresponded to oldestTimestampMs.
* @param oldestTimestampMs
*/
private findMessageSetByOldestTimestamp = (oldestTimestampMs: number): number => {
let lo = 0,
hi = this.messageSets.length;
while (lo < hi) {
const mid = (lo + hi) >>> 1;
const msgSet = this.messageSets[mid];
// should not happen
if (msgSet.messages.length === 0) return -1;

const oldestMessageTimestampInSet = msgSet.messages[0].created_at.getTime();
if (oldestMessageTimestampInSet <= oldestTimestampMs) hi = mid;
else lo = mid + 1;
}
return lo;
};

private findTargetMessageSet(
newMessages: (MessageResponse | LocalMessage)[],
addIfDoesNotExist = true,
messageSetToAddToIfDoesNotExist: MessageSetType = 'current',
) {
let messagesToAdd: (MessageResponse | LocalMessage)[] = newMessages;
let targetMessageSetIndex!: number;
if (newMessages.length === 0)
return { targetMessageSetIndex: 0, messagesToAdd: newMessages };
if (addIfDoesNotExist) {
const overlappingMessageSetIndices = this.messageSets
const overlappingMessageSetIndicesByMsgIds = this.messageSets
.map((_, i) => i)
.filter((i) =>
this.areMessageSetsOverlap(this.messageSets[i].messages, newMessages),
);
const overlappingMessageSetIndicesByTimestamp = this.messageSets
.map((_, i) => i)
.filter((i) =>
messageSetsOverlapByTimestamp(
this.messageSets[i].messages,
newMessages.map(formatMessage),
),
);
switch (messageSetToAddToIfDoesNotExist) {
case 'new':
if (overlappingMessageSetIndices.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndices[0];
if (overlappingMessageSetIndicesByMsgIds.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByMsgIds[0];
} else if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
// No new message set is created if newMessages only contains thread replies
} else if (newMessages.some((m) => !m.parent_id)) {
this.messageSets.push({
messages: [],
isCurrent: false,
isLatest: false,
pagination: DEFAULT_MESSAGE_SET_PAGINATION,
});
targetMessageSetIndex = this.messageSets.length - 1;
// find the index to insert the set
const setIngestIndex = this.findMessageSetByOldestTimestamp(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new Date(newMessages[0].created_at!).getTime(),
);
if (setIngestIndex === -1) {
this.messageSets.push({
messages: [],
isCurrent: false,
isLatest: false,
pagination: DEFAULT_MESSAGE_SET_PAGINATION,
});
targetMessageSetIndex = this.messageSets.length - 1;
} else {
const isLatest = setIngestIndex === 0;
this.messageSets.splice(setIngestIndex, 0, {
messages: [],
isCurrent: false,
isLatest,
pagination: DEFAULT_MESSAGE_SET_PAGINATION, // fixme: it is problematic decide about pagination without having data
});
if (isLatest) {
this.messageSets.slice(1).forEach((set) => {
set.isLatest = false;
});
}
targetMessageSetIndex = setIngestIndex;
}
}
break;
case 'current':
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent);
// determine if there is another set to which it would match taken into consideration the timestamp
if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
} else {
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent);
}
break;
case 'latest':
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest);
// determine if there is another set to which it would match taken into consideration the timestamp
if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
} else {
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest);
}
break;
default:
targetMessageSetIndex = -1;
}
// when merging the target set will be the first one from the overlapping message sets
const mergeTargetMessageSetIndex = overlappingMessageSetIndices.splice(0, 1)[0];
const mergeSourceMessageSetIndices = [...overlappingMessageSetIndices];
const mergeTargetMessageSetIndex = overlappingMessageSetIndicesByMsgIds.splice(
0,
1,
)[0];
const mergeSourceMessageSetIndices = [...overlappingMessageSetIndicesByMsgIds];
if (
mergeTargetMessageSetIndex !== undefined &&
mergeTargetMessageSetIndex !== targetMessageSetIndex
Expand Down
Loading