Skip to content

Commit 7ff9611

Browse files
committed
refactor: subscriber restart
1 parent 767664d commit 7ff9611

File tree

1 file changed

+16
-20
lines changed

1 file changed

+16
-20
lines changed

packages/core/src/controllers/subscriber.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class Subscriber extends ISubscriber {
4545
private storagePrefix = CORE_STORAGE_PREFIX;
4646
private subscribeTimeout = toMiliseconds(ONE_MINUTE);
4747
private initialSubscribeTimeout = toMiliseconds(ONE_SECOND * 15);
48-
private restartInProgress = false;
48+
private restartPromise: Promise<void> | undefined;
4949
private clientId: string;
5050
private batchSubscribeTopicsLimit = 500;
5151

@@ -322,7 +322,10 @@ export class Subscriber extends ISubscriber {
322322
this.subscribeTimeout,
323323
"rpcBatchSubscribe failed, please try again",
324324
);
325-
return await subscribe;
325+
await subscribe;
326+
subscriptions.forEach((s) => {
327+
this.pending.delete(s.topic);
328+
});
326329
} catch (err) {
327330
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
328331
}
@@ -448,10 +451,14 @@ export class Subscriber extends ISubscriber {
448451
}
449452

450453
private restart = async () => {
451-
this.restartInProgress = true;
452-
await this.restore();
453-
await this.onRestart();
454-
this.restartInProgress = false;
454+
if (this.restartPromise) return;
455+
this.restartPromise = new Promise<void>(async (resolve) => {
456+
await this.restore();
457+
await this.onRestart();
458+
resolve();
459+
});
460+
await this.restartPromise;
461+
this.restartPromise = undefined;
455462
};
456463

457464
private async persist() {
@@ -470,7 +477,7 @@ export class Subscriber extends ISubscriber {
470477
setTimeout(async () => {
471478
await this.batchSubscribe(batch);
472479
resolve();
473-
}, 1000);
480+
}, toMiliseconds(ONE_SECOND));
474481
});
475482
}
476483
}
@@ -503,9 +510,6 @@ export class Subscriber extends ISubscriber {
503510
subscriptions.map((s) => ({ ...s, id: this.getSubscriptionId(s.topic) })),
504511
);
505512
await this.rpcBatchSubscribe(subscriptions);
506-
subscriptions.forEach((s) => {
507-
this.pending.delete(s.topic);
508-
});
509513
}
510514

511515
// @ts-ignore
@@ -574,16 +578,8 @@ export class Subscriber extends ISubscriber {
574578
if (!this.relayer.connected && !this.relayer.connecting) {
575579
await this.relayer.transportOpen();
576580
}
577-
if (!this.restartInProgress) return;
578-
579-
await new Promise<void>((resolve) => {
580-
const interval = setInterval(() => {
581-
if (!this.restartInProgress) {
582-
clearInterval(interval);
583-
resolve();
584-
}
585-
}, this.pollingInterval);
586-
});
581+
if (!this.restartPromise) return;
582+
await this.restartPromise;
587583
}
588584

589585
private getSubscriptionId(topic: string) {

0 commit comments

Comments
 (0)