diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b92f2eb33f..49f5a52038 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,7 +57,7 @@ jobs: browser: runs-on: ubuntu-latest container: - image: mcr.microsoft.com/playwright:v1.48.0-jammy + image: mcr.microsoft.com/playwright:v1.50.0-jammy env: HOME: "/root" steps: diff --git a/.github/workflows/playwright.yml b/.github/workflows/playwright.yml index 2caeab4c91..82009a92f7 100644 --- a/.github/workflows/playwright.yml +++ b/.github/workflows/playwright.yml @@ -20,7 +20,7 @@ jobs: timeout-minutes: 60 runs-on: ubuntu-latest container: - image: mcr.microsoft.com/playwright:v1.48.0-jammy + image: mcr.microsoft.com/playwright:v1.50.0-jammy steps: - uses: actions/checkout@v3 - uses: actions/setup-node@v3 diff --git a/packages/browser-tests/package.json b/packages/browser-tests/package.json index 81244df52c..3e9810b148 100644 --- a/packages/browser-tests/package.json +++ b/packages/browser-tests/package.json @@ -11,7 +11,7 @@ "test": "npx playwright test" }, "devDependencies": { - "@playwright/test": "^1.48.1", + "@playwright/test": "^1.50.0", "@waku/create-app": "^0.1.1-504bcd4", "dotenv-flow": "^4.1.0", "serve": "^14.2.3" diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index f992a497a7..7d5168e27b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -15,17 +15,34 @@ export type SubscriptionCallback = { callback: Callback; }; -export type SubscribeOptions = { - keepAlive?: number; - pingsBeforePeerRenewed?: number; - enableLightPushFilterCheck?: boolean; +export type FilterProtocolOptions = { + /** + * Interval with which Filter subscription will attempt to send ping requests to subscribed peers. + * + * @default 60_000 + */ + keepAliveIntervalMs: number; + + /** + * Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one. + * + * @default 3 + */ + pingsBeforePeerRenewed: number; + + /** + * Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription. + * In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer. + * + * @default false + */ + enableLightPushFilterCheck: boolean; }; export interface ISubscription { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options?: SubscribeOptions + callback: Callback ): Promise; unsubscribe(contentTopics: ContentTopic[]): Promise; @@ -38,8 +55,7 @@ export interface ISubscription { export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions?: SubscribeOptions + callback: Callback ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 278d67c33e..40fc66542d 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; import type { ConnectionManagerOptions } from "./connection_manager.js"; +import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; import { ThisAndThat, ThisOrThat } from "./misc.js"; @@ -23,19 +24,13 @@ export type IBaseProtocolCore = { export type NetworkConfig = StaticSharding | AutoSharding; -export type ProtocolCreateOptions = { +export type CreateNodeOptions = { /** - * Configuration for determining the network in use. - * - * If using Static Sharding: - * Default value is configured for The Waku Network. - * The format to specify a shard is: clusterId: number, shards: number[] - * To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/). + * Set the user agent string to be used in identification of the node. * - * If using Auto Sharding: - * See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details. - * You cannot add or remove content topics after initialization of the node. + * @default "js-waku" */ + userAgent?: string; /** * Configuration for determining the network in use. @@ -93,10 +88,17 @@ export type ProtocolCreateOptions = { bootstrapPeers?: string[]; /** - * Configuration for connection manager. If not specified - default values are applied. + * Configuration for connection manager. + * If not specified - default values are applied. */ connectionManager?: Partial; + /** + * Configuration for Filter protocol. + * If not specified - default values are applied. + */ + filter?: Partial; + /** * Options for the Store protocol. */ diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts index 448598162b..476e71bf44 100644 --- a/packages/relay/src/create.ts +++ b/packages/relay/src/create.ts @@ -1,10 +1,5 @@ -import type { RelayNode } from "@waku/interfaces"; -import { - createLibp2pAndUpdateOptions, - CreateWakuNodeOptions, - WakuNode, - WakuOptions -} from "@waku/sdk"; +import type { CreateNodeOptions, RelayNode } from "@waku/interfaces"; +import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; @@ -19,7 +14,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; * or use this function with caution. */ export async function createRelayNode( - options: CreateWakuNodeOptions & Partial + options: CreateNodeOptions & Partial ): Promise { options = { ...options, @@ -36,7 +31,7 @@ export async function createRelayNode( return new WakuNode( pubsubTopics, - options as WakuOptions, + options as CreateNodeOptions, libp2p, {}, relay diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index ed1cc0d556..8c7e0c1cac 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -11,6 +11,7 @@ import { sha256 } from "@noble/hashes/sha256"; import { ActiveSubscriptions, Callback, + CreateNodeOptions, IAsyncIterator, IDecodedMessage, IDecoder, @@ -18,7 +19,6 @@ import { IMessage, IRelay, Libp2p, - ProtocolCreateOptions, ProtocolError, PubsubTopic, SDKProtocolResult @@ -39,7 +39,7 @@ export type Observer = { callback: Callback; }; -export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; +export type RelayCreateOptions = CreateNodeOptions & GossipsubOpts; export type ContentTopic = string; /** diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 155fee3403..ccd519e7bc 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -1,6 +1,6 @@ -import { type LightNode } from "@waku/interfaces"; +import type { CreateNodeOptions, LightNode } from "@waku/interfaces"; -import { CreateWakuNodeOptions, WakuNode } from "../waku/index.js"; +import { WakuNode } from "../waku/index.js"; import { createLibp2pAndUpdateOptions } from "./libp2p.js"; @@ -10,7 +10,7 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; * Uses Waku Filter V2 by default. */ export async function createLightNode( - options: CreateWakuNodeOptions = {} + options: CreateNodeOptions = {} ): Promise { const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 7aca089b2e..1b4afce675 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -8,6 +8,7 @@ import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { wakuMetadata } from "@waku/core"; import { type CreateLibp2pOptions, + type CreateNodeOptions, DefaultNetworkConfig, type IMetadata, type Libp2p, @@ -18,11 +19,6 @@ import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; import { isTestEnvironment } from "../env.js"; -import { - CreateWakuNodeOptions, - DefaultPingMaxInboundStreams, - DefaultUserAgent -} from "../waku/index.js"; import { defaultPeerDiscoveries } from "./discovery.js"; @@ -32,6 +28,9 @@ type MetadataService = { const log = new Logger("sdk:create"); +const DefaultUserAgent = "js-waku"; +const DefaultPingMaxInboundStreams = 10; + export async function defaultLibp2p( pubsubTopics: PubsubTopic[], options?: Partial, @@ -79,7 +78,7 @@ export async function defaultLibp2p( } export async function createLibp2pAndUpdateOptions( - options: CreateWakuNodeOptions + options: CreateNodeOptions ): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { const { networkConfig } = options; const pubsubTopics = derivePubsubTopicsFromNetworkConfig( diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 9477a7e417..43fbf11374 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,8 +1,3 @@ export const DEFAULT_KEEP_ALIVE = 60_000; +export const DEFAULT_MAX_PINGS = 3; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; -export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; - -export const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE, - enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK -}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index a70e484793..1e4b6d86bf 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -1,20 +1,19 @@ import { ConnectionManager, FilterCore } from "@waku/core"; -import { - type Callback, - type CreateSubscriptionResult, - type IAsyncIterator, - type IDecodedMessage, - type IDecoder, - type IFilter, - type ILightPush, - type Libp2p, - NetworkConfig, - ProtocolError, - type PubsubTopic, - type SubscribeOptions, +import type { + Callback, + CreateSubscriptionResult, + FilterProtocolOptions, + IAsyncIterator, + IDecodedMessage, + IDecoder, + IFilter, + ILightPush, + Libp2p, + PubsubTopic, SubscribeResult, - type Unsubscribe + Unsubscribe } from "@waku/interfaces"; +import { NetworkConfig, ProtocolError } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, @@ -25,22 +24,26 @@ import { import { PeerManager } from "../peer_manager.js"; -import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; -import { SubscriptionManager } from "./subscription_manager.js"; +import { Subscription } from "./subscription.js"; +import { buildConfig } from "./utils.js"; const log = new Logger("sdk:filter"); class Filter implements IFilter { public readonly protocol: FilterCore; - private activeSubscriptions = new Map(); + private readonly config: FilterProtocolOptions; + private activeSubscriptions = new Map(); public constructor( private connectionManager: ConnectionManager, private libp2p: Libp2p, private peerManager: PeerManager, - private lightPush?: ILightPush + private lightPush?: ILightPush, + config?: Partial ) { + this.config = buildConfig(config); + this.protocol = new FilterCore( async (pubsubTopic, wakuMessage, peerIdStr) => { const subscription = this.getActiveSubscription(pubsubTopic); @@ -50,6 +53,7 @@ class Filter implements IFilter { ); return; } + await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, @@ -66,7 +70,6 @@ class Filter implements IFilter { * * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. * @param {Callback} callback - The callback function to be invoked with decoded messages. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. * * @returns {Promise} A promise that resolves to an object containing: * - subscription: The created subscription object if successful, or null if failed. @@ -100,8 +103,7 @@ class Filter implements IFilter { */ public async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -127,8 +129,7 @@ class Filter implements IFilter { const { failures, successes } = await subscription.subscribe( decoders, - callback, - subscribeOptions + callback ); return { subscription, @@ -173,12 +174,13 @@ class Filter implements IFilter { this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager( + new Subscription( pubsubTopic, this.protocol, this.connectionManager, this.peerManager, this.libp2p, + this.config, this.lightPush ) ); @@ -206,8 +208,7 @@ class Filter implements IFilter { */ public async subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -231,7 +232,7 @@ class Filter implements IFilter { throw Error(`Failed to create subscription: ${error}`); } - await subscription.subscribe(decoders, callback, options); + await subscription.subscribe(decoders, callback); const contentTopics = Array.from( groupByContentTopic( @@ -250,17 +251,16 @@ class Filter implements IFilter { return toAsyncIterator(this, decoders); } - //TODO: move to SubscriptionManager private getActiveSubscription( pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { + ): Subscription | undefined { return this.activeSubscriptions.get(pubsubTopic); } private setActiveSubscription( pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { + subscription: Subscription + ): Subscription { this.activeSubscriptions.set(pubsubTopic, subscription); return subscription; } @@ -285,8 +285,9 @@ class Filter implements IFilter { export function wakuFilter( connectionManager: ConnectionManager, peerManager: PeerManager, - lightPush?: ILightPush + lightPush?: ILightPush, + config?: Partial ): (libp2p: Libp2p) => IFilter { return (libp2p: Libp2p) => - new Filter(connectionManager, libp2p, peerManager, lightPush); + new Filter(connectionManager, libp2p, peerManager, lightPush, config); } diff --git a/packages/sdk/src/protocols/filter/subscription.ts b/packages/sdk/src/protocols/filter/subscription.ts new file mode 100644 index 0000000000..c585a05408 --- /dev/null +++ b/packages/sdk/src/protocols/filter/subscription.ts @@ -0,0 +1,260 @@ +import { ConnectionManager, createDecoder, FilterCore } from "@waku/core"; +import { + type Callback, + type ContentTopic, + type CoreProtocolResult, + FilterProtocolOptions, + type IDecodedMessage, + type IDecoder, + type ILightPush, + type IProtoMessage, + type ISubscription, + type Libp2p, + type PeerIdStr, + ProtocolError, + type PubsubTopic, + type SDKProtocolResult, + SubscriptionCallback +} from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; +import { groupByContentTopic, Logger } from "@waku/utils"; + +import { PeerManager } from "../peer_manager.js"; + +import { SubscriptionMonitor } from "./subscription_monitor.js"; + +const log = new Logger("sdk:filter:subscription"); + +export class Subscription implements ISubscription { + private readonly monitor: SubscriptionMonitor; + + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + > = new Map(); + + public constructor( + private readonly pubsubTopic: PubsubTopic, + private readonly protocol: FilterCore, + connectionManager: ConnectionManager, + peerManager: PeerManager, + libp2p: Libp2p, + private readonly config: FilterProtocolOptions, + lightPush?: ILightPush + ) { + this.pubsubTopic = pubsubTopic; + + this.monitor = new SubscriptionMonitor({ + pubsubTopic, + config, + libp2p, + connectionManager, + filter: protocol, + peerManager, + lightPush, + activeSubscriptions: this.subscriptionCallbacks + }); + } + + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Promise { + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; + + // check that all decoders are configured for the same pubsub topic as this subscription + for (const decoder of decodersArray) { + if (decoder.pubsubTopic !== this.pubsubTopic) { + return { + failures: [ + { + error: ProtocolError.TOPIC_DECODER_MISMATCH + } + ], + successes: [] + }; + } + } + + if (this.config.enableLightPushFilterCheck) { + decodersArray.push( + createDecoder( + this.monitor.reservedContentTopic, + this.pubsubTopic + ) as IDecoder + ); + } + + const decodersGroupedByCT = groupByContentTopic(decodersArray); + const contentTopics = Array.from(decodersGroupedByCT.keys()); + + const peers = await this.monitor.getPeers(); + const promises = peers.map(async (peer) => { + return this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); + }); + + const results = await Promise.allSettled(promises); + + const finalResult = this.handleResult(results, "subscribe"); + + // Save the callback functions by content topics so they + // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) + // is called for those content topics + decodersGroupedByCT.forEach((decoders, contentTopic) => { + // Cast the type because a given `subscriptionCallbacks` map may hold + // Decoder that decode to different implementations of `IDecodedMessage` + const subscriptionCallback = { + decoders, + callback + } as unknown as SubscriptionCallback; + + // don't handle case of internal content topic + if (contentTopic === this.monitor.reservedContentTopic) { + return; + } + + // The callback and decoder may override previous values, this is on + // purpose as the user may call `subscribe` to refresh the subscription + this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); + }); + + this.monitor.start(); + + return finalResult; + } + + public async unsubscribe( + contentTopics: ContentTopic[] + ): Promise { + const peers = await this.monitor.getPeers(); + const promises = peers.map(async (peer) => { + const response = await this.protocol.unsubscribe( + this.pubsubTopic, + peer, + contentTopics + ); + + contentTopics.forEach((contentTopic: string) => { + this.subscriptionCallbacks.delete(contentTopic); + }); + + return response; + }); + + const results = await Promise.allSettled(promises); + const finalResult = this.handleResult(results, "unsubscribe"); + + if (this.subscriptionCallbacks.size === 0) { + this.monitor.stop(); + } + + return finalResult; + } + + public async ping(): Promise { + const peers = await this.monitor.getPeers(); + const promises = peers.map((peer) => this.protocol.ping(peer)); + + const results = await Promise.allSettled(promises); + return this.handleResult(results, "ping"); + } + + public async unsubscribeAll(): Promise { + const peers = await this.monitor.getPeers(); + const promises = peers.map(async (peer) => + this.protocol.unsubscribeAll(this.pubsubTopic, peer) + ); + + const results = await Promise.allSettled(promises); + + this.subscriptionCallbacks.clear(); + + const finalResult = this.handleResult(results, "unsubscribeAll"); + + this.monitor.stop(); + + return finalResult; + } + + public async processIncomingMessage( + message: WakuMessage, + peerIdStr: PeerIdStr + ): Promise { + const received = this.monitor.notifyMessageReceived( + peerIdStr, + message as IProtoMessage + ); + + if (received) { + log.info("Message already received, skipping"); + return; + } + + const { contentTopic } = message; + const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); + if (!subscriptionCallback) { + log.error("No subscription callback available for ", contentTopic); + return; + } + log.info( + "Processing message with content topic ", + contentTopic, + " on pubsub topic ", + this.pubsubTopic + ); + await pushMessage(subscriptionCallback, this.pubsubTopic, message); + } + + private handleResult( + results: PromiseSettledResult[], + type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" + ): SDKProtocolResult { + const result: SDKProtocolResult = { failures: [], successes: [] }; + + for (const promiseResult of results) { + if (promiseResult.status === "rejected") { + log.error( + `Failed to resolve ${type} promise successfully: `, + promiseResult.reason + ); + result.failures.push({ error: ProtocolError.GENERIC_FAIL }); + } else { + const coreResult = promiseResult.value; + if (coreResult.failure) { + result.failures.push(coreResult.failure); + } else { + result.successes.push(coreResult.success); + } + } + } + return result; + } +} + +async function pushMessage( + subscriptionCallback: SubscriptionCallback, + pubsubTopic: PubsubTopic, + message: WakuMessage +): Promise { + const { decoders, callback } = subscriptionCallback; + + const { contentTopic } = message; + if (!contentTopic) { + log.warn("Message has no content topic, skipping"); + return; + } + + try { + const decodePromises = decoders.map((dec) => + dec + .fromProtoObj(pubsubTopic, message as IProtoMessage) + .then((decoded) => decoded || Promise.reject("Decoding failed")) + ); + + const decodedMessage = await Promise.any(decodePromises); + + await callback(decodedMessage); + } catch (e) { + log.error("Error decoding message", e); + } +} diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts deleted file mode 100644 index 59af175121..0000000000 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ /dev/null @@ -1,441 +0,0 @@ -import type { Peer } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; -import { - ConnectionManager, - createDecoder, - createEncoder, - FilterCore, - LightPushCore -} from "@waku/core"; -import { - type Callback, - type ContentTopic, - type CoreProtocolResult, - EConnectionStateEvents, - type IDecodedMessage, - type IDecoder, - type ILightPush, - type IProtoMessage, - type ISubscription, - type Libp2p, - type PeerIdStr, - ProtocolError, - type PubsubTopic, - type SDKProtocolResult, - type SubscribeOptions, - SubscriptionCallback -} from "@waku/interfaces"; -import { WakuMessage } from "@waku/proto"; -import { groupByContentTopic, Logger } from "@waku/utils"; - -import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; -import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; -import { PeerManager } from "../peer_manager.js"; - -import { - DEFAULT_KEEP_ALIVE, - DEFAULT_LIGHT_PUSH_FILTER_CHECK, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL, - DEFAULT_SUBSCRIBE_OPTIONS -} from "./constants.js"; - -const log = new Logger("sdk:filter:subscription_manager"); - -export class SubscriptionManager implements ISubscription { - private reliabilityMonitor: ReceiverReliabilityMonitor; - - private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; - private keepAliveInterval: ReturnType | null = null; - - private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK; - - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - >; - - public constructor( - private readonly pubsubTopic: PubsubTopic, - private readonly protocol: FilterCore, - private readonly connectionManager: ConnectionManager, - private readonly peerManager: PeerManager, - private readonly libp2p: Libp2p, - private readonly lightPush?: ILightPush - ) { - this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); - - this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor( - this.pubsubTopic, - this.peerManager, - () => Array.from(this.subscriptionCallbacks.keys()), - this.protocol.subscribe.bind(this.protocol), - this.sendLightPushCheckMessage.bind(this) - ); - } - - public async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); - this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.enableLightPushFilterCheck = - options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK; - - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; - - // check that all decoders are configured for the same pubsub topic as this subscription - for (const decoder of decodersArray) { - if (decoder.pubsubTopic !== this.pubsubTopic) { - return { - failures: [ - { - error: ProtocolError.TOPIC_DECODER_MISMATCH - } - ], - successes: [] - }; - } - } - - if (this.enableLightPushFilterCheck) { - decodersArray.push( - createDecoder( - this.buildLightPushContentTopic(), - this.pubsubTopic - ) as IDecoder - ); - } - - const decodersGroupedByCT = groupByContentTopic(decodersArray); - const contentTopics = Array.from(decodersGroupedByCT.keys()); - - const peers = await this.peerManager.getPeers(); - const promises = peers.map(async (peer) => - this.subscribeWithPeerVerification(peer, contentTopics) - ); - - const results = await Promise.allSettled(promises); - - const finalResult = this.handleResult(results, "subscribe"); - - // Save the callback functions by content topics so they - // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) - // is called for those content topics - decodersGroupedByCT.forEach((decoders, contentTopic) => { - // Cast the type because a given `subscriptionCallbacks` map may hold - // Decoder that decode to different implementations of `IDecodedMessage` - const subscriptionCallback = { - decoders, - callback - } as unknown as SubscriptionCallback; - - // don't handle case of internal content topic - if (contentTopic === this.buildLightPushContentTopic()) { - return; - } - - // The callback and decoder may override previous values, this is on - // purpose as the user may call `subscribe` to refresh the subscription - this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); - }); - - this.startSubscriptionsMaintenance(this.keepAliveTimeout); - - return finalResult; - } - - public async unsubscribe( - contentTopics: ContentTopic[] - ): Promise { - const peers = await this.peerManager.getPeers(); - const promises = peers.map(async (peer) => { - const response = await this.protocol.unsubscribe( - this.pubsubTopic, - peer, - contentTopics - ); - - contentTopics.forEach((contentTopic: string) => { - this.subscriptionCallbacks.delete(contentTopic); - }); - - return response; - }); - - const results = await Promise.allSettled(promises); - const finalResult = this.handleResult(results, "unsubscribe"); - - if (this.subscriptionCallbacks.size === 0) { - this.stopSubscriptionsMaintenance(); - } - - return finalResult; - } - - public async ping(peerId?: PeerId): Promise { - log.info("Sending keep-alive ping"); - const peers = peerId - ? [peerId] - : (await this.peerManager.getPeers()).map((peer) => peer.id); - - const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); - const results = await Promise.allSettled(promises); - - return this.handleResult(results, "ping"); - } - - public async unsubscribeAll(): Promise { - const peers = await this.peerManager.getPeers(); - const promises = peers.map(async (peer) => - this.protocol.unsubscribeAll(this.pubsubTopic, peer) - ); - - const results = await Promise.allSettled(promises); - - this.subscriptionCallbacks.clear(); - - const finalResult = this.handleResult(results, "unsubscribeAll"); - - this.stopSubscriptionsMaintenance(); - - return finalResult; - } - - public async processIncomingMessage( - message: WakuMessage, - peerIdStr: PeerIdStr - ): Promise { - const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived( - peerIdStr, - message as IProtoMessage - ); - - if (alreadyReceived) { - log.info("Message already received, skipping"); - return; - } - - const { contentTopic } = message; - const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); - if (!subscriptionCallback) { - log.error("No subscription callback available for ", contentTopic); - return; - } - log.info( - "Processing message with content topic ", - contentTopic, - " on pubsub topic ", - this.pubsubTopic - ); - await pushMessage(subscriptionCallback, this.pubsubTopic, message); - } - - private async subscribeWithPeerVerification( - peer: Peer, - contentTopics: string[] - ): Promise { - const result = await this.protocol.subscribe( - this.pubsubTopic, - peer, - contentTopics - ); - - await this.sendLightPushCheckMessage(peer); - return result; - } - - private handleResult( - results: PromiseSettledResult[], - type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" - ): SDKProtocolResult { - const result: SDKProtocolResult = { failures: [], successes: [] }; - - for (const promiseResult of results) { - if (promiseResult.status === "rejected") { - log.error( - `Failed to resolve ${type} promise successfully: `, - promiseResult.reason - ); - result.failures.push({ error: ProtocolError.GENERIC_FAIL }); - } else { - const coreResult = promiseResult.value; - if (coreResult.failure) { - result.failures.push(coreResult.failure); - } else { - result.successes.push(coreResult.success); - } - } - } - return result; - } - - private async pingSpecificPeer(peerId: PeerId): Promise { - const peers = await this.peerManager.getPeers(); - const peer = peers.find((p) => p.id.equals(peerId)); - if (!peer) { - return { - success: null, - failure: { - peerId, - error: ProtocolError.NO_PEER_AVAILABLE - } - }; - } - - let result; - try { - result = await this.protocol.ping(peer); - } catch (error) { - result = { - success: null, - failure: { - peerId, - error: ProtocolError.GENERIC_FAIL - } - }; - } - - log.info( - `Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}` - ); - await this.reliabilityMonitor.handlePingResult(peerId, result); - return result; - } - - private startSubscriptionsMaintenance(timeout: number): void { - log.info("Starting subscriptions maintenance"); - this.startKeepAlivePings(timeout); - this.startConnectionListener(); - } - - private stopSubscriptionsMaintenance(): void { - log.info("Stopping subscriptions maintenance"); - this.stopKeepAlivePings(); - this.stopConnectionListener(); - } - - private startConnectionListener(): void { - this.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private stopConnectionListener(): void { - this.connectionManager.removeEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private async connectionListener({ - detail: isConnected - }: CustomEvent): Promise { - if (!isConnected) { - this.stopKeepAlivePings(); - return; - } - - try { - // we do nothing here, as the renewal process is managed internally by `this.ping()` - await this.ping(); - } catch (err) { - log.error(`networkStateListener failed to recover: ${err}`); - } - - this.startKeepAlivePings(this.keepAliveTimeout); - } - - private startKeepAlivePings(timeout: number): void { - if (this.keepAliveInterval) { - log.info("Recurring pings already set up."); - return; - } - - this.keepAliveInterval = setInterval(() => { - void this.ping(); - }, timeout); - } - - private stopKeepAlivePings(): void { - if (!this.keepAliveInterval) { - log.info("Already stopped recurring pings."); - return; - } - - log.info("Stopping recurring pings."); - clearInterval(this.keepAliveInterval); - this.keepAliveInterval = null; - } - - private async sendLightPushCheckMessage(peer: Peer): Promise { - if ( - this.lightPush && - this.libp2p && - this.reliabilityMonitor.shouldVerifyPeer(peer.id) - ) { - const encoder = createEncoder({ - contentTopic: this.buildLightPushContentTopic(), - pubsubTopic: this.pubsubTopic, - ephemeral: true - }); - - const message = { payload: new Uint8Array(1) }; - const protoMessage = await encoder.toProtoObj(message); - - // make a delay to be sure message is send when subscription is in place - setTimeout( - (async () => { - const result = await (this.lightPush!.protocol as LightPushCore).send( - encoder, - message, - peer - ); - this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage); - if (result.failure) { - log.error( - `failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}` - ); - return; - } - }) as () => void, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL - ); - } - } - - private buildLightPushContentTopic(): string { - return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; - } -} - -async function pushMessage( - subscriptionCallback: SubscriptionCallback, - pubsubTopic: PubsubTopic, - message: WakuMessage -): Promise { - const { decoders, callback } = subscriptionCallback; - - const { contentTopic } = message; - if (!contentTopic) { - log.warn("Message has no content topic, skipping"); - return; - } - - try { - const decodePromises = decoders.map((dec) => - dec - .fromProtoObj(pubsubTopic, message as IProtoMessage) - .then((decoded) => decoded || Promise.reject("Decoding failed")) - ); - - const decodedMessage = await Promise.any(decodePromises); - - await callback(decodedMessage); - } catch (e) { - log.error("Error decoding message", e); - } -} diff --git a/packages/sdk/src/protocols/filter/subscription_monitor.ts b/packages/sdk/src/protocols/filter/subscription_monitor.ts new file mode 100644 index 0000000000..966e197f89 --- /dev/null +++ b/packages/sdk/src/protocols/filter/subscription_monitor.ts @@ -0,0 +1,287 @@ +import type { EventHandler, Peer, PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import type { + FilterProtocolOptions, + IConnectionManager, + ILightPush, + IProtoMessage, + Libp2p +} from "@waku/interfaces"; +import { EConnectionStateEvents } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; + +import { PeerManager } from "../peer_manager.js"; + +// TODO(weboko): consider adding as config property or combine with maxAllowedPings +const MAX_SUBSCRIBE_ATTEMPTS = 3; + +type SubscriptionMonitorConstructorOptions = { + pubsubTopic: string; + config: FilterProtocolOptions; + libp2p: Libp2p; + connectionManager: IConnectionManager; + filter: FilterCore; + peerManager: PeerManager; + lightPush?: ILightPush; + activeSubscriptions: Map; +}; + +export class SubscriptionMonitor { + /** + * Cached peers that are in use by subscription. + * Needed to understand if they disconnect later or not. + */ + public peers: Peer[] = []; + + private isStarted: boolean = false; + + private readonly pubsubTopic: string; + private readonly config: FilterProtocolOptions; + + private readonly libp2p: Libp2p; + private readonly filter: FilterCore; + private readonly peerManager: PeerManager; + private readonly connectionManager: IConnectionManager; + private readonly activeSubscriptions: Map; + + private keepAliveIntervalId: number | undefined; + private pingFailedAttempts = new Map(); + + private receivedMessagesFormPeer = new Set(); + private receivedMessages = new Set(); + private verifiedPeers = new Set(); + + public constructor(options: SubscriptionMonitorConstructorOptions) { + this.config = options.config; + this.connectionManager = options.connectionManager; + this.filter = options.filter; + this.peerManager = options.peerManager; + this.libp2p = options.libp2p; + this.activeSubscriptions = options.activeSubscriptions; + this.pubsubTopic = options.pubsubTopic; + + this.onConnectionChange = this.onConnectionChange.bind(this); + this.onPeerConnected = this.onPeerConnected.bind(this); + this.onPeerDisconnected = this.onPeerDisconnected.bind(this); + } + + /** + * @returns content topic used for Filter verification + */ + public get reservedContentTopic(): string { + return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; + } + + /** + * Starts: + * - recurring ping queries; + * - connection event observers; + */ + public start(): void { + if (this.isStarted) { + return; + } + + this.isStarted = true; + + this.startKeepAlive(); + this.startConnectionListener(); + this.startPeerConnectionListener(); + } + + /** + * Stops all recurring queries, event listeners or timers. + */ + public stop(): void { + if (!this.isStarted) { + return; + } + + this.isStarted = false; + + this.stopKeepAlive(); + this.stopConnectionListener(); + this.stopPeerConnectionListener(); + } + + /** + * Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription. + * @returns array of peers + */ + public async getPeers(): Promise { + if (!this.isStarted) { + this.peers = await this.peerManager.getPeers(); + } + + return this.peers; + } + + /** + * Notifies monitor if message was received. + * + * @param peerId peer from which message is received + * @param message received message + * + * @returns true if message was received from peer + */ + public notifyMessageReceived( + peerId: string, + message: IProtoMessage + ): boolean { + const hash = this.buildMessageHash(message); + + this.verifiedPeers.add(peerId); + this.receivedMessagesFormPeer.add(`${peerId}-${hash}`); + + if (this.receivedMessages.has(hash)) { + return true; + } + + this.receivedMessages.add(hash); + + return false; + } + + private buildMessageHash(message: IProtoMessage): string { + return messageHashStr(this.pubsubTopic, message); + } + + private startConnectionListener(): void { + this.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private stopConnectionListener(): void { + this.connectionManager.removeEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private async onConnectionChange({ + detail: isConnected + }: CustomEvent): Promise { + if (!isConnected) { + this.stopKeepAlive(); + return; + } + + await Promise.all(this.peers.map((peer) => this.ping(peer, true))); + this.startKeepAlive(); + } + + private startKeepAlive(): void { + if (this.keepAliveIntervalId) { + return; + } + + this.keepAliveIntervalId = setInterval(() => { + void this.peers.map((peer) => this.ping(peer)); + }, this.config.keepAliveIntervalMs) as unknown as number; + } + + private stopKeepAlive(): void { + if (!this.keepAliveIntervalId) { + return; + } + + clearInterval(this.keepAliveIntervalId); + this.keepAliveIntervalId = undefined; + } + + private startPeerConnectionListener(): void { + this.libp2p.addEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.addEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private stopPeerConnectionListener(): void { + this.libp2p.removeEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private async onPeerConnected(_event: CustomEvent): Promise { + // TODO(weboko): use config.numOfUsedPeers here + if (this.peers.length > 0) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async onPeerDisconnected(event: CustomEvent): Promise { + const hasNotBeenUsed = !this.peers.find((p) => p.id.equals(event.detail)); + if (hasNotBeenUsed) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async subscribe(_peer: Peer | undefined): Promise { + let peer: Peer | undefined = _peer; + + for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) { + if (!peer) { + return; + } + + const response = await this.filter.subscribe( + this.pubsubTopic, + peer, + Array.from(this.activeSubscriptions.keys()) + ); + + if (response.success) { + return; + } + + peer = await this.peerManager.requestRenew(peer.id); + } + } + + private async ping( + peer: Peer, + renewOnFirstFail: boolean = false + ): Promise { + const peerIdStr = peer.id.toString(); + const response = await this.filter.ping(peer); + + if (response.failure && renewOnFirstFail) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + return; + } + + if (response.failure) { + const prev = this.pingFailedAttempts.get(peerIdStr) || 0; + this.pingFailedAttempts.set(peerIdStr, prev + 1); + } + + if (response.success) { + this.pingFailedAttempts.set(peerIdStr, 0); + } + + const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0; + + if (madeAttempts >= this.config.pingsBeforePeerRenewed) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + } + } +} diff --git a/packages/sdk/src/protocols/filter/utils.ts b/packages/sdk/src/protocols/filter/utils.ts new file mode 100644 index 0000000000..9c926ae36c --- /dev/null +++ b/packages/sdk/src/protocols/filter/utils.ts @@ -0,0 +1,15 @@ +import { FilterProtocolOptions } from "@waku/interfaces"; + +import * as C from "./constants.js"; + +export const buildConfig = ( + config?: Partial +): FilterProtocolOptions => { + return { + keepAliveIntervalMs: config?.keepAliveIntervalMs || C.DEFAULT_KEEP_ALIVE, + pingsBeforePeerRenewed: + config?.pingsBeforePeerRenewed || C.DEFAULT_MAX_PINGS, + enableLightPushFilterCheck: + config?.enableLightPushFilterCheck || C.DEFAULT_LIGHT_PUSH_FILTER_CHECK + }; +}; diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index c4d26d9ae4..4b874ce89c 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -22,6 +22,9 @@ export class PeerManager { private readonly libp2p: Libp2p; public constructor(params: PeerManagerParams) { + this.onConnected = this.onConnected.bind(this); + this.onDisconnected = this.onDisconnected.bind(this); + this.numPeersToUse = params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE; @@ -58,7 +61,20 @@ export class PeerManager { .map((c) => this.mapConnectionToPeer(c)) ); - return result[0]; + const newPeer = result[0]; + + if (!newPeer) { + log.warn( + `requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.` + ); + return; + } + + log.info( + `requestRenew: Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); + + return newPeer; } private startConnectionListener(): void { @@ -107,7 +123,9 @@ export class PeerManager { } private lockConnection(c: Connection): Connection { - log.info(`Locking connection for peerId=${c.remotePeer.toString()}`); + log.info( + `requestRenew: Locking connection for peerId=${c.remotePeer.toString()}` + ); c.tags.push(CONNECTION_LOCK_TAG); return c; } diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts deleted file mode 100644 index 120d208404..0000000000 --- a/packages/sdk/src/reliability_monitor/index.ts +++ /dev/null @@ -1,56 +0,0 @@ -import type { Peer } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - PubsubTopic -} from "@waku/interfaces"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -import { ReceiverReliabilityMonitor } from "./receiver.js"; - -export class ReliabilityMonitorManager { - private static receiverMonitors: Map< - PubsubTopic, - ReceiverReliabilityMonitor - > = new Map(); - - public static createReceiverMonitor( - pubsubTopic: PubsubTopic, - peerManager: PeerManager, - getContentTopics: () => ContentTopic[], - protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - sendLightPushMessage: (peer: Peer) => Promise - ): ReceiverReliabilityMonitor { - if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { - return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; - } - - const monitor = new ReceiverReliabilityMonitor( - pubsubTopic, - peerManager, - getContentTopics, - protocolSubscribe, - sendLightPushMessage - ); - ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); - return monitor; - } - - private constructor() {} - - public static stop(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); - } - - public static stopAll(): void { - for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxPingFailures(undefined); - this.receiverMonitors.delete(pubsubTopic); - } - } -} diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts deleted file mode 100644 index cbde6ddd3c..0000000000 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ /dev/null @@ -1,185 +0,0 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - IProtoMessage, - PeerIdStr, - PubsubTopic -} from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; -import { Logger } from "@waku/utils"; -import { bytesToUtf8 } from "@waku/utils/bytes"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -const log = new Logger("sdk:receiver:reliability_monitor"); - -const DEFAULT_MAX_PINGS = 3; -const MESSAGE_VERIFICATION_DELAY = 5_000; - -export class ReceiverReliabilityMonitor { - private receivedMessagesFormPeer = new Set(); - private receivedMessages = new Set(); - private scheduledVerification = new Map(); - private verifiedPeers = new Set(); - - private peerFailures: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; - private peerRenewalLocks: Set = new Set(); - - public constructor( - private readonly pubsubTopic: PubsubTopic, - private readonly peerManager: PeerManager, - private getContentTopics: () => ContentTopic[], - private protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - private sendLightPushMessage: (peer: Peer) => Promise - ) {} - - public setMaxPingFailures(value: number | undefined): void { - if (value === undefined) { - return; - } - this.maxPingFailures = value; - } - - public async handlePingResult( - peerId: PeerId, - result?: CoreProtocolResult - ): Promise { - if (result?.success) { - this.peerFailures.delete(peerId.toString()); - return; - } - - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures >= this.maxPingFailures) { - try { - log.info( - `Attempting to renew ${peerId.toString()} due to ping failures.` - ); - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - - public notifyMessageReceived( - peerIdStr: string, - message: IProtoMessage - ): boolean { - const hash = this.buildMessageHash(message); - - this.verifiedPeers.add(peerIdStr); - this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`); - - log.info( - `notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}` - ); - log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`); - - if (this.receivedMessages.has(hash)) { - return true; - } - - this.receivedMessages.add(hash); - - return false; - } - - public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void { - const peerIdStr = peerId.toString(); - const hash = this.buildMessageHash(message); - - log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`); - - if (this.scheduledVerification.has(peerIdStr)) { - log.warn( - `notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - const timeout = setTimeout( - (async () => { - const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); - const receivedTestMessage = this.receivedMessagesFormPeer.has( - `${peerIdStr}-${hash}` - ); - - if (receivedAnyMessage || receivedTestMessage) { - log.info( - `notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - log.warn( - `notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}` - ); - this.scheduledVerification.delete(peerIdStr); - await this.renewAndSubscribePeer(peerId); - }) as () => void, - MESSAGE_VERIFICATION_DELAY - ) as unknown as number; - - this.scheduledVerification.set(peerIdStr, timeout); - } - - public shouldVerifyPeer(peerId: PeerId): boolean { - const peerIdStr = peerId.toString(); - - const isPeerVerified = this.verifiedPeers.has(peerIdStr); - const isVerificationPending = this.scheduledVerification.has(peerIdStr); - - return !(isPeerVerified || isVerificationPending); - } - - private buildMessageHash(message: IProtoMessage): string { - return messageHashStr(this.pubsubTopic, message); - } - - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - const peerIdStr = peerId.toString(); - try { - if (this.peerRenewalLocks.has(peerIdStr)) { - log.info(`Peer ${peerIdStr} is already being renewed.`); - return; - } - - this.peerRenewalLocks.add(peerIdStr); - - const newPeer = await this.peerManager.requestRenew(peerId); - if (!newPeer) { - log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); - return; - } - - await this.protocolSubscribe( - this.pubsubTopic, - newPeer, - this.getContentTopics() - ); - - await this.sendLightPushMessage(newPeer); - - this.peerFailures.delete(peerIdStr); - - return newPeer; - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); - return; - } finally { - this.peerRenewalLocks.delete(peerIdStr); - } - } -} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index bea192c202..97ab276ed8 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -3,6 +3,7 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core"; import type { + CreateNodeOptions, IFilter, IHealthManager, ILightPush, @@ -10,7 +11,6 @@ import type { IStore, IWaku, Libp2p, - ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -20,26 +20,11 @@ import { wakuFilter } from "../protocols/filter/index.js"; import { wakuLightPush } from "../protocols/light_push/index.js"; import { PeerManager } from "../protocols/peer_manager.js"; import { wakuStore } from "../protocols/store/index.js"; -import { ReliabilityMonitorManager } from "../reliability_monitor/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; -export const DefaultUserAgent = "js-waku"; -export const DefaultPingMaxInboundStreams = 10; - const log = new Logger("waku"); -export interface WakuOptions { - /** - * Set the user agent string to be used in identification of the node. - * @default {@link @waku/core.DefaultUserAgent} - */ - userAgent?: string; -} - -export type CreateWakuNodeOptions = ProtocolCreateOptions & - Partial; - type ProtocolsEnabled = { filter?: boolean; lightpush?: boolean; @@ -59,7 +44,7 @@ export class WakuNode implements IWaku { public constructor( public readonly pubsubTopics: PubsubTopic[], - options: CreateWakuNodeOptions, + options: CreateNodeOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, relay?: IRelay @@ -116,7 +101,8 @@ export class WakuNode implements IWaku { const filter = wakuFilter( this.connectionManager, this.peerManager, - this.lightPush + this.lightPush, + options.filter ); this.filter = filter(libp2p); } @@ -200,7 +186,6 @@ export class WakuNode implements IWaku { } public async stop(): Promise { - ReliabilityMonitorManager.stopAll(); this.peerManager.stop(); this.connectionManager.stop(); await this.libp2p.stop(); diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index adb37170f8..b7d02d0c13 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -1,8 +1,4 @@ -import { - NetworkConfig, - ProtocolCreateOptions, - Protocols -} from "@waku/interfaces"; +import { CreateNodeOptions, NetworkConfig, Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; import { createLightNode, WakuNode } from "@waku/sdk"; import { @@ -46,7 +42,7 @@ export async function runNodes( }, { retries: 3 } ); - const waku_options: ProtocolCreateOptions = { + const waku_options: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, networkConfig: shardInfo diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 920bb392ba..c3e8cc7a9b 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -1,9 +1,9 @@ import { + CreateNodeOptions, DefaultNetworkConfig, IWaku, LightNode, NetworkConfig, - ProtocolCreateOptions, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; @@ -35,7 +35,7 @@ export async function runMultipleNodes( withoutFilter ); - const wakuOptions: ProtocolCreateOptions = { + const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 649c302cf0..938ac16d99 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,11 +1,11 @@ import { createDecoder, createEncoder } from "@waku/core"; import { + CreateNodeOptions, DefaultNetworkConfig, ISubscription, IWaku, LightNode, NetworkConfig, - ProtocolCreateOptions, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; @@ -85,7 +85,7 @@ export async function runMultipleNodes( withoutFilter ); - const wakuOptions: ProtocolCreateOptions = { + const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 37b5d6e632..a324b15459 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -115,7 +115,9 @@ describe("Wait for remote peer", function () { await delay(1000); await waku2.waitForPeers([Protocols.Store]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -143,7 +145,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitPromise; - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -170,7 +174,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waku2.waitForPeers([Protocols.LightPush]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -197,7 +203,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waku2.waitForPeers([Protocols.Filter]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -229,7 +237,9 @@ describe("Wait for remote peer", function () { Protocols.LightPush ]); - const peers = (await waku2.getPeers()).map((peer) => peer.id.toString()); + const peers = (await waku2.getConnectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 0d81d7c8bc..8d037d1bd2 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -11,8 +11,7 @@ import { import { createRelayNode } from "@waku/relay"; import { createLightNode, - createEncoder as createPlainEncoder, - DefaultUserAgent + createEncoder as createPlainEncoder } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -278,7 +277,7 @@ describe("User Agent", function () { waku1UserAgent ); expect(bytesToUtf8(waku2PeerInfo.metadata.get("AgentVersion")!)).to.eq( - DefaultUserAgent + "js-waku" ); }); }); diff --git a/packages/utils/src/common/sharding/type_guards.ts b/packages/utils/src/common/sharding/type_guards.ts index b0433e11cc..9ab53373aa 100644 --- a/packages/utils/src/common/sharding/type_guards.ts +++ b/packages/utils/src/common/sharding/type_guards.ts @@ -1,11 +1,11 @@ import type { ContentTopicInfo, - ProtocolCreateOptions, + CreateNodeOptions, StaticSharding } from "@waku/interfaces"; export function isStaticSharding( - config: NonNullable + config: NonNullable ): config is StaticSharding { return ( "clusterId" in config && "shards" in config && !("contentTopics" in config) @@ -13,7 +13,7 @@ export function isStaticSharding( } export function isAutoSharding( - config: NonNullable + config: NonNullable ): config is ContentTopicInfo { return "contentTopics" in config; }