From 5147ee328cf1056300dc652d89e9b793be9dc2bb Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 27 Oct 2024 17:05:51 +0100 Subject: [PATCH 1/7] add message cache to Filter --- packages/sdk/src/protocols/filter/index.ts | 41 +++++++++----- .../sdk/src/protocols/filter/message_cache.ts | 53 +++++++++++++++++++ packages/sdk/src/protocols/peer_manager.ts | 20 ++++++- 3 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/message_cache.ts diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index a70e484793..1d4cc4e656 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -1,20 +1,20 @@ import { ConnectionManager, FilterCore } from "@waku/core"; -import { - type Callback, - type CreateSubscriptionResult, - type IAsyncIterator, - type IDecodedMessage, - type IDecoder, - type IFilter, - type ILightPush, - type Libp2p, - NetworkConfig, - ProtocolError, - type PubsubTopic, - type SubscribeOptions, +import type { + Callback, + CreateSubscriptionResult, + IAsyncIterator, + IDecodedMessage, + IDecoder, + IFilter, + ILightPush, + IProtoMessage, + Libp2p, + PubsubTopic, + SubscribeOptions, SubscribeResult, - type Unsubscribe + Unsubscribe } from "@waku/interfaces"; +import { NetworkConfig, ProtocolError } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, @@ -26,6 +26,7 @@ import { import { PeerManager } from "../peer_manager.js"; import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { MessageCache } from "./message_cache.js"; import { SubscriptionManager } from "./subscription_manager.js"; const log = new Logger("sdk:filter"); @@ -33,6 +34,7 @@ const log = new Logger("sdk:filter"); class Filter implements IFilter { public readonly protocol: FilterCore; + private readonly messageCache: MessageCache; private activeSubscriptions = new Map(); public constructor( @@ -41,6 +43,8 @@ class Filter implements IFilter { private peerManager: PeerManager, private lightPush?: ILightPush ) { + this.messageCache = new MessageCache(libp2p); + this.protocol = new FilterCore( async (pubsubTopic, wakuMessage, peerIdStr) => { const subscription = this.getActiveSubscription(pubsubTopic); @@ -50,6 +54,15 @@ class Filter implements IFilter { ); return; } + + if (this.messageCache.has(pubsubTopic, wakuMessage as IProtoMessage)) { + log.info( + `Skipping duplicate message for pubsubTopic:${pubsubTopic} peerId:${peerIdStr}` + ); + return; + } + + this.messageCache.set(pubsubTopic, wakuMessage as IProtoMessage); await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, diff --git a/packages/sdk/src/protocols/filter/message_cache.ts b/packages/sdk/src/protocols/filter/message_cache.ts new file mode 100644 index 0000000000..5c28b34f43 --- /dev/null +++ b/packages/sdk/src/protocols/filter/message_cache.ts @@ -0,0 +1,53 @@ +import type { IProtoMessage, Libp2p } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; + +type Hash = string; +type Timestamp = number; + +export class MessageCache { + private intervalID: number | undefined = undefined; + private readonly messages: Map = new Map(); + + public constructor(libp2p: Libp2p) { + this.onStart = this.onStart.bind(this); + this.onStop = this.onStop.bind(this); + + libp2p.addEventListener("start", this.onStart); + libp2p.addEventListener("stop", this.onStop); + } + + public set(pubsubTopic: string, message: IProtoMessage): void { + const hash = messageHashStr(pubsubTopic, message); + this.messages.set(hash, Date.now()); + } + + public has(pubsubTopic: string, message: IProtoMessage): boolean { + const hash = messageHashStr(pubsubTopic, message); + return this.messages.has(hash); + } + + private onStart(): void { + if (this.intervalID) { + return; + } + + this.intervalID = setInterval(() => { + this.prune(); + }, 60_000) as unknown as number; + } + + private onStop(): void { + if (!this.intervalID) { + return; + } + + clearInterval(this.intervalID); + } + + private prune(): void { + Array.from(this.messages.entries()) + .filter(([_, seenTimestamp]) => Date.now() - seenTimestamp >= 60_000) + .map(([hash, _]) => hash) + .forEach((hash) => this.messages.delete(hash)); + } +} diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index c4d26d9ae4..80eba1b2ed 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -22,6 +22,9 @@ export class PeerManager { private readonly libp2p: Libp2p; public constructor(params: PeerManagerParams) { + this.onConnected = this.onConnected.bind(this); + this.onDisconnected = this.onDisconnected.bind(this); + this.numPeersToUse = params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE; @@ -58,7 +61,18 @@ export class PeerManager { .map((c) => this.mapConnectionToPeer(c)) ); - return result[0]; + const newPeer = result[0]; + + if (!newPeer) { + log.warn(`requestRenew: Couldn't renew peer ${peerId.toString()}.`); + return; + } + + log.info( + `requestRenew: Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); + + return newPeer; } private startConnectionListener(): void { @@ -107,7 +121,9 @@ export class PeerManager { } private lockConnection(c: Connection): Connection { - log.info(`Locking connection for peerId=${c.remotePeer.toString()}`); + log.info( + `requestRenew: Locking connection for peerId=${c.remotePeer.toString()}` + ); c.tags.push(CONNECTION_LOCK_TAG); return c; } From 7eb67ff751c675860cd2d2d57e58705509953d2d Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 27 Oct 2024 17:41:36 +0100 Subject: [PATCH 2/7] remove WakuOptions and use only ProtocolCreateOptions --- packages/interfaces/src/protocols.ts | 12 +++--------- packages/relay/src/create.ts | 13 ++++--------- packages/sdk/src/create/create.ts | 6 +++--- packages/sdk/src/create/libp2p.ts | 12 +++++------- packages/sdk/src/waku/waku.ts | 16 +--------------- 5 files changed, 16 insertions(+), 43 deletions(-) diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 00d330152e..20cdcc77ee 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -24,17 +24,11 @@ export type NetworkConfig = StaticSharding | AutoSharding; export type ProtocolCreateOptions = { /** - * Configuration for determining the network in use. + * Set the user agent string to be used in identification of the node. * - * If using Static Sharding: - * Default value is configured for The Waku Network. - * The format to specify a shard is: clusterId: number, shards: number[] - * To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/). - * - * If using Auto Sharding: - * See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details. - * You cannot add or remove content topics after initialization of the node. + * @default "js-waku" */ + userAgent?: string; /** * Configuration for determining the network in use. diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts index 448598162b..34e795b3f8 100644 --- a/packages/relay/src/create.ts +++ b/packages/relay/src/create.ts @@ -1,10 +1,5 @@ -import type { RelayNode } from "@waku/interfaces"; -import { - createLibp2pAndUpdateOptions, - CreateWakuNodeOptions, - WakuNode, - WakuOptions -} from "@waku/sdk"; +import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; +import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; @@ -19,7 +14,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; * or use this function with caution. */ export async function createRelayNode( - options: CreateWakuNodeOptions & Partial + options: ProtocolCreateOptions & Partial ): Promise { options = { ...options, @@ -36,7 +31,7 @@ export async function createRelayNode( return new WakuNode( pubsubTopics, - options as WakuOptions, + options as ProtocolCreateOptions, libp2p, {}, relay diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 155fee3403..331d1c2862 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -1,6 +1,6 @@ -import { type LightNode } from "@waku/interfaces"; +import type { LightNode, ProtocolCreateOptions } from "@waku/interfaces"; -import { CreateWakuNodeOptions, WakuNode } from "../waku/index.js"; +import { WakuNode } from "../waku/index.js"; import { createLibp2pAndUpdateOptions } from "./libp2p.js"; @@ -10,7 +10,7 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; * Uses Waku Filter V2 by default. */ export async function createLightNode( - options: CreateWakuNodeOptions = {} + options: ProtocolCreateOptions = {} ): Promise { const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index e979487f9b..6efb17d1e7 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -12,17 +12,12 @@ import { type IMetadata, type Libp2p, type Libp2pComponents, + type ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; -import { - CreateWakuNodeOptions, - DefaultPingMaxInboundStreams, - DefaultUserAgent -} from "../waku/index.js"; - import { defaultPeerDiscoveries } from "./discovery.js"; type MetadataService = { @@ -31,6 +26,9 @@ type MetadataService = { const log = new Logger("sdk:create"); +const DefaultUserAgent = "js-waku"; +const DefaultPingMaxInboundStreams = 10; + export async function defaultLibp2p( pubsubTopics: PubsubTopic[], options?: Partial, @@ -78,7 +76,7 @@ export async function defaultLibp2p( } export async function createLibp2pAndUpdateOptions( - options: CreateWakuNodeOptions + options: ProtocolCreateOptions ): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { const { networkConfig } = options; const pubsubTopics = derivePubsubTopicsFromNetworkConfig( diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 96706e1a13..4700c372e9 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -24,22 +24,8 @@ import { ReliabilityMonitorManager } from "../reliability_monitor/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; -export const DefaultUserAgent = "js-waku"; -export const DefaultPingMaxInboundStreams = 10; - const log = new Logger("waku"); -export interface WakuOptions { - /** - * Set the user agent string to be used in identification of the node. - * @default {@link @waku/core.DefaultUserAgent} - */ - userAgent?: string; -} - -export type CreateWakuNodeOptions = ProtocolCreateOptions & - Partial; - type ProtocolsEnabled = { filter?: boolean; lightpush?: boolean; @@ -59,7 +45,7 @@ export class WakuNode implements IWaku { public constructor( public readonly pubsubTopics: PubsubTopic[], - options: CreateWakuNodeOptions, + options: ProtocolCreateOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, relay?: IRelay From 09127dfc9187bfc15dcd35330553de0de602e093 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 27 Oct 2024 18:13:26 +0100 Subject: [PATCH 3/7] move subscribe options to createLightNode Fitler protocol options --- packages/interfaces/src/filter.ts | 32 ++++++++++++++----- packages/interfaces/src/protocols.ts | 10 +++++- .../sdk/src/protocols/filter/constants.ts | 6 +--- packages/sdk/src/protocols/filter/index.ts | 27 ++++++++-------- .../protocols/filter/subscription_manager.ts | 26 ++++++--------- packages/sdk/src/protocols/filter/utils.ts | 15 +++++++++ packages/sdk/src/waku/waku.ts | 3 +- 7 files changed, 74 insertions(+), 45 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/utils.ts diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index f992a497a7..7d5168e27b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -15,17 +15,34 @@ export type SubscriptionCallback = { callback: Callback; }; -export type SubscribeOptions = { - keepAlive?: number; - pingsBeforePeerRenewed?: number; - enableLightPushFilterCheck?: boolean; +export type FilterProtocolOptions = { + /** + * Interval with which Filter subscription will attempt to send ping requests to subscribed peers. + * + * @default 60_000 + */ + keepAliveIntervalMs: number; + + /** + * Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one. + * + * @default 3 + */ + pingsBeforePeerRenewed: number; + + /** + * Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription. + * In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer. + * + * @default false + */ + enableLightPushFilterCheck: boolean; }; export interface ISubscription { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options?: SubscribeOptions + callback: Callback ): Promise; unsubscribe(contentTopics: ContentTopic[]): Promise; @@ -38,8 +55,7 @@ export interface ISubscription { export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions?: SubscribeOptions + callback: Callback ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 20cdcc77ee..b386e1d4fb 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; import type { ConnectionManagerOptions } from "./connection_manager.js"; +import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; import { ThisAndThat, ThisOrThat } from "./misc.js"; @@ -86,9 +87,16 @@ export type ProtocolCreateOptions = { bootstrapPeers?: string[]; /** - * Configuration for connection manager. If not specified - default values are applied. + * Configuration for connection manager. + * If not specified - default values are applied. */ connectionManager?: Partial; + + /** + * Configuration for Filter protocol. + * If not specified - default values are applied. + */ + filter?: Partial; }; export type Callback = ( diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 9477a7e417..90ec3794b6 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,8 +1,4 @@ export const DEFAULT_KEEP_ALIVE = 60_000; +export const DEFAULT_MAX_PINGS = 3; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; - -export const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE, - enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK -}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 1d4cc4e656..f78e4d8ae7 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -2,6 +2,7 @@ import { ConnectionManager, FilterCore } from "@waku/core"; import type { Callback, CreateSubscriptionResult, + FilterProtocolOptions, IAsyncIterator, IDecodedMessage, IDecoder, @@ -10,7 +11,6 @@ import type { IProtoMessage, Libp2p, PubsubTopic, - SubscribeOptions, SubscribeResult, Unsubscribe } from "@waku/interfaces"; @@ -25,15 +25,16 @@ import { import { PeerManager } from "../peer_manager.js"; -import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; import { MessageCache } from "./message_cache.js"; import { SubscriptionManager } from "./subscription_manager.js"; +import { buildConfig } from "./utils.js"; const log = new Logger("sdk:filter"); class Filter implements IFilter { public readonly protocol: FilterCore; + private readonly config: FilterProtocolOptions; private readonly messageCache: MessageCache; private activeSubscriptions = new Map(); @@ -41,8 +42,10 @@ class Filter implements IFilter { private connectionManager: ConnectionManager, private libp2p: Libp2p, private peerManager: PeerManager, - private lightPush?: ILightPush + private lightPush?: ILightPush, + config?: Partial ) { + this.config = buildConfig(config); this.messageCache = new MessageCache(libp2p); this.protocol = new FilterCore( @@ -79,7 +82,6 @@ class Filter implements IFilter { * * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. * @param {Callback} callback - The callback function to be invoked with decoded messages. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. * * @returns {Promise} A promise that resolves to an object containing: * - subscription: The created subscription object if successful, or null if failed. @@ -113,8 +115,7 @@ class Filter implements IFilter { */ public async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -140,8 +141,7 @@ class Filter implements IFilter { const { failures, successes } = await subscription.subscribe( decoders, - callback, - subscribeOptions + callback ); return { subscription, @@ -192,6 +192,7 @@ class Filter implements IFilter { this.connectionManager, this.peerManager, this.libp2p, + this.config, this.lightPush ) ); @@ -219,8 +220,7 @@ class Filter implements IFilter { */ public async subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -244,7 +244,7 @@ class Filter implements IFilter { throw Error(`Failed to create subscription: ${error}`); } - await subscription.subscribe(decoders, callback, options); + await subscription.subscribe(decoders, callback); const contentTopics = Array.from( groupByContentTopic( @@ -298,8 +298,9 @@ class Filter implements IFilter { export function wakuFilter( connectionManager: ConnectionManager, peerManager: PeerManager, - lightPush?: ILightPush + lightPush?: ILightPush, + config?: Partial ): (libp2p: Libp2p) => IFilter { return (libp2p: Libp2p) => - new Filter(connectionManager, libp2p, peerManager, lightPush); + new Filter(connectionManager, libp2p, peerManager, lightPush, config); } diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 59af175121..21bc45a446 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -12,6 +12,7 @@ import { type ContentTopic, type CoreProtocolResult, EConnectionStateEvents, + FilterProtocolOptions, type IDecodedMessage, type IDecoder, type ILightPush, @@ -22,7 +23,6 @@ import { ProtocolError, type PubsubTopic, type SDKProtocolResult, - type SubscribeOptions, SubscriptionCallback } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; @@ -32,23 +32,17 @@ import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; import { PeerManager } from "../peer_manager.js"; -import { - DEFAULT_KEEP_ALIVE, - DEFAULT_LIGHT_PUSH_FILTER_CHECK, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL, - DEFAULT_SUBSCRIBE_OPTIONS -} from "./constants.js"; +import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscription { private reliabilityMonitor: ReceiverReliabilityMonitor; - private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; + private keepAliveTimeout: number; + private enableLightPushFilterCheck: boolean; private keepAliveInterval: ReturnType | null = null; - private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK; - private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -60,6 +54,7 @@ export class SubscriptionManager implements ISubscription { private readonly connectionManager: ConnectionManager, private readonly peerManager: PeerManager, private readonly libp2p: Libp2p, + config: FilterProtocolOptions, private readonly lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; @@ -72,18 +67,15 @@ export class SubscriptionManager implements ISubscription { this.protocol.subscribe.bind(this.protocol), this.sendLightPushCheckMessage.bind(this) ); + this.reliabilityMonitor.setMaxPingFailures(config.pingsBeforePeerRenewed); + this.keepAliveTimeout = config.keepAliveIntervalMs; + this.enableLightPushFilterCheck = config.enableLightPushFilterCheck; } public async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { - this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); - this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.enableLightPushFilterCheck = - options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK; - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription diff --git a/packages/sdk/src/protocols/filter/utils.ts b/packages/sdk/src/protocols/filter/utils.ts new file mode 100644 index 0000000000..9c926ae36c --- /dev/null +++ b/packages/sdk/src/protocols/filter/utils.ts @@ -0,0 +1,15 @@ +import { FilterProtocolOptions } from "@waku/interfaces"; + +import * as C from "./constants.js"; + +export const buildConfig = ( + config?: Partial +): FilterProtocolOptions => { + return { + keepAliveIntervalMs: config?.keepAliveIntervalMs || C.DEFAULT_KEEP_ALIVE, + pingsBeforePeerRenewed: + config?.pingsBeforePeerRenewed || C.DEFAULT_MAX_PINGS, + enableLightPushFilterCheck: + config?.enableLightPushFilterCheck || C.DEFAULT_LIGHT_PUSH_FILTER_CHECK + }; +}; diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 4700c372e9..c2f3a339e2 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -92,7 +92,8 @@ export class WakuNode implements IWaku { const filter = wakuFilter( this.connectionManager, this.peerManager, - this.lightPush + this.lightPush, + options.filter ); this.filter = filter(libp2p); } From 2a5df25ca98254b714aa5629aefbc689fbf810f6 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 27 Oct 2024 18:36:39 +0100 Subject: [PATCH 4/7] rename SubscriptionManager to Subscription --- packages/sdk/src/protocols/filter/index.ts | 13 ++++++------- .../{subscription_manager.ts => subscription.ts} | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) rename packages/sdk/src/protocols/filter/{subscription_manager.ts => subscription.ts} (99%) diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index f78e4d8ae7..07e7ec7876 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -26,7 +26,7 @@ import { import { PeerManager } from "../peer_manager.js"; import { MessageCache } from "./message_cache.js"; -import { SubscriptionManager } from "./subscription_manager.js"; +import { Subscription } from "./subscription.js"; import { buildConfig } from "./utils.js"; const log = new Logger("sdk:filter"); @@ -36,7 +36,7 @@ class Filter implements IFilter { private readonly config: FilterProtocolOptions; private readonly messageCache: MessageCache; - private activeSubscriptions = new Map(); + private activeSubscriptions = new Map(); public constructor( private connectionManager: ConnectionManager, @@ -186,7 +186,7 @@ class Filter implements IFilter { this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager( + new Subscription( pubsubTopic, this.protocol, this.connectionManager, @@ -263,17 +263,16 @@ class Filter implements IFilter { return toAsyncIterator(this, decoders); } - //TODO: move to SubscriptionManager private getActiveSubscription( pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { + ): Subscription | undefined { return this.activeSubscriptions.get(pubsubTopic); } private setActiveSubscription( pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { + subscription: Subscription + ): Subscription { this.activeSubscriptions.set(pubsubTopic, subscription); return subscription; } diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription.ts similarity index 99% rename from packages/sdk/src/protocols/filter/subscription_manager.ts rename to packages/sdk/src/protocols/filter/subscription.ts index 21bc45a446..3534f26a7f 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription.ts @@ -36,7 +36,7 @@ import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); -export class SubscriptionManager implements ISubscription { +export class Subscription implements ISubscription { private reliabilityMonitor: ReceiverReliabilityMonitor; private keepAliveTimeout: number; From 99130578522852034e8805869e8d18b46a491c8c Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 29 Jan 2025 00:28:52 +0100 Subject: [PATCH 5/7] rename to CreateNodeOptions --- packages/interfaces/src/protocols.ts | 2 +- packages/relay/src/create.ts | 6 +++--- packages/relay/src/relay.ts | 4 ++-- packages/sdk/src/create/create.ts | 4 ++-- packages/sdk/src/create/libp2p.ts | 4 ++-- packages/sdk/src/waku/waku.ts | 4 ++-- packages/tests/src/lib/runNodes.ts | 8 ++------ packages/tests/src/utils/nodes.ts | 4 ++-- packages/tests/tests/filter/utils.ts | 4 ++-- packages/utils/src/common/sharding/type_guards.ts | 6 +++--- 10 files changed, 21 insertions(+), 25 deletions(-) diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index b6d4297c8d..40fc66542d 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -24,7 +24,7 @@ export type IBaseProtocolCore = { export type NetworkConfig = StaticSharding | AutoSharding; -export type ProtocolCreateOptions = { +export type CreateNodeOptions = { /** * Set the user agent string to be used in identification of the node. * diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts index 34e795b3f8..476e71bf44 100644 --- a/packages/relay/src/create.ts +++ b/packages/relay/src/create.ts @@ -1,4 +1,4 @@ -import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; +import type { CreateNodeOptions, RelayNode } from "@waku/interfaces"; import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; @@ -14,7 +14,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; * or use this function with caution. */ export async function createRelayNode( - options: ProtocolCreateOptions & Partial + options: CreateNodeOptions & Partial ): Promise { options = { ...options, @@ -31,7 +31,7 @@ export async function createRelayNode( return new WakuNode( pubsubTopics, - options as ProtocolCreateOptions, + options as CreateNodeOptions, libp2p, {}, relay diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index ed1cc0d556..8c7e0c1cac 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -11,6 +11,7 @@ import { sha256 } from "@noble/hashes/sha256"; import { ActiveSubscriptions, Callback, + CreateNodeOptions, IAsyncIterator, IDecodedMessage, IDecoder, @@ -18,7 +19,6 @@ import { IMessage, IRelay, Libp2p, - ProtocolCreateOptions, ProtocolError, PubsubTopic, SDKProtocolResult @@ -39,7 +39,7 @@ export type Observer = { callback: Callback; }; -export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; +export type RelayCreateOptions = CreateNodeOptions & GossipsubOpts; export type ContentTopic = string; /** diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 331d1c2862..ccd519e7bc 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -1,4 +1,4 @@ -import type { LightNode, ProtocolCreateOptions } from "@waku/interfaces"; +import type { CreateNodeOptions, LightNode } from "@waku/interfaces"; import { WakuNode } from "../waku/index.js"; @@ -10,7 +10,7 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; * Uses Waku Filter V2 by default. */ export async function createLightNode( - options: ProtocolCreateOptions = {} + options: CreateNodeOptions = {} ): Promise { const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 4c96109a54..1b4afce675 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -8,11 +8,11 @@ import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { wakuMetadata } from "@waku/core"; import { type CreateLibp2pOptions, + type CreateNodeOptions, DefaultNetworkConfig, type IMetadata, type Libp2p, type Libp2pComponents, - type ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; @@ -78,7 +78,7 @@ export async function defaultLibp2p( } export async function createLibp2pAndUpdateOptions( - options: ProtocolCreateOptions + options: CreateNodeOptions ): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { const { networkConfig } = options; const pubsubTopics = derivePubsubTopicsFromNetworkConfig( diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 9b771e078c..f126e0e460 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -3,6 +3,7 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core"; import type { + CreateNodeOptions, IFilter, IHealthManager, ILightPush, @@ -10,7 +11,6 @@ import type { IStore, IWaku, Libp2p, - ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -45,7 +45,7 @@ export class WakuNode implements IWaku { public constructor( public readonly pubsubTopics: PubsubTopic[], - options: ProtocolCreateOptions, + options: CreateNodeOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, relay?: IRelay diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index adb37170f8..b7d02d0c13 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -1,8 +1,4 @@ -import { - NetworkConfig, - ProtocolCreateOptions, - Protocols -} from "@waku/interfaces"; +import { CreateNodeOptions, NetworkConfig, Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; import { createLightNode, WakuNode } from "@waku/sdk"; import { @@ -46,7 +42,7 @@ export async function runNodes( }, { retries: 3 } ); - const waku_options: ProtocolCreateOptions = { + const waku_options: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, networkConfig: shardInfo diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 920bb392ba..c3e8cc7a9b 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -1,9 +1,9 @@ import { + CreateNodeOptions, DefaultNetworkConfig, IWaku, LightNode, NetworkConfig, - ProtocolCreateOptions, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; @@ -35,7 +35,7 @@ export async function runMultipleNodes( withoutFilter ); - const wakuOptions: ProtocolCreateOptions = { + const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 649c302cf0..938ac16d99 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,11 +1,11 @@ import { createDecoder, createEncoder } from "@waku/core"; import { + CreateNodeOptions, DefaultNetworkConfig, ISubscription, IWaku, LightNode, NetworkConfig, - ProtocolCreateOptions, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; @@ -85,7 +85,7 @@ export async function runMultipleNodes( withoutFilter ); - const wakuOptions: ProtocolCreateOptions = { + const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } diff --git a/packages/utils/src/common/sharding/type_guards.ts b/packages/utils/src/common/sharding/type_guards.ts index b0433e11cc..9ab53373aa 100644 --- a/packages/utils/src/common/sharding/type_guards.ts +++ b/packages/utils/src/common/sharding/type_guards.ts @@ -1,11 +1,11 @@ import type { ContentTopicInfo, - ProtocolCreateOptions, + CreateNodeOptions, StaticSharding } from "@waku/interfaces"; export function isStaticSharding( - config: NonNullable + config: NonNullable ): config is StaticSharding { return ( "clusterId" in config && "shards" in config && !("contentTopics" in config) @@ -13,7 +13,7 @@ export function isStaticSharding( } export function isAutoSharding( - config: NonNullable + config: NonNullable ): config is ContentTopicInfo { return "contentTopics" in config; } From befbd468851f45d08c069bb8276f26517a9296ac Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 29 Jan 2025 00:32:11 +0100 Subject: [PATCH 6/7] add warning --- packages/sdk/src/protocols/peer_manager.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index 80eba1b2ed..4b874ce89c 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -64,7 +64,9 @@ export class PeerManager { const newPeer = result[0]; if (!newPeer) { - log.warn(`requestRenew: Couldn't renew peer ${peerId.toString()}.`); + log.warn( + `requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.` + ); return; } From b52653ffa45ba7db0161dc98e0f996d480abd598 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Wed, 29 Jan 2025 17:39:33 +0100 Subject: [PATCH 7/7] feat: introduce subscription manager (#2202) * feat: inroduce subscription manager * fix: make pipeline succeed (#2238) * fix test * use hardcoded value * update playwright * fix test:browser --- .github/workflows/ci.yml | 2 +- .github/workflows/playwright.yml | 2 +- packages/browser-tests/package.json | 2 +- .../sdk/src/protocols/filter/constants.ts | 1 - packages/sdk/src/protocols/filter/index.ts | 12 - .../sdk/src/protocols/filter/message_cache.ts | 53 ---- .../sdk/src/protocols/filter/subscription.ts | 249 +++------------ .../protocols/filter/subscription_monitor.ts | 287 ++++++++++++++++++ packages/sdk/src/reliability_monitor/index.ts | 56 ---- .../sdk/src/reliability_monitor/receiver.ts | 185 ----------- packages/sdk/src/waku/waku.ts | 2 - .../tests/wait_for_remote_peer.node.spec.ts | 20 +- packages/tests/tests/waku.node.spec.ts | 5 +- 13 files changed, 345 insertions(+), 531 deletions(-) delete mode 100644 packages/sdk/src/protocols/filter/message_cache.ts create mode 100644 packages/sdk/src/protocols/filter/subscription_monitor.ts delete mode 100644 packages/sdk/src/reliability_monitor/index.ts delete mode 100644 packages/sdk/src/reliability_monitor/receiver.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b92f2eb33f..49f5a52038 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,7 +57,7 @@ jobs: browser: runs-on: ubuntu-latest container: - image: mcr.microsoft.com/playwright:v1.48.0-jammy + image: mcr.microsoft.com/playwright:v1.50.0-jammy env: HOME: "/root" steps: diff --git a/.github/workflows/playwright.yml b/.github/workflows/playwright.yml index 2caeab4c91..82009a92f7 100644 --- a/.github/workflows/playwright.yml +++ b/.github/workflows/playwright.yml @@ -20,7 +20,7 @@ jobs: timeout-minutes: 60 runs-on: ubuntu-latest container: - image: mcr.microsoft.com/playwright:v1.48.0-jammy + image: mcr.microsoft.com/playwright:v1.50.0-jammy steps: - uses: actions/checkout@v3 - uses: actions/setup-node@v3 diff --git a/packages/browser-tests/package.json b/packages/browser-tests/package.json index 81244df52c..3e9810b148 100644 --- a/packages/browser-tests/package.json +++ b/packages/browser-tests/package.json @@ -11,7 +11,7 @@ "test": "npx playwright test" }, "devDependencies": { - "@playwright/test": "^1.48.1", + "@playwright/test": "^1.50.0", "@waku/create-app": "^0.1.1-504bcd4", "dotenv-flow": "^4.1.0", "serve": "^14.2.3" diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 90ec3794b6..43fbf11374 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,4 +1,3 @@ export const DEFAULT_KEEP_ALIVE = 60_000; export const DEFAULT_MAX_PINGS = 3; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; -export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 07e7ec7876..1e4b6d86bf 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -8,7 +8,6 @@ import type { IDecoder, IFilter, ILightPush, - IProtoMessage, Libp2p, PubsubTopic, SubscribeResult, @@ -25,7 +24,6 @@ import { import { PeerManager } from "../peer_manager.js"; -import { MessageCache } from "./message_cache.js"; import { Subscription } from "./subscription.js"; import { buildConfig } from "./utils.js"; @@ -35,7 +33,6 @@ class Filter implements IFilter { public readonly protocol: FilterCore; private readonly config: FilterProtocolOptions; - private readonly messageCache: MessageCache; private activeSubscriptions = new Map(); public constructor( @@ -46,7 +43,6 @@ class Filter implements IFilter { config?: Partial ) { this.config = buildConfig(config); - this.messageCache = new MessageCache(libp2p); this.protocol = new FilterCore( async (pubsubTopic, wakuMessage, peerIdStr) => { @@ -58,14 +54,6 @@ class Filter implements IFilter { return; } - if (this.messageCache.has(pubsubTopic, wakuMessage as IProtoMessage)) { - log.info( - `Skipping duplicate message for pubsubTopic:${pubsubTopic} peerId:${peerIdStr}` - ); - return; - } - - this.messageCache.set(pubsubTopic, wakuMessage as IProtoMessage); await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, diff --git a/packages/sdk/src/protocols/filter/message_cache.ts b/packages/sdk/src/protocols/filter/message_cache.ts deleted file mode 100644 index 5c28b34f43..0000000000 --- a/packages/sdk/src/protocols/filter/message_cache.ts +++ /dev/null @@ -1,53 +0,0 @@ -import type { IProtoMessage, Libp2p } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; - -type Hash = string; -type Timestamp = number; - -export class MessageCache { - private intervalID: number | undefined = undefined; - private readonly messages: Map = new Map(); - - public constructor(libp2p: Libp2p) { - this.onStart = this.onStart.bind(this); - this.onStop = this.onStop.bind(this); - - libp2p.addEventListener("start", this.onStart); - libp2p.addEventListener("stop", this.onStop); - } - - public set(pubsubTopic: string, message: IProtoMessage): void { - const hash = messageHashStr(pubsubTopic, message); - this.messages.set(hash, Date.now()); - } - - public has(pubsubTopic: string, message: IProtoMessage): boolean { - const hash = messageHashStr(pubsubTopic, message); - return this.messages.has(hash); - } - - private onStart(): void { - if (this.intervalID) { - return; - } - - this.intervalID = setInterval(() => { - this.prune(); - }, 60_000) as unknown as number; - } - - private onStop(): void { - if (!this.intervalID) { - return; - } - - clearInterval(this.intervalID); - } - - private prune(): void { - Array.from(this.messages.entries()) - .filter(([_, seenTimestamp]) => Date.now() - seenTimestamp >= 60_000) - .map(([hash, _]) => hash) - .forEach((hash) => this.messages.delete(hash)); - } -} diff --git a/packages/sdk/src/protocols/filter/subscription.ts b/packages/sdk/src/protocols/filter/subscription.ts index 3534f26a7f..c585a05408 100644 --- a/packages/sdk/src/protocols/filter/subscription.ts +++ b/packages/sdk/src/protocols/filter/subscription.ts @@ -1,17 +1,8 @@ -import type { Peer } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; -import { - ConnectionManager, - createDecoder, - createEncoder, - FilterCore, - LightPushCore -} from "@waku/core"; +import { ConnectionManager, createDecoder, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, - EConnectionStateEvents, FilterProtocolOptions, type IDecodedMessage, type IDecoder, @@ -28,48 +19,41 @@ import { import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; -import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; -import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; import { PeerManager } from "../peer_manager.js"; -import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js"; +import { SubscriptionMonitor } from "./subscription_monitor.js"; -const log = new Logger("sdk:filter:subscription_manager"); +const log = new Logger("sdk:filter:subscription"); export class Subscription implements ISubscription { - private reliabilityMonitor: ReceiverReliabilityMonitor; - - private keepAliveTimeout: number; - private enableLightPushFilterCheck: boolean; - private keepAliveInterval: ReturnType | null = null; + private readonly monitor: SubscriptionMonitor; private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback - >; + > = new Map(); public constructor( private readonly pubsubTopic: PubsubTopic, private readonly protocol: FilterCore, - private readonly connectionManager: ConnectionManager, - private readonly peerManager: PeerManager, - private readonly libp2p: Libp2p, - config: FilterProtocolOptions, - private readonly lightPush?: ILightPush + connectionManager: ConnectionManager, + peerManager: PeerManager, + libp2p: Libp2p, + private readonly config: FilterProtocolOptions, + lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); - - this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor( - this.pubsubTopic, - this.peerManager, - () => Array.from(this.subscriptionCallbacks.keys()), - this.protocol.subscribe.bind(this.protocol), - this.sendLightPushCheckMessage.bind(this) - ); - this.reliabilityMonitor.setMaxPingFailures(config.pingsBeforePeerRenewed); - this.keepAliveTimeout = config.keepAliveIntervalMs; - this.enableLightPushFilterCheck = config.enableLightPushFilterCheck; + + this.monitor = new SubscriptionMonitor({ + pubsubTopic, + config, + libp2p, + connectionManager, + filter: protocol, + peerManager, + lightPush, + activeSubscriptions: this.subscriptionCallbacks + }); } public async subscribe( @@ -92,10 +76,10 @@ export class Subscription implements ISubscription { } } - if (this.enableLightPushFilterCheck) { + if (this.config.enableLightPushFilterCheck) { decodersArray.push( createDecoder( - this.buildLightPushContentTopic(), + this.monitor.reservedContentTopic, this.pubsubTopic ) as IDecoder ); @@ -104,10 +88,10 @@ export class Subscription implements ISubscription { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const peers = await this.peerManager.getPeers(); - const promises = peers.map(async (peer) => - this.subscribeWithPeerVerification(peer, contentTopics) - ); + const peers = await this.monitor.getPeers(); + const promises = peers.map(async (peer) => { + return this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); + }); const results = await Promise.allSettled(promises); @@ -125,7 +109,7 @@ export class Subscription implements ISubscription { } as unknown as SubscriptionCallback; // don't handle case of internal content topic - if (contentTopic === this.buildLightPushContentTopic()) { + if (contentTopic === this.monitor.reservedContentTopic) { return; } @@ -134,7 +118,7 @@ export class Subscription implements ISubscription { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - this.startSubscriptionsMaintenance(this.keepAliveTimeout); + this.monitor.start(); return finalResult; } @@ -142,7 +126,7 @@ export class Subscription implements ISubscription { public async unsubscribe( contentTopics: ContentTopic[] ): Promise { - const peers = await this.peerManager.getPeers(); + const peers = await this.monitor.getPeers(); const promises = peers.map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, @@ -161,26 +145,22 @@ export class Subscription implements ISubscription { const finalResult = this.handleResult(results, "unsubscribe"); if (this.subscriptionCallbacks.size === 0) { - this.stopSubscriptionsMaintenance(); + this.monitor.stop(); } return finalResult; } - public async ping(peerId?: PeerId): Promise { - log.info("Sending keep-alive ping"); - const peers = peerId - ? [peerId] - : (await this.peerManager.getPeers()).map((peer) => peer.id); + public async ping(): Promise { + const peers = await this.monitor.getPeers(); + const promises = peers.map((peer) => this.protocol.ping(peer)); - const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); const results = await Promise.allSettled(promises); - return this.handleResult(results, "ping"); } public async unsubscribeAll(): Promise { - const peers = await this.peerManager.getPeers(); + const peers = await this.monitor.getPeers(); const promises = peers.map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -191,7 +171,7 @@ export class Subscription implements ISubscription { const finalResult = this.handleResult(results, "unsubscribeAll"); - this.stopSubscriptionsMaintenance(); + this.monitor.stop(); return finalResult; } @@ -200,12 +180,12 @@ export class Subscription implements ISubscription { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived( + const received = this.monitor.notifyMessageReceived( peerIdStr, message as IProtoMessage ); - if (alreadyReceived) { + if (received) { log.info("Message already received, skipping"); return; } @@ -225,20 +205,6 @@ export class Subscription implements ISubscription { await pushMessage(subscriptionCallback, this.pubsubTopic, message); } - private async subscribeWithPeerVerification( - peer: Peer, - contentTopics: string[] - ): Promise { - const result = await this.protocol.subscribe( - this.pubsubTopic, - peer, - contentTopics - ); - - await this.sendLightPushCheckMessage(peer); - return result; - } - private handleResult( results: PromiseSettledResult[], type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" @@ -263,145 +229,6 @@ export class Subscription implements ISubscription { } return result; } - - private async pingSpecificPeer(peerId: PeerId): Promise { - const peers = await this.peerManager.getPeers(); - const peer = peers.find((p) => p.id.equals(peerId)); - if (!peer) { - return { - success: null, - failure: { - peerId, - error: ProtocolError.NO_PEER_AVAILABLE - } - }; - } - - let result; - try { - result = await this.protocol.ping(peer); - } catch (error) { - result = { - success: null, - failure: { - peerId, - error: ProtocolError.GENERIC_FAIL - } - }; - } - - log.info( - `Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}` - ); - await this.reliabilityMonitor.handlePingResult(peerId, result); - return result; - } - - private startSubscriptionsMaintenance(timeout: number): void { - log.info("Starting subscriptions maintenance"); - this.startKeepAlivePings(timeout); - this.startConnectionListener(); - } - - private stopSubscriptionsMaintenance(): void { - log.info("Stopping subscriptions maintenance"); - this.stopKeepAlivePings(); - this.stopConnectionListener(); - } - - private startConnectionListener(): void { - this.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private stopConnectionListener(): void { - this.connectionManager.removeEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private async connectionListener({ - detail: isConnected - }: CustomEvent): Promise { - if (!isConnected) { - this.stopKeepAlivePings(); - return; - } - - try { - // we do nothing here, as the renewal process is managed internally by `this.ping()` - await this.ping(); - } catch (err) { - log.error(`networkStateListener failed to recover: ${err}`); - } - - this.startKeepAlivePings(this.keepAliveTimeout); - } - - private startKeepAlivePings(timeout: number): void { - if (this.keepAliveInterval) { - log.info("Recurring pings already set up."); - return; - } - - this.keepAliveInterval = setInterval(() => { - void this.ping(); - }, timeout); - } - - private stopKeepAlivePings(): void { - if (!this.keepAliveInterval) { - log.info("Already stopped recurring pings."); - return; - } - - log.info("Stopping recurring pings."); - clearInterval(this.keepAliveInterval); - this.keepAliveInterval = null; - } - - private async sendLightPushCheckMessage(peer: Peer): Promise { - if ( - this.lightPush && - this.libp2p && - this.reliabilityMonitor.shouldVerifyPeer(peer.id) - ) { - const encoder = createEncoder({ - contentTopic: this.buildLightPushContentTopic(), - pubsubTopic: this.pubsubTopic, - ephemeral: true - }); - - const message = { payload: new Uint8Array(1) }; - const protoMessage = await encoder.toProtoObj(message); - - // make a delay to be sure message is send when subscription is in place - setTimeout( - (async () => { - const result = await (this.lightPush!.protocol as LightPushCore).send( - encoder, - message, - peer - ); - this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage); - if (result.failure) { - log.error( - `failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}` - ); - return; - } - }) as () => void, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL - ); - } - } - - private buildLightPushContentTopic(): string { - return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; - } } async function pushMessage( diff --git a/packages/sdk/src/protocols/filter/subscription_monitor.ts b/packages/sdk/src/protocols/filter/subscription_monitor.ts new file mode 100644 index 0000000000..966e197f89 --- /dev/null +++ b/packages/sdk/src/protocols/filter/subscription_monitor.ts @@ -0,0 +1,287 @@ +import type { EventHandler, Peer, PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import type { + FilterProtocolOptions, + IConnectionManager, + ILightPush, + IProtoMessage, + Libp2p +} from "@waku/interfaces"; +import { EConnectionStateEvents } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; + +import { PeerManager } from "../peer_manager.js"; + +// TODO(weboko): consider adding as config property or combine with maxAllowedPings +const MAX_SUBSCRIBE_ATTEMPTS = 3; + +type SubscriptionMonitorConstructorOptions = { + pubsubTopic: string; + config: FilterProtocolOptions; + libp2p: Libp2p; + connectionManager: IConnectionManager; + filter: FilterCore; + peerManager: PeerManager; + lightPush?: ILightPush; + activeSubscriptions: Map; +}; + +export class SubscriptionMonitor { + /** + * Cached peers that are in use by subscription. + * Needed to understand if they disconnect later or not. + */ + public peers: Peer[] = []; + + private isStarted: boolean = false; + + private readonly pubsubTopic: string; + private readonly config: FilterProtocolOptions; + + private readonly libp2p: Libp2p; + private readonly filter: FilterCore; + private readonly peerManager: PeerManager; + private readonly connectionManager: IConnectionManager; + private readonly activeSubscriptions: Map; + + private keepAliveIntervalId: number | undefined; + private pingFailedAttempts = new Map(); + + private receivedMessagesFormPeer = new Set(); + private receivedMessages = new Set(); + private verifiedPeers = new Set(); + + public constructor(options: SubscriptionMonitorConstructorOptions) { + this.config = options.config; + this.connectionManager = options.connectionManager; + this.filter = options.filter; + this.peerManager = options.peerManager; + this.libp2p = options.libp2p; + this.activeSubscriptions = options.activeSubscriptions; + this.pubsubTopic = options.pubsubTopic; + + this.onConnectionChange = this.onConnectionChange.bind(this); + this.onPeerConnected = this.onPeerConnected.bind(this); + this.onPeerDisconnected = this.onPeerDisconnected.bind(this); + } + + /** + * @returns content topic used for Filter verification + */ + public get reservedContentTopic(): string { + return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; + } + + /** + * Starts: + * - recurring ping queries; + * - connection event observers; + */ + public start(): void { + if (this.isStarted) { + return; + } + + this.isStarted = true; + + this.startKeepAlive(); + this.startConnectionListener(); + this.startPeerConnectionListener(); + } + + /** + * Stops all recurring queries, event listeners or timers. + */ + public stop(): void { + if (!this.isStarted) { + return; + } + + this.isStarted = false; + + this.stopKeepAlive(); + this.stopConnectionListener(); + this.stopPeerConnectionListener(); + } + + /** + * Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription. + * @returns array of peers + */ + public async getPeers(): Promise { + if (!this.isStarted) { + this.peers = await this.peerManager.getPeers(); + } + + return this.peers; + } + + /** + * Notifies monitor if message was received. + * + * @param peerId peer from which message is received + * @param message received message + * + * @returns true if message was received from peer + */ + public notifyMessageReceived( + peerId: string, + message: IProtoMessage + ): boolean { + const hash = this.buildMessageHash(message); + + this.verifiedPeers.add(peerId); + this.receivedMessagesFormPeer.add(`${peerId}-${hash}`); + + if (this.receivedMessages.has(hash)) { + return true; + } + + this.receivedMessages.add(hash); + + return false; + } + + private buildMessageHash(message: IProtoMessage): string { + return messageHashStr(this.pubsubTopic, message); + } + + private startConnectionListener(): void { + this.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private stopConnectionListener(): void { + this.connectionManager.removeEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private async onConnectionChange({ + detail: isConnected + }: CustomEvent): Promise { + if (!isConnected) { + this.stopKeepAlive(); + return; + } + + await Promise.all(this.peers.map((peer) => this.ping(peer, true))); + this.startKeepAlive(); + } + + private startKeepAlive(): void { + if (this.keepAliveIntervalId) { + return; + } + + this.keepAliveIntervalId = setInterval(() => { + void this.peers.map((peer) => this.ping(peer)); + }, this.config.keepAliveIntervalMs) as unknown as number; + } + + private stopKeepAlive(): void { + if (!this.keepAliveIntervalId) { + return; + } + + clearInterval(this.keepAliveIntervalId); + this.keepAliveIntervalId = undefined; + } + + private startPeerConnectionListener(): void { + this.libp2p.addEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.addEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private stopPeerConnectionListener(): void { + this.libp2p.removeEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private async onPeerConnected(_event: CustomEvent): Promise { + // TODO(weboko): use config.numOfUsedPeers here + if (this.peers.length > 0) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async onPeerDisconnected(event: CustomEvent): Promise { + const hasNotBeenUsed = !this.peers.find((p) => p.id.equals(event.detail)); + if (hasNotBeenUsed) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async subscribe(_peer: Peer | undefined): Promise { + let peer: Peer | undefined = _peer; + + for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) { + if (!peer) { + return; + } + + const response = await this.filter.subscribe( + this.pubsubTopic, + peer, + Array.from(this.activeSubscriptions.keys()) + ); + + if (response.success) { + return; + } + + peer = await this.peerManager.requestRenew(peer.id); + } + } + + private async ping( + peer: Peer, + renewOnFirstFail: boolean = false + ): Promise { + const peerIdStr = peer.id.toString(); + const response = await this.filter.ping(peer); + + if (response.failure && renewOnFirstFail) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + return; + } + + if (response.failure) { + const prev = this.pingFailedAttempts.get(peerIdStr) || 0; + this.pingFailedAttempts.set(peerIdStr, prev + 1); + } + + if (response.success) { + this.pingFailedAttempts.set(peerIdStr, 0); + } + + const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0; + + if (madeAttempts >= this.config.pingsBeforePeerRenewed) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + } + } +} diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts deleted file mode 100644 index 120d208404..0000000000 --- a/packages/sdk/src/reliability_monitor/index.ts +++ /dev/null @@ -1,56 +0,0 @@ -import type { Peer } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - PubsubTopic -} from "@waku/interfaces"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -import { ReceiverReliabilityMonitor } from "./receiver.js"; - -export class ReliabilityMonitorManager { - private static receiverMonitors: Map< - PubsubTopic, - ReceiverReliabilityMonitor - > = new Map(); - - public static createReceiverMonitor( - pubsubTopic: PubsubTopic, - peerManager: PeerManager, - getContentTopics: () => ContentTopic[], - protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - sendLightPushMessage: (peer: Peer) => Promise - ): ReceiverReliabilityMonitor { - if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { - return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; - } - - const monitor = new ReceiverReliabilityMonitor( - pubsubTopic, - peerManager, - getContentTopics, - protocolSubscribe, - sendLightPushMessage - ); - ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); - return monitor; - } - - private constructor() {} - - public static stop(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); - } - - public static stopAll(): void { - for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxPingFailures(undefined); - this.receiverMonitors.delete(pubsubTopic); - } - } -} diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts deleted file mode 100644 index cbde6ddd3c..0000000000 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ /dev/null @@ -1,185 +0,0 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - IProtoMessage, - PeerIdStr, - PubsubTopic -} from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; -import { Logger } from "@waku/utils"; -import { bytesToUtf8 } from "@waku/utils/bytes"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -const log = new Logger("sdk:receiver:reliability_monitor"); - -const DEFAULT_MAX_PINGS = 3; -const MESSAGE_VERIFICATION_DELAY = 5_000; - -export class ReceiverReliabilityMonitor { - private receivedMessagesFormPeer = new Set(); - private receivedMessages = new Set(); - private scheduledVerification = new Map(); - private verifiedPeers = new Set(); - - private peerFailures: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; - private peerRenewalLocks: Set = new Set(); - - public constructor( - private readonly pubsubTopic: PubsubTopic, - private readonly peerManager: PeerManager, - private getContentTopics: () => ContentTopic[], - private protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - private sendLightPushMessage: (peer: Peer) => Promise - ) {} - - public setMaxPingFailures(value: number | undefined): void { - if (value === undefined) { - return; - } - this.maxPingFailures = value; - } - - public async handlePingResult( - peerId: PeerId, - result?: CoreProtocolResult - ): Promise { - if (result?.success) { - this.peerFailures.delete(peerId.toString()); - return; - } - - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures >= this.maxPingFailures) { - try { - log.info( - `Attempting to renew ${peerId.toString()} due to ping failures.` - ); - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - - public notifyMessageReceived( - peerIdStr: string, - message: IProtoMessage - ): boolean { - const hash = this.buildMessageHash(message); - - this.verifiedPeers.add(peerIdStr); - this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`); - - log.info( - `notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}` - ); - log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`); - - if (this.receivedMessages.has(hash)) { - return true; - } - - this.receivedMessages.add(hash); - - return false; - } - - public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void { - const peerIdStr = peerId.toString(); - const hash = this.buildMessageHash(message); - - log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`); - - if (this.scheduledVerification.has(peerIdStr)) { - log.warn( - `notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - const timeout = setTimeout( - (async () => { - const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); - const receivedTestMessage = this.receivedMessagesFormPeer.has( - `${peerIdStr}-${hash}` - ); - - if (receivedAnyMessage || receivedTestMessage) { - log.info( - `notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - log.warn( - `notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}` - ); - this.scheduledVerification.delete(peerIdStr); - await this.renewAndSubscribePeer(peerId); - }) as () => void, - MESSAGE_VERIFICATION_DELAY - ) as unknown as number; - - this.scheduledVerification.set(peerIdStr, timeout); - } - - public shouldVerifyPeer(peerId: PeerId): boolean { - const peerIdStr = peerId.toString(); - - const isPeerVerified = this.verifiedPeers.has(peerIdStr); - const isVerificationPending = this.scheduledVerification.has(peerIdStr); - - return !(isPeerVerified || isVerificationPending); - } - - private buildMessageHash(message: IProtoMessage): string { - return messageHashStr(this.pubsubTopic, message); - } - - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - const peerIdStr = peerId.toString(); - try { - if (this.peerRenewalLocks.has(peerIdStr)) { - log.info(`Peer ${peerIdStr} is already being renewed.`); - return; - } - - this.peerRenewalLocks.add(peerIdStr); - - const newPeer = await this.peerManager.requestRenew(peerId); - if (!newPeer) { - log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); - return; - } - - await this.protocolSubscribe( - this.pubsubTopic, - newPeer, - this.getContentTopics() - ); - - await this.sendLightPushMessage(newPeer); - - this.peerFailures.delete(peerIdStr); - - return newPeer; - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); - return; - } finally { - this.peerRenewalLocks.delete(peerIdStr); - } - } -} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index f126e0e460..97ab276ed8 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -20,7 +20,6 @@ import { wakuFilter } from "../protocols/filter/index.js"; import { wakuLightPush } from "../protocols/light_push/index.js"; import { PeerManager } from "../protocols/peer_manager.js"; import { wakuStore } from "../protocols/store/index.js"; -import { ReliabilityMonitorManager } from "../reliability_monitor/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; @@ -187,7 +186,6 @@ export class WakuNode implements IWaku { } public async stop(): Promise { - ReliabilityMonitorManager.stopAll(); this.peerManager.stop(); this.connectionManager.stop(); await this.libp2p.stop(); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 37b5d6e632..a324b15459 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -115,7 +115,9 @@ describe("Wait for remote peer", function () { await delay(1000); await waku2.waitForPeers([Protocols.Store]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -143,7 +145,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitPromise; - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -170,7 +174,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waku2.waitForPeers([Protocols.LightPush]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -197,7 +203,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waku2.waitForPeers([Protocols.Filter]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -229,7 +237,9 @@ describe("Wait for remote peer", function () { Protocols.LightPush ]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 0d81d7c8bc..8d037d1bd2 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -11,8 +11,7 @@ import { import { createRelayNode } from "@waku/relay"; import { createLightNode, - createEncoder as createPlainEncoder, - DefaultUserAgent + createEncoder as createPlainEncoder } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -278,7 +277,7 @@ describe("User Agent", function () { waku1UserAgent ); expect(bytesToUtf8(waku2PeerInfo.metadata.get("AgentVersion")!)).to.eq( - DefaultUserAgent + "js-waku" ); }); });