Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core, api-client): Add abort controller to notifications api call (WPB-11013) #6577

Merged
merged 17 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions packages/api-client/src/http/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ export class HttpClient extends EventEmitter {
}
}

public async _sendRequest<T>(config: AxiosRequestConfig, isFirstTry = true): Promise<AxiosResponse<T>> {
public async _sendRequest<T>(
config: AxiosRequestConfig,
isFirstTry = true,
abortController?: AbortController,
): Promise<AxiosResponse<T>> {
thisisamir98 marked this conversation as resolved.
Show resolved Hide resolved
if (this.accessTokenStore.accessToken) {
// TODO: remove tokenAsParam
const {token_type, access_token} = this.accessTokenStore.accessToken;
Expand All @@ -145,6 +149,7 @@ export class HttpClient extends EventEmitter {
try {
const response = await this.client.request<T>({
...config,
signal: abortController?.signal,
// We want to prefix all urls, except the ones with cookies which are attached to unprefixed urls
url: config.withCredentials ? config.url : `${this.versionPrefix}${config.url}`,
maxBodyLength: FILE_SIZE_100_MB,
Expand All @@ -161,7 +166,7 @@ export class HttpClient extends EventEmitter {
config['axios-retry'] = {
retries: 0,
};
return this._sendRequest<T>(config, false);
return this._sendRequest<T>(config, false, abortController);
thisisamir98 marked this conversation as resolved.
Show resolved Hide resolved
};

const hasAccessToken = !!this.accessTokenStore?.accessToken;
Expand Down Expand Up @@ -276,10 +281,11 @@ export class HttpClient extends EventEmitter {
public async sendRequest<T>(
config: AxiosRequestConfig,
isSynchronousRequest: boolean = false,
abortController?: AbortController,
): Promise<AxiosResponse<T>> {
const promise = isSynchronousRequest
? this.requestQueue.add(() => this._sendRequest<T>(config))
: this._sendRequest<T>(config);
? this.requestQueue.add(() => this._sendRequest<T>(config, true, abortController))
: this._sendRequest<T>(config, true, abortController);
thisisamir98 marked this conversation as resolved.
Show resolved Hide resolved

try {
return await promise;
Expand All @@ -289,14 +295,18 @@ export class HttpClient extends EventEmitter {
const isTooManyRequestsError = axios.isAxiosError(error) && error.response?.status === 420;

if (isTooManyRequestsError) {
return this.backOffQueue.add(() => this._sendRequest<T>(config));
return this.backOffQueue.add(() => this._sendRequest<T>(config, true, abortController));
thisisamir98 marked this conversation as resolved.
Show resolved Hide resolved
}

throw error;
}
}

public sendJSON<T>(config: AxiosRequestConfig, isSynchronousRequest: boolean = false): Promise<AxiosResponse<T>> {
public sendJSON<T>(
config: AxiosRequestConfig,
isSynchronousRequest: boolean = false,
abortController?: AbortController,
): Promise<AxiosResponse<T>> {
const shouldGzipData =
process.env.NODE_ENV !== 'test' &&
!!config.data &&
Expand All @@ -312,7 +322,7 @@ export class HttpClient extends EventEmitter {
'Content-Encoding': shouldGzipData ? 'gzip' : config.headers?.['Content-Encoding'],
};

return this.sendRequest<T>(config, isSynchronousRequest);
return this.sendRequest<T>(config, isSynchronousRequest, abortController);
thisisamir98 marked this conversation as resolved.
Show resolved Hide resolved
}

public sendXML<T>(config: AxiosRequestConfig): Promise<AxiosResponse<T>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export class NotificationAPI {
clientId?: string,
size: number = NOTIFICATION_SIZE_MAXIMUM,
since?: string,
abortController?: AbortController,
): Promise<NotificationList> {
const config: AxiosRequestConfig = {
method: 'get',
Expand All @@ -76,7 +77,7 @@ export class NotificationAPI {
url: NotificationAPI.URL.NOTIFICATION,
};

const response = await this.client.sendJSON<NotificationList>(config);
const response = await this.client.sendJSON<NotificationList>(config, false, abortController);
return response.data;
}

Expand All @@ -86,7 +87,11 @@ export class NotificationAPI {
* @param lastNotificationId Only return notifications more recent than this
* @see https://staging-nginz-https.zinfra.io/swagger-ui/#!/push/fetchNotifications
*/
public async getAllNotifications(clientId?: string, lastNotificationId?: string): Promise<NotificationsReponse> {
public async getAllNotifications(
clientId?: string,
lastNotificationId?: string,
abortController?: AbortController,
): Promise<NotificationsReponse> {
const getNotificationChunks = async (
notificationList: Notification[],
currentClientId?: string,
Expand All @@ -101,7 +106,12 @@ export class NotificationAPI {
let hasMissedNotifications = false;

try {
payload = await this.getNotifications(currentClientId, NOTIFICATION_SIZE_MAXIMUM, currentNotificationId);
payload = await this.getNotifications(
currentClientId,
NOTIFICATION_SIZE_MAXIMUM,
currentNotificationId,
abortController,
);
} catch (error) {
const isAxiosError = axios.isAxiosError(error);

Expand Down
16 changes: 3 additions & 13 deletions packages/api-client/src/tcp/WebSocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,7 @@ export interface WebSocketClient {
on(event: TOPIC.ON_STATE_CHANGE, listener: (state: WEBSOCKET_STATE) => void): this;
}

export class AbortHandler {
private aborted = false;

abort = () => {
this.aborted = true;
};

isAborted = () => this.aborted;
}

export type OnConnect = (abortHandler: AbortHandler) => Promise<void>;
export type OnConnect = (abortHandler: AbortController) => Promise<void>;

export class WebSocketClient extends EventEmitter {
private clientId?: string;
Expand All @@ -64,7 +54,7 @@ export class WebSocketClient extends EventEmitter {
public client: HttpClient;
private isSocketLocked: boolean;
private bufferedMessages: string[];
private abortHandler?: AbortHandler;
private abortHandler?: AbortController;

public static readonly TOPIC = TOPIC;

Expand Down Expand Up @@ -146,7 +136,7 @@ export class WebSocketClient extends EventEmitter {
this.socket.setOnOpen(() => {
this.onOpen();
if (onConnect) {
this.abortHandler = new AbortHandler();
this.abortHandler = new AbortController();
void onConnect(this.abortHandler);
}
});
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/Account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import * as Events from '@wireapp/api-client/lib/event';
import {CONVERSATION_EVENT} from '@wireapp/api-client/lib/event';
import {Notification} from '@wireapp/api-client/lib/notification/';
import {AbortHandler, WebSocketClient} from '@wireapp/api-client/lib/tcp/';
import {WebSocketClient} from '@wireapp/api-client/lib/tcp/';
import {WEBSOCKET_STATE} from '@wireapp/api-client/lib/tcp/ReconnectingWebsocket';
import {QualifiedId} from '@wireapp/api-client/lib/user';
import {TimeInMillis} from '@wireapp/commons/lib/util/TimeUtil';
Expand Down Expand Up @@ -628,14 +628,14 @@

const handleMissedNotifications = async (notificationId: string) => {
if (this.hasMLSDevice) {
queueConversationRejoin('all-conversations', () =>

Check warning on line 631 in packages/core/src/Account.ts

View workflow job for this annotation

GitHub Actions / lint

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
this.service!.conversation.handleConversationsEpochMismatch(),
);
}
return onMissedNotifications(notificationId);
};

const processNotificationStream = async (abortHandler: AbortHandler) => {
const processNotificationStream = async (abortHandler: AbortController) => {
// Lock websocket in order to buffer any message that arrives while we handle the notification stream
this.apiClient.transport.ws.lock();
pauseMessageSending();
Expand All @@ -653,7 +653,7 @@
);
this.logger.info(`Finished processing notifications ${JSON.stringify(results)}`, results);

if (abortHandler.isAborted()) {
if (abortHandler.signal.aborted) {
this.logger.warn('Ending connection process as websocket was closed');
return;
}
Expand All @@ -666,6 +666,7 @@
resumeMessageSending();
resumeRejoiningMLSConversations();
};

this.apiClient.connect(processNotificationStream);

return () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import {APIClient} from '@wireapp/api-client';
export class NotificationBackendRepository {
constructor(private readonly apiClient: APIClient) {}

public async getAllNotifications(clientId?: string, lastNotificationId?: string) {
return this.apiClient.api.notification.getAllNotifications(clientId, lastNotificationId);
public async getAllNotifications(clientId?: string, lastNotificationId?: string, abortController?: AbortController) {
return this.apiClient.api.notification.getAllNotifications(clientId, lastNotificationId, abortController);
}

public getLastNotification(clientId?: string): Promise<Notification> {
Expand Down
11 changes: 5 additions & 6 deletions packages/core/src/notification/NotificationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import {BackendEvent} from '@wireapp/api-client/lib/event';
import {Notification} from '@wireapp/api-client/lib/notification/';
import {AbortHandler} from '@wireapp/api-client/lib/tcp';
import logdown from 'logdown';

import {APIClient} from '@wireapp/api-client';
Expand Down Expand Up @@ -90,9 +89,9 @@ export class NotificationService extends TypedEventEmitter<Events> {
this.database = new NotificationDatabaseRepository(storeEngine);
}

private async getAllNotifications(since: string) {
private async getAllNotifications(since: string, abortController: AbortController) {
const clientId = this.apiClient.clientId;
return this.backend.getAllNotifications(clientId, since);
return this.backend.getAllNotifications(clientId, since, abortController);
}

/** Should only be called with a completely new client. */
Expand Down Expand Up @@ -140,10 +139,10 @@ export class NotificationService extends TypedEventEmitter<Events> {
public async processNotificationStream(
notificationHandler: NotificationHandler,
onMissedNotifications: (notificationId: string) => void,
abortHandler: AbortHandler,
abortHandler: AbortController,
): Promise<{total: number; error: number; success: number}> {
const lastNotificationId = await this.database.getLastNotificationId();
const {notifications, missedNotification} = await this.getAllNotifications(lastNotificationId);
const {notifications, missedNotification} = await this.getAllNotifications(lastNotificationId, abortHandler);
if (missedNotification) {
onMissedNotifications(missedNotification);
}
Expand All @@ -155,7 +154,7 @@ export class NotificationService extends TypedEventEmitter<Events> {
: `No notification to process from the stream`;
this.logger.log(logMessage);
for (const [index, notification] of notifications.entries()) {
if (abortHandler.isAborted()) {
if (abortHandler.signal.aborted) {
/* Stop handling notifications if the websocket has been disconnected.
* Upon reconnecting we are going to restart handling the notification stream for where we left of
*/
Expand Down
Loading