Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: connection robustness and flaky tests #5515

Merged
merged 98 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
54e478d
fix: 1CA tests flakiness
ganchoradkov Nov 22, 2024
f49c793
feat: cache result of expiring promise
ganchoradkov Nov 22, 2024
c6d40c3
fix: relayer to do only single connection attempt at a time
ganchoradkov Nov 22, 2024
d919e6e
refactor: network drop test
ganchoradkov Nov 22, 2024
16e9b26
feat: adds 5 time retry to connect
ganchoradkov Nov 22, 2024
9d50cfc
refactor: compare arrays before rejecting
ganchoradkov Nov 22, 2024
f1849ae
chore: prettier
ganchoradkov Nov 22, 2024
962bc89
refactor: subscribes via rpc call or pending list
ganchoradkov Nov 25, 2024
0646193
refactor: increase test throttle
ganchoradkov Nov 25, 2024
df95abb
chore: log fatal error
ganchoradkov Nov 25, 2024
7672b13
chore: log batch subscribe
ganchoradkov Nov 25, 2024
51d05cc
chore: handle transport stall error
ganchoradkov Nov 25, 2024
1457c18
refactor: relayer tests
ganchoradkov Nov 25, 2024
8090e07
chore: logs
ganchoradkov Nov 25, 2024
a39c6e4
chore: log result
ganchoradkov Nov 25, 2024
650a211
chore: more logs
ganchoradkov Nov 25, 2024
d02b5d4
fix: relayer publish
ganchoradkov Nov 25, 2024
a73778a
chore: rm unused var
ganchoradkov Nov 25, 2024
99ac3ad
chore: rm socket stall listener
ganchoradkov Nov 25, 2024
461b56f
refactor: subscriber start
ganchoradkov Nov 25, 2024
8bdc9c3
chore: log rpcSubscribe result
ganchoradkov Nov 25, 2024
7422e9b
fix: pending subscriber queue
ganchoradkov Nov 25, 2024
e3abdcb
fix: check cached on heartbeat
ganchoradkov Nov 25, 2024
26cd856
refactor: use pending.size
ganchoradkov Nov 25, 2024
b77cad3
feat: restart transport if pending subs fail a few times
ganchoradkov Nov 25, 2024
59aef56
fix: remove from attempts on successful sub
ganchoradkov Nov 25, 2024
ea2c483
refactor: publisher queue
ganchoradkov Nov 25, 2024
96c116d
chore: rm logss
ganchoradkov Nov 25, 2024
dd0dadf
chore: reduce connect timeout to 15s
ganchoradkov Nov 25, 2024
5197f32
chore: comment out
ganchoradkov Nov 25, 2024
f18ef8c
refactor: create new providers
ganchoradkov Nov 25, 2024
e8aa2fb
refactor: catch publish err
ganchoradkov Nov 25, 2024
f2179cf
refactor: increase connect timeout to 15s
ganchoradkov Nov 25, 2024
446e19e
fix: 15s
ganchoradkov Nov 25, 2024
c62c445
chore: log client attempts
ganchoradkov Nov 25, 2024
a4aaa22
chore: log successful attempt
ganchoradkov Nov 25, 2024
3ba195d
chore: log connect state
ganchoradkov Nov 25, 2024
0e69447
fix: lint
ganchoradkov Nov 25, 2024
8061869
chore: logs
ganchoradkov Nov 26, 2024
df3d007
chore: custom core logger name
ganchoradkov Nov 26, 2024
844df23
refactor: publish logic
ganchoradkov Nov 26, 2024
d0f7326
refactor: reqInFlight
ganchoradkov Nov 26, 2024
f284858
refactor: request attempt limit
ganchoradkov Nov 26, 2024
8500f8e
refactor: wraps request in a promise
ganchoradkov Nov 26, 2024
2e61424
refactor: publisher track attempts
ganchoradkov Nov 26, 2024
2c2f298
refactor: await .request
ganchoradkov Nov 26, 2024
4ec834a
refactor: reconnection
ganchoradkov Nov 26, 2024
7b372f7
chore: log ping timeout
ganchoradkov Nov 26, 2024
62a2350
refactor: delay subscribe after connection
ganchoradkov Nov 26, 2024
dff6ade
chore: disable multi threading
ganchoradkov Nov 26, 2024
c554e14
refactor: publishing
ganchoradkov Nov 26, 2024
2096ebc
chore: skip rejection tags & web3wallet tests
ganchoradkov Nov 26, 2024
ac5d8a0
chore: skip tests
ganchoradkov Nov 26, 2024
18fb537
chore: reenable threading
ganchoradkov Nov 26, 2024
190c0bb
chore: rm comments
ganchoradkov Nov 26, 2024
ddc1144
fix: core persistence tests
ganchoradkov Nov 26, 2024
e8bfeea
chore: reenable skipped tests
ganchoradkov Nov 26, 2024
bb4eb19
chore: skip tests
ganchoradkov Nov 26, 2024
f827d24
chore: cleanup
ganchoradkov Nov 27, 2024
d11e623
fix: batch fetch messages
ganchoradkov Nov 27, 2024
d9f2253
chore: enable multi threads testing
ganchoradkov Nov 27, 2024
a51d034
refactor: await before emitting batch subscribe
ganchoradkov Nov 27, 2024
c07653d
chore: debug logs
ganchoradkov Nov 27, 2024
e1b4528
fix: promise handling
ganchoradkov Nov 28, 2024
843252f
fix: tests
ganchoradkov Nov 28, 2024
33cf401
chore: skip tests
ganchoradkov Nov 28, 2024
d8e589d
chore: log requests being sent
ganchoradkov Nov 29, 2024
e97635a
chore: log ignored message events
ganchoradkov Nov 29, 2024
9c569f2
chore: double check shouldIgnoreMessages
ganchoradkov Nov 29, 2024
8b3c71e
refactor: double check before ignoring a message
ganchoradkov Nov 29, 2024
94126b5
chore: awaits respond to request
ganchoradkov Nov 29, 2024
c2a910d
refactor: test
ganchoradkov Nov 29, 2024
c3881a8
chore: reenable tests
ganchoradkov Nov 29, 2024
5481d04
chore: log publisher queuue
ganchoradkov Nov 29, 2024
b3ca5f4
chore: log
ganchoradkov Nov 29, 2024
a674347
refactor: fetch batch messages on batch subscribe
ganchoradkov Nov 29, 2024
2040a43
Merge pull request #5514 from WalletConnect/fix/1ca-test-flakiness
ganchoradkov Nov 29, 2024
0d27ca5
chore: log sent/received requests
ganchoradkov Nov 29, 2024
0714e73
chore: fix logs
ganchoradkov Dec 2, 2024
621c2c6
chore: fix tests
ganchoradkov Dec 2, 2024
fd6c479
chore: disable batch fetch messages
ganchoradkov Dec 2, 2024
3b8b57e
refactor: disable fetch batch messages
ganchoradkov Dec 2, 2024
767664d
refactor: optimistic subscribe
ganchoradkov Dec 2, 2024
7ff9611
refactor: subscriber restart
ganchoradkov Dec 2, 2024
e48bc9f
chore: rm restartPromise await
ganchoradkov Dec 2, 2024
5334c60
refactor: subscriber start as part of connect flow
ganchoradkov Dec 2, 2024
c0fe4cc
refactor: awaits subscriber on connect
ganchoradkov Dec 2, 2024
6da2602
chore: throwing err after few attempts
ganchoradkov Dec 2, 2024
4147fd4
chore: comment out
ganchoradkov Dec 2, 2024
c4f23aa
chore: only log failed runs
ganchoradkov Dec 2, 2024
8572e46
chore: cleanup
ganchoradkov Dec 2, 2024
b6031b9
Merge branch 'v2.0' into fix/single-connection
ganchoradkov Dec 2, 2024
e26ccae
refactor: replaces magic number with variable and adds promise
ganchoradkov Dec 3, 2024
f545653
Merge branch 'fix/single-connection' of github.com:WalletConnect/wall…
ganchoradkov Dec 3, 2024
3e93bcf
chore: log error msg
ganchoradkov Dec 3, 2024
097d64c
refactor: uses utils for timeout
ganchoradkov Dec 5, 2024
799ddcf
chore: catch failed requests in tests
ganchoradkov Dec 5, 2024
cfb6723
refactor: async usage
ganchoradkov Dec 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"build:types": "tsc",
"build:source": "rollup --config rollup.config.js",
"build": "npm run build:pre; npm run build:source; npm run build:types",
"test:pre": "rm -rf ./test/*.db",
"test:pre": "rm -rf ./test/tmp",
"test:run": "vitest run --dir test",
"test": "npm run test:pre; npm run test:run",
"test:ignoreUnhandled": "npm run test:pre; npm run test:run -- --dangerouslyIgnoreUnhandledErrors",
Expand Down
129 changes: 82 additions & 47 deletions packages/core/src/controllers/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat";
import { JsonRpcPayload, RequestArguments } from "@walletconnect/jsonrpc-types";
import { generateChildLogger, getLoggerContext, Logger } from "@walletconnect/logger";
import { RelayJsonRpc } from "@walletconnect/relay-api";
import { IPublisher, IRelayer, PublisherTypes, RelayerTypes } from "@walletconnect/types";
import { IPublisher, IRelayer, PublisherTypes } from "@walletconnect/types";
import {
getRelayProtocolApi,
getRelayProtocolName,
Expand All @@ -15,13 +15,17 @@ import { PUBLISHER_CONTEXT, PUBLISHER_DEFAULT_TTL, RELAYER_EVENTS } from "../con
import { getBigIntRpcId } from "@walletconnect/jsonrpc-utils";
import { ONE_MINUTE, ONE_SECOND, toMiliseconds } from "@walletconnect/time";

type IPublishType = PublisherTypes.Params & {
attestation?: string;
attempt: number;
};
export class Publisher extends IPublisher {
public events = new EventEmitter();
public name = PUBLISHER_CONTEXT;
public queue = new Map<string, PublisherTypes.Params>();
public queue = new Map<string, IPublishType>();

private publishTimeout = toMiliseconds(ONE_MINUTE);
private failedPublishTimeout = toMiliseconds(ONE_SECOND);
private initialPublishTimeout = toMiliseconds(ONE_SECOND * 15);
private needsTransportRestart = false;

constructor(public relayer: IRelayer, public logger: Logger) {
Expand Down Expand Up @@ -57,52 +61,65 @@ export class Publisher extends IPublisher {
},
};
const failedPublishMessage = `Failed to publish payload, please try again. id:${id} tag:${tag}`;
const startPublish = Date.now();
let result;
let attempts = 1;

try {
/**
* Loop until the publish is successful or the timeout is reached
* The loop allows to retry to retry the publish in case of disconnect
* attempt to publish the payload for <initialPublishTimeout> seconds,
* if the publish fails, add the payload to the queue and it will be retried on every pulse
* until it is successfully published or <publishTimeout> seconds have passed
*/
while (result === undefined) {
// Terminate the publishing attempts if publisTimeout has been exceeded
if (Date.now() - startPublish > this.publishTimeout) {
throw new Error(failedPublishMessage);
}

this.logger.trace({ id, attempts }, `publisher.publish - attempt ${attempts}`);
const publish = await createExpiringPromise(
this.rpcPublish(topic, message, ttl, relay, prompt, tag, id, opts?.attestation).catch(
(e) => this.logger.warn(e),
),
this.publishTimeout,
failedPublishMessage,
const publishPromise = new Promise(async (resolve) => {
const onPublish = ({ id }: { id: string }) => {
if (params.opts.id === id) {
this.removeRequestFromQueue(id);
this.relayer.events.removeListener(RELAYER_EVENTS.publish, onPublish);
resolve(params);
}
};
this.relayer.events.on(RELAYER_EVENTS.publish, onPublish);
const initialPublish = createExpiringPromise(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR but createExpiringPromise could use Promise.race to be simplified and avoid potential race conditions.
This approach eliminates the need for manual clearTimeout calls because once the Promise settles, the other Promise in the race is ignored, and the timer will be garbage collected.

export function createExpiringPromise<T>(
  promise: Promise<T>,
  expiry: number,
  expireErrorMessage?: string,
): Promise<T> {
  const timeoutPromise = new Promise<T>((_, reject) =>
    setTimeout(() => reject(new Error(expireErrorMessage)), expiry),
  );

  return Promise.race([promise, timeoutPromise]);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will test it out duing tech debt week

new Promise((resolve, reject) => {
ganchoradkov marked this conversation as resolved.
Show resolved Hide resolved
this.rpcPublish({
topic,
message,
ttl,
prompt,
tag,
id,
attestation: opts?.attestation,
})
.then(resolve)
.catch((e) => {
this.logger.warn(e, e?.message);
reject(e);
});
}),
this.initialPublishTimeout,
`Failed initial publish, retrying.... id:${id} tag:${tag}`,
);
result = await publish;
attempts++;

if (!result) {
// small delay before retrying so we can limit retry to max 1 time per second
// if network is down `rpcPublish` will throw immediately
await new Promise((resolve) => setTimeout(resolve, this.failedPublishTimeout));
try {
await initialPublish;
this.events.removeListener(RELAYER_EVENTS.publish, onPublish);
} catch (e) {
this.queue.set(id, { ...params, attempt: 1 });
this.logger.warn(e, (e as Error)?.message);
}
}
this.relayer.events.emit(RELAYER_EVENTS.publish, params);
this.logger.debug(`Successfully Published Payload`);
});
this.logger.trace({
type: "method",
method: "publish",
params: { id, topic, message, opts },
});

await createExpiringPromise(publishPromise, this.publishTimeout, failedPublishMessage);
} catch (e) {
this.logger.debug(`Failed to Publish Payload`);
this.logger.error(e as any);
if (opts?.internal?.throwOnFailedPublish) {
throw e;
}
this.queue.set(id, params);
} finally {
this.queue.delete(id);
ganchoradkov marked this conversation as resolved.
Show resolved Hide resolved
}
};

Expand All @@ -124,17 +141,17 @@ export class Publisher extends IPublisher {

// ---------- Private ----------------------------------------------- //

private rpcPublish(
topic: string,
message: string,
ttl: number,
relay: RelayerTypes.ProtocolOptions,
prompt?: boolean,
tag?: number,
id?: number,
attestation?: string,
) {
const api = getRelayProtocolApi(relay.protocol);
private async rpcPublish(params: {
topic: string;
message: string;
ttl?: number;
prompt?: boolean;
tag?: number;
id?: number;
attestation?: string;
}) {
const { topic, message, ttl = PUBLISHER_DEFAULT_TTL, prompt, tag, id, attestation } = params;
const api = getRelayProtocolApi(getRelayProtocolName().protocol);
const request: RequestArguments<RelayJsonRpc.PublishParams> = {
method: api.publish,
params: {
Expand All @@ -151,17 +168,35 @@ export class Publisher extends IPublisher {
if (isUndefined(request.params?.tag)) delete request.params?.tag;
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "message", direction: "outgoing", request });
return this.relayer.request(request);
const result = await this.relayer.request(request);
this.relayer.events.emit(RELAYER_EVENTS.publish, params);
this.logger.debug(`Successfully Published Payload`);
return result;
}

private removeRequestFromQueue(id: string) {
this.queue.delete(id);
}

private checkQueue() {
this.queue.forEach(async (params) => {
const { topic, message, opts } = params;
await this.publish(topic, message, opts);
this.queue.forEach(async (params, id) => {
const attempt = params.attempt + 1;
this.queue.set(id, { ...params, attempt });
const { topic, message, opts, attestation } = params;
this.logger.warn(
{},
ganchoradkov marked this conversation as resolved.
Show resolved Hide resolved
`Publisher: queue->publishing: ${params.opts.id}, tag: ${params.opts.tag}, attempt: ${attempt}`,
);
await this.rpcPublish({
topic,
message,
ttl: opts.ttl,
prompt: opts.prompt,
tag: opts.tag,
id: opts.id,
attestation,
});
this.logger.warn({}, `Publisher: queue->published: ${params.opts.id}`);
});
}

Expand Down
Loading
Loading