diff --git a/packages/beacon-node/src/network/core/metrics.ts b/packages/beacon-node/src/network/core/metrics.ts index c267ec069d88..e5365fc42ab2 100644 --- a/packages/beacon-node/src/network/core/metrics.ts +++ b/packages/beacon-node/src/network/core/metrics.ts @@ -1,6 +1,7 @@ import {RegistryMetricCreator} from "../../metrics/utils/registryMetricCreator.js"; import {SubnetType} from "../metadata.js"; -import {DiscoveredPeerStatus} from "../peers/discover.js"; +import {DiscoveredPeerStatus, NotDialReason} from "../peers/discover.js"; +import {PeerRequestedSubnetType} from "../peers/peerManager.js"; import {SubnetSource} from "../subnets/attnetsService.js"; export type NetworkCoreMetrics = ReturnType; @@ -31,6 +32,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "Histogram of current count of long lived attnets of connected peers", buckets: [0, 4, 16, 32, 64], }), + peerColumnSubnetCount: register.histogram({ + name: "lodestar_peer_column_subnet_count", + help: "Histogram of current count of column subnets of connected peers", + buckets: [0, 4, 8, 16, 32, 64, 128], + }), peerScoreByClient: register.histogram<{client: string}>({ name: "lodestar_app_peer_score", help: "Current peer score at lodestar app side", @@ -90,12 +96,12 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "Prioritization results total peers count requested to disconnect", labelNames: ["reason"], }), - peersRequestedSubnetsToQuery: register.gauge<{type: SubnetType}>({ + peersRequestedSubnetsToQuery: register.gauge<{type: PeerRequestedSubnetType}>({ name: "lodestar_peers_requested_total_subnets_to_query", help: "Prioritization results total subnets to query and discover peers in", labelNames: ["type"], }), - peersRequestedSubnetsPeerCount: register.gauge<{type: SubnetType}>({ + peersRequestedSubnetsPeerCount: register.gauge<{type: PeerRequestedSubnetType}>({ name: "lodestar_peers_requested_total_subnets_peers_count", help: 
"Prioritization results total peers in subnets to query and discover peers in", labelNames: ["type"], @@ -105,6 +111,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "network.reportPeer count by reason", labelNames: ["reason"], }), + peerCountPerSamplingColumnSubnet: register.gauge<{subnet: number}>({ + name: "lodestar_peer_count_per_sampling_column_subnet", + help: "Current count of peers per sampling column subnet", + labelNames: ["subnet"], + }), peerManager: { heartbeatDuration: register.histogram({ name: "lodestar_peer_manager_heartbeat_duration_seconds", @@ -127,11 +138,19 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "Current peers to connect count from discoverPeers requests", labelNames: ["type"], }), + subnetColumnPeersToConnect: register.gauge({ + name: "lodestar_discovery_subnet_column_peers_to_connect", + help: "Current peers to connect count from discoverPeers requests", + }), subnetsToConnect: register.gauge<{type: SubnetType}>({ name: "lodestar_discovery_subnets_to_connect", help: "Current subnets to connect count from discoverPeers requests", labelNames: ["type"], }), + columnSubnetsToConnect: register.gauge({ + name: "lodestar_discovery_column_subnets_to_connect", + help: "Current column subnets to connect count from discoverPeers requests", + }), cachedENRsSize: register.gauge({ name: "lodestar_discovery_cached_enrs_size", help: "Current size of the cachedENRs Set", @@ -155,6 +174,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "Total count of status results of PeerDiscovery.onDiscovered() function", labelNames: ["status"], }), + notDialReason: register.gauge<{reason: NotDialReason}>({ + name: "lodestar_discovery_not_dial_reason_total_count", + help: "Total count of not dial reasons", + labelNames: ["reason"], + }), dialAttempts: register.gauge({ name: "lodestar_discovery_total_dial_attempts", help: "Total dial attempts by peer 
discovery", diff --git a/packages/beacon-node/src/network/options.ts b/packages/beacon-node/src/network/options.ts index d2070873261b..64d1eae97147 100644 --- a/packages/beacon-node/src/network/options.ts +++ b/packages/beacon-node/src/network/options.ts @@ -44,4 +44,6 @@ export const defaultNetworkOptions: NetworkOptions = { beaconAttestationBatchValidation: true, // This will enable the light client server by default disableLightClientServer: false, + // for PeerDAS, this is the same as TARGET_SUBNET_PEERS, should re-evaluate after devnets + targetColumnSubnetPeers: 6, }; diff --git a/packages/beacon-node/src/network/peers/discover.ts b/packages/beacon-node/src/network/peers/discover.ts index 7a9487a3b02d..29886aaf7d19 100644 --- a/packages/beacon-node/src/network/peers/discover.ts +++ b/packages/beacon-node/src/network/peers/discover.ts @@ -6,7 +6,7 @@ import {BeaconConfig} from "@lodestar/config"; import {pruneSetToMax, sleep} from "@lodestar/utils"; import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; import {LoggerNode} from "@lodestar/logger/node"; -import {ssz} from "@lodestar/types"; +import {ColumnIndex} from "@lodestar/types"; import {bytesToInt} from "@lodestar/utils"; import {NetworkCoreMetrics} from "../core/metrics.js"; import {Libp2p} from "../interface.js"; @@ -18,6 +18,7 @@ import {NodeId, computeNodeId} from "../subnets/interface.js"; import {getDataColumnSubnets} from "../../util/dataColumns.js"; import {deserializeEnrSubnets, zeroAttnets, zeroSyncnets} from "./utils/enrSubnetsDeserialize.js"; import {IPeerRpcScoreStore, ScoreState} from "./score/index.js"; +import {ColumnSubnetId} from "./peersData.js"; /** Max number of cached ENRs after discovering a good peer */ const MAX_CACHED_ENRS = 100; @@ -30,6 +31,7 @@ export type PeerDiscoveryOpts = { discv5: LodestarDiscv5Opts; connectToDiscv5Bootnodes?: boolean; // experimental flags for debugging + // TODO-das: remove onlyConnectToBiggerDataNodes?: boolean;
onlyConnectToMinimalCustodyOverlapNodes?: boolean; }; @@ -62,6 +64,11 @@ export enum DiscoveredPeerStatus { no_multiaddrs = "no_multiaddrs", } +export enum NotDialReason { + not_contain_requested_subnet_sampling_columns = "not_contain_requested_subnet_sampling_columns", + not_contain_requested_attnet_syncnet_subnets = "not_contain_requested_attnet_syncnet_subnets", +} + type UnixMs = number; /** * Maintain peersToConnect to avoid having too many topic peers at some point. @@ -80,12 +87,17 @@ export type SubnetDiscvQueryMs = { maxPeersToDiscover: number; }; +/** + * A map of column subnet id to maxPeersToDiscover + */ +type ColumnSubnetQueries = Map; + type CachedENR = { peerId: PeerId; multiaddrTCP: Multiaddr; subnets: Record; addedUnixMs: number; - custodySubnetCount: number; + peerCustodySubnets: number[]; }; /** @@ -95,6 +107,7 @@ type CachedENR = { export class PeerDiscovery { readonly discv5: Discv5Worker; private libp2p: Libp2p; + // TODO-das: remove nodeId and sampleSubnets once we remove onlyConnect* flag private nodeId: NodeId; private sampleSubnets: number[]; private peerRpcScores: IPeerRpcScoreStore; @@ -102,20 +115,20 @@ export class PeerDiscovery { private logger: LoggerNode; private config: BeaconConfig; private cachedENRs = new Map(); - private peerIdToCustodySubnetCount = new Map(); private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive}; private peersToConnect = 0; private subnetRequests: Record> = { attnets: new Map(), syncnets: new Map(), }; + // map of column subnet to max number of peers to connect + private columnSubnetRequests: Map; - /** The maximum number of peers we allow (exceptions for subnet peers) */ - private maxPeers: number; private discv5StartMs: number; private discv5FirstQueryDelayMs: number; private connectToDiscv5BootnodesOnStart: boolean | undefined = false; + // TODO-das: remove private onlyConnectToBiggerDataNodes: boolean | undefined = false; private onlyConnectToMinimalCustodyOverlapNodes: boolean | 
undefined = false; @@ -127,14 +140,15 @@ export class PeerDiscovery { this.logger = logger; this.config = config; this.discv5 = discv5; + // TODO-das: remove this.nodeId = nodeId; // we will only connect to peers that can provide us custody this.sampleSubnets = getDataColumnSubnets( nodeId, Math.max(config.CUSTODY_REQUIREMENT, config.NODE_CUSTODY_REQUIREMENT, config.SAMPLES_PER_SLOT) ); + this.columnSubnetRequests = new Map(); - this.maxPeers = opts.maxPeers; this.discv5StartMs = 0; this.discv5StartMs = Date.now(); this.discv5FirstQueryDelayMs = opts.discv5FirstQueryDelayMs; @@ -167,6 +181,13 @@ export class PeerDiscovery { metrics.discovery.cachedENRsSize.addCollect(() => { metrics.discovery.cachedENRsSize.set(this.cachedENRs.size); metrics.discovery.peersToConnect.set(this.peersToConnect); + + // PeerDAS metrics + const columnSubnetsToConnect = Array.from(this.columnSubnetRequests.values()); + const subnetColumnPeersToConnect = columnSubnetsToConnect.reduce((acc, elem) => acc + elem, 0); + metrics.discovery.subnetColumnPeersToConnect.set(subnetColumnPeersToConnect); + metrics.discovery.columnSubnetsToConnect.set(columnSubnetsToConnect.filter((elem) => elem > 0).length); + for (const type of [SubnetType.attnets, SubnetType.syncnets]) { const subnetPeersToConnect = Array.from(this.subnetRequests[type].values()).reduce( (acc, {peersToConnect}) => acc + peersToConnect, @@ -200,7 +221,11 @@ export class PeerDiscovery { /** * Request to find peers, both on specific subnets and in general */ - discoverPeers(peersToConnect: number, subnetRequests: SubnetDiscvQueryMs[] = []): void { + discoverPeers( + peersToConnect: number, + columnSubnetRequests: ColumnSubnetQueries, + subnetRequests: SubnetDiscvQueryMs[] = [] + ): void { const subnetsToDiscoverPeers: SubnetDiscvQueryMs[] = []; const cachedENRsToDial = new Map(); // Iterate in reverse to consider first the most recent ENRs @@ -226,15 +251,40 @@ export class PeerDiscovery { this.peersToConnect += peersToConnect; + // 
starting from PeerDAS, we need to prioritize column subnet peers first in order to have stable subnet sampling + const columnSubnetsToDiscover = new Set(); + let columnPeersToDiscover = 0; + column: for (const [subnet, maxPeersToConnect] of columnSubnetRequests) { + let cachedENRsInColumn = 0; + for (const cachedENR of cachedENRsReverse) { + if (cachedENR.peerCustodySubnets.includes(subnet)) { + cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR); + + if (++cachedENRsInColumn >= maxPeersToConnect) { + continue column; + } + } + + const columnPeersToConnect = Math.max(maxPeersToConnect - cachedENRsInColumn, 0); + this.columnSubnetRequests.set(subnet, columnPeersToConnect); + columnSubnetsToDiscover.add(subnet); + columnPeersToDiscover += columnPeersToConnect; + } + } + subnet: for (const subnetRequest of subnetRequests) { // Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet let cachedENRsInSubnet = 0; - for (const cachedENR of cachedENRsReverse) { - if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) { - cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR); - if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) { - continue subnet; + // only dial attnet/syncnet peers if subnet sampling peers are stable + if (columnPeersToDiscover === 0) { + for (const cachedENR of cachedENRsReverse) { + if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) { + cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR); + + if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) { + continue subnet; + } } } } @@ -282,6 +332,8 @@ export class PeerDiscovery { peersToConnect, peersAvailableToDial: cachedENRsToDial.size, subnetsToDiscover: subnetsToDiscoverPeers.length, + columnSubnetsToDiscover: Array.from(columnSubnetsToDiscover).join(","), + columnPeersToDiscover, shouldRunFindRandomNodeQuery, }); } @@ -336,12 +388,8 @@ export class PeerDiscovery { const attnets = 
zeroAttnets; const syncnets = zeroSyncnets; - const custodySubnetCount = this.peerIdToCustodySubnetCount.get(id.toString()); - if (custodySubnetCount === undefined) { - this.logger.warn("onDiscoveredPeer with unknown custodySubnetCount assuming 4", {peerId: id.toString()}); - } - const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount ?? 4); + const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, undefined); this.logger.debug("Discovered peer via libp2p", {peer: prettyPrintPeerId(id), status}); this.metrics?.discovery.discoveredStatus.inc({status}); }; @@ -376,13 +424,16 @@ export class PeerDiscovery { // never throw and treat too long or too short bitfields as zero-ed const attnets = attnetsBytes ? deserializeEnrSubnets(attnetsBytes, ATTESTATION_SUBNET_COUNT) : zeroAttnets; const syncnets = syncnetsBytes ? deserializeEnrSubnets(syncnetsBytes, SYNC_COMMITTEE_SUBNET_COUNT) : zeroSyncnets; - const custodySubnetCount = custodySubnetCountBytes - ? bytesToInt(custodySubnetCountBytes, "be") - : this.config.CUSTODY_REQUIREMENT; - this.peerIdToCustodySubnetCount.set(peerId.toString(), custodySubnetCount); - - const status = this.handleDiscoveredPeer(peerId, multiaddrTCP, attnets, syncnets, custodySubnetCount); - this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status}); + const custodySubnetCount = custodySubnetCountBytes ? 
bytesToInt(custodySubnetCountBytes, "be") : undefined; + + const status = this.handleDiscoveredPeer( + peerId, + multiaddrTCP, + attnets, + syncnets, + custodySubnetCount + ); + this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status, custodySubnetCount}); this.metrics?.discovery.discoveredStatus.inc({status}); }; @@ -394,7 +445,7 @@ export class PeerDiscovery { multiaddrTCP: Multiaddr, attnets: boolean[], syncnets: boolean[], - custodySubnetCount: number + custodySubnetCount?: number ): DiscoveredPeerStatus { const nodeId = computeNodeId(peerId); this.logger.warn("handleDiscoveredPeer", {nodeId: toHexString(nodeId), peerId: peerId.toString()}); @@ -424,7 +475,7 @@ export class PeerDiscovery { multiaddrTCP, subnets: {attnets, syncnets}, addedUnixMs: Date.now(), - custodySubnetCount, + peerCustodySubnets: getDataColumnSubnets(nodeId, custodySubnetCount ?? this.config.CUSTODY_REQUIREMENT), }; // Only dial peer if necessary @@ -445,9 +496,11 @@ export class PeerDiscovery { } private shouldDialPeer(peer: CachedENR): boolean { + // begin onlyConnect* experimental logic + // TODO-das: remove const nodeId = computeNodeId(peer.peerId); - const peerCustodySubnetCount = peer.custodySubnetCount; - const peerCustodySubnets = getDataColumnSubnets(nodeId, peerCustodySubnetCount); + const {peerCustodySubnets} = peer; + const peerCustodySubnetCount = peerCustodySubnets.length; const matchingSubnetsNum = this.sampleSubnets.reduce( (acc, elem) => acc + (peerCustodySubnets.includes(elem) ? 
1 : 0), @@ -468,10 +521,36 @@ export class PeerDiscovery { if (this.onlyConnectToBiggerDataNodes && !hasAllColumns) { return false; } + if (this.onlyConnectToMinimalCustodyOverlapNodes && !hasMinCustodyMatchingColumns) { return false; } + // end onlyConnect* experimental logic + + // starting from PeerDAS fork, we need to make sure we have stable subnet sampling peers first + // given CUSTODY_REQUIREMENT = 4 and 100 peers, we have 400 custody columns from peers + // with NUMBER_OF_COLUMNS = 128, we have 400 / 128 = 3.125 peers per column in average + // it would not be hard to find TARGET_SUBNET_PEERS(6) peers per SAMPLES_PER_SLOT(8) columns + // after some first heartbeats, we should have no more column requested, then go with conditions of prior forks + let hasMatchingColumn = false; + let columnRequestCount = 0; + for (const [column, peersToConnect] of this.columnSubnetRequests.entries()) { + if (peersToConnect <= 0) { + this.columnSubnetRequests.delete(column); + } else if (peerCustodySubnets.includes(column)) { + this.columnSubnetRequests.set(column, Math.max(0, peersToConnect - 1)); + hasMatchingColumn = true; + columnRequestCount += peersToConnect; + } + } + + // if subnet sampling peers are not stable and this peer is not in the requested columns, ignore it + if (columnRequestCount > 0 && !hasMatchingColumn) { + this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_subnet_sampling_columns}); + return false; + } + // logics up to Deneb fork for (const type of [SubnetType.attnets, SubnetType.syncnets]) { for (const [subnet, {toUnixMs, peersToConnect}] of this.subnetRequests[type].entries()) { if (toUnixMs < Date.now() || peersToConnect === 0) { @@ -497,6 +576,7 @@ export class PeerDiscovery { return true; } + this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_attnet_syncnet_subnets}); return false; } diff --git a/packages/beacon-node/src/network/peers/peerManager.ts 
b/packages/beacon-node/src/network/peers/peerManager.ts index 1bb1b36ab1e5..d1c529f40725 100644 --- a/packages/beacon-node/src/network/peers/peerManager.ts +++ b/packages/beacon-node/src/network/peers/peerManager.ts @@ -19,7 +19,7 @@ import {NetworkCoreMetrics} from "../core/metrics.js"; import {LodestarDiscv5Opts} from "../discv5/types.js"; import {getDataColumnSubnets, getDataColumns} from "../../util/dataColumns.js"; import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover.js"; -import {PeersData, PeerData} from "./peersData.js"; +import {PeersData, PeerData, ColumnSubnetId} from "./peersData.js"; import {getKnownClientFromAgentVersion, ClientKind} from "./client.js"; import { getConnectedPeerIds, @@ -27,6 +27,7 @@ import { assertPeerRelevance, prioritizePeers, renderIrrelevantPeerType, + PrioritizePeersOpts, } from "./utils/index.js"; import {IPeerRpcScoreStore, PeerAction, PeerScoreStats, ScoreState, updateGossipsubScores} from "./score/index.js"; @@ -65,10 +66,6 @@ const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR = 0.1; // to terminal if it deviates significantly from the user's settings export type PeerManagerOpts = { - /** The target number of peers we would like to connect to. 
*/ - targetPeers: number; - /** The maximum number of peers we allow (exceptions for subnet peers) */ - maxPeers: number; /** * Delay the 1st query after starting discv5 * See https://github.com/ChainSafe/lodestar/issues/3423 @@ -83,9 +80,10 @@ export type PeerManagerOpts = { */ connectToDiscv5Bootnodes?: boolean; // experimental flags for debugging + // TODO-das: remove onlyConnectToBiggerDataNodes?: boolean; onlyConnectToMinimalCustodyOverlapNodes?: boolean; -}; +} & PrioritizePeersOpts; /** * ReqResp methods used only be PeerManager, so the main thread never has to call them @@ -114,6 +112,8 @@ export type PeerManagerModules = { statusCache: StatusCache; }; +export type PeerRequestedSubnetType = SubnetType | "column"; + type PeerIdStr = string; enum RelevantPeerStatus { @@ -132,7 +132,7 @@ enum RelevantPeerStatus { */ export class PeerManager { private nodeId: NodeId; - private sampleSubnets: number[]; + private sampleSubnets: ColumnSubnetId[]; private readonly libp2p: Libp2p; private readonly logger: LoggerNode; private readonly metrics: NetworkCoreMetrics | null; @@ -332,15 +332,21 @@ export class PeerManager { }); if (peerData) { const oldMetadata = peerData.metadata; + const csc = (metadata as Partial).csc ?? this.config.CUSTODY_REQUIREMENT; + const nodeId = peerData?.nodeId ?? computeNodeId(peer); + const custodySubnets = (csc !== oldMetadata?.csc || oldMetadata?.custodySubnets == null)? getDataColumnSubnets(nodeId, csc) : oldMetadata?.custodySubnets; peerData.metadata = { seqNumber: metadata.seqNumber, attnets: metadata.attnets, syncnets: (metadata as Partial).syncnets ?? BitArray.fromBitLen(SYNC_COMMITTEE_SUBNET_COUNT), csc: (metadata as Partial).csc ?? - this.discovery?.["peerIdToCustodySubnetCount"].get(peer.toString()) ?? 
+ // TODO: from peerDAS, all metadata should have csc field + // may consider to disconnect them to have more qualified peers this.config.CUSTODY_REQUIREMENT, + custodySubnets, }; + // request status again in order to update network's datacolumns of this peer if (oldMetadata === null || oldMetadata.csc !== peerData.metadata.csc) { void this.requestStatus(peer, this.statusCache.get()); } @@ -413,7 +419,8 @@ export class PeerManager { const custodySubnetCount = peerData?.metadata?.csc; const peerCustodySubnetCount = custodySubnetCount ?? this.config.CUSTODY_REQUIREMENT; - const peerCustodySubnets = getDataColumnSubnets(nodeId, peerCustodySubnetCount); + // on metadata, we should have custodySubnets + const peerCustodySubnets = peerData?.metadata?.custodySubnets ?? getDataColumnSubnets(nodeId, peerCustodySubnetCount); const matchingSubnetsNum = this.sampleSubnets.reduce( (acc, elem) => acc + (peerCustodySubnets.includes(elem) ? 1 : 0), @@ -429,6 +436,7 @@ export class PeerManager { peerId: peer.toString(), custodySubnetCount, hasAllColumns, + matchingSubnetsNum, peerCustodySubnets: peerCustodySubnets.join(" "), mySampleSubnets: this.sampleSubnets.join(" "), clientAgent, @@ -450,7 +458,7 @@ export class PeerManager { return; } - // coule be optimized by directly using the previously calculated subnet + // TODO: could be optimized by directly using the previously calculated subnet const dataColumns = getDataColumns(nodeId, peerCustodySubnetCount); this.networkEventBus.emit(NetworkEvent.peerConnected, { peer: peer.toString(), @@ -530,7 +538,7 @@ export class PeerManager { } } - const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries} = prioritizePeers( + const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries, columnSubnetQueries} = prioritizePeers( connectedHealthyPeers.map((peer) => { const peerData = this.connectedPeers.get(peer.toString()); return { @@ -538,13 +546,16 @@ export class PeerManager { direction: peerData?.direction ?? 
null, attnets: peerData?.metadata?.attnets ?? null, syncnets: peerData?.metadata?.syncnets ?? null, + custodySubnets: peerData?.metadata?.custodySubnets ?? null, score: this.peerRpcScores.getScore(peer), }; }), // Collect subnets which we need peers for in the current slot this.attnetsService.getActiveSubnets(), this.syncnetsService.getActiveSubnets(), - this.opts + this.sampleSubnets, + this.opts, + this.metrics ); const queriesMerged: SubnetDiscvQueryMs[] = []; @@ -569,6 +580,11 @@ export class PeerManager { } } + for (const maxPeersToDiscover of columnSubnetQueries.values()) { + this.metrics?.peersRequestedSubnetsToQuery.inc({type: "column"}, 1); + this.metrics?.peersRequestedSubnetsPeerCount.inc({type: "column"}, maxPeersToDiscover); + } + // disconnect first to have more slots before we dial new peers for (const [reason, peers] of peersToDisconnect) { this.metrics?.peersRequestedToDisconnect.inc({reason}, peers.length); @@ -580,7 +596,8 @@ export class PeerManager { if (this.discovery) { try { this.metrics?.peersRequestedToConnect.inc(peersToConnect); - this.discovery.discoverPeers(peersToConnect, queriesMerged); + // for PeerDAS, lodestar implements subnet sampling strategy, hence we need to issue columnSubnetQueries to PeerDiscovery + this.discovery.discoverPeers(peersToConnect, columnSubnetQueries, queriesMerged); } catch (e) { this.logger.error("Error on discoverPeers", {}, e as Error); } @@ -785,6 +802,7 @@ export class PeerManager { // TODO: Consider optimizing by doing observe in batch metrics.peerLongLivedAttnets.observe(attnets ? attnets.getTrueBitIndexes().length : 0); + metrics.peerColumnSubnetCount.observe(peerData?.metadata?.csc ?? 
0); metrics.peerScoreByClient.observe({client}, this.peerRpcScores.getScore(peerId)); metrics.peerGossipScoreByClient.observe({client}, this.peerRpcScores.getGossipScore(peerId)); metrics.peerConnectionLength.observe((now - openCnx.timeline.open) / 1000); diff --git a/packages/beacon-node/src/network/peers/peersData.ts b/packages/beacon-node/src/network/peers/peersData.ts index b117e817e91e..311f031c306c 100644 --- a/packages/beacon-node/src/network/peers/peersData.ts +++ b/packages/beacon-node/src/network/peers/peersData.ts @@ -5,6 +5,8 @@ import {NodeId} from "../subnets/interface.js"; import {ClientKind} from "./client.js"; type PeerIdStr = string; +export type ColumnSubnetId = number; +type Metadata = peerdas.Metadata & {custodySubnets: ColumnSubnetId[]}; export enum RelevantPeerStatus { Unknown = "unknown", @@ -20,7 +22,7 @@ export type PeerData = { direction: "inbound" | "outbound"; peerId: PeerId; nodeId: NodeId | null; - metadata: peerdas.Metadata | null; + metadata: Metadata | null; agentVersion: string | null; agentClient: ClientKind | null; encodingPreference: Encoding | null; diff --git a/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts b/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts index b6fe4cb6bc77..f2b7bc5e3629 100644 --- a/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts +++ b/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts @@ -5,6 +5,8 @@ import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/p import {MapDef} from "@lodestar/utils"; import {shuffle} from "../../../util/shuffle.js"; import {sortBy} from "../../../util/sortBy.js"; +import {ColumnSubnetId} from "../peersData.js"; +import {NetworkCoreMetrics} from "../../core/metrics.js"; import {RequestedSubnet} from "./subnetMap.js"; /** Target number of peers we'd like to have connected to a given long-lived subnet */ @@ -39,23 +41,29 @@ const attnetsZero = BitArray.fromBitLen(ATTESTATION_SUBNET_COUNT); const 
syncnetsZero = BitArray.fromBitLen(SYNC_COMMITTEE_SUBNET_COUNT); type SubnetDiscvQuery = {subnet: number; toSlot: number; maxPeersToDiscover: number}; +/** + * A map of column subnet id to maxPeersToDiscover + */ +type ColumnSubnetQueries = Map; type PeerInfo = { id: PeerId; direction: Direction | null; attnets: phase0.AttestationSubnets; syncnets: altair.SyncSubnets; + custodySubnets: ColumnSubnetId[]; attnetsTrueBitIndices: number[]; syncnetsTrueBitIndices: number[]; score: number; }; -export interface PrioritizePeersOpts { +export type PrioritizePeersOpts = { targetPeers: number; maxPeers: number; + targetColumnSubnetPeers: number; outboundPeersRatio?: number; targetSubnetPeers?: number; -} +}; export enum ExcessPeerDisconnectReason { LOW_SCORE = "low_score", @@ -77,16 +85,20 @@ export function prioritizePeers( direction: Direction | null; attnets: phase0.AttestationSubnets | null; syncnets: altair.SyncSubnets | null; + custodySubnets: ColumnSubnetId[] | null; score: number; }[], activeAttnets: RequestedSubnet[], activeSyncnets: RequestedSubnet[], - opts: PrioritizePeersOpts + sampleSubnets: ColumnSubnetId[], + opts: PrioritizePeersOpts, + metrics: NetworkCoreMetrics | null ): { peersToConnect: number; peersToDisconnect: Map; attnetQueries: SubnetDiscvQuery[]; syncnetQueries: SubnetDiscvQuery[]; + columnSubnetQueries: ColumnSubnetQueries; } { const {targetPeers, maxPeers} = opts; @@ -100,17 +112,20 @@ export function prioritizePeers( direction: peer.direction, attnets: peer.attnets ?? attnetsZero, syncnets: peer.syncnets ?? syncnetsZero, + custodySubnets: peer.custodySubnets ?? [], attnetsTrueBitIndices: peer.attnets?.getTrueBitIndexes() ?? [], syncnetsTrueBitIndices: peer.syncnets?.getTrueBitIndexes() ?? 
[], score: peer.score, }) ); - const {attnetQueries, syncnetQueries, dutiesByPeer} = requestAttnetPeers( + const {attnetQueries, syncnetQueries, columnSubnetQueries, dutiesByPeer} = requestSubnetPeers( connectedPeers, activeAttnets, activeSyncnets, - opts + sampleSubnets, + opts, + metrics ); const connectedPeerCount = connectedPeers.length; @@ -134,20 +149,24 @@ export function prioritizePeers( peersToDisconnect, attnetQueries, syncnetQueries, + columnSubnetQueries, }; } /** - * If more peers are needed in attnets and syncnets, create SubnetDiscvQuery for each subnet + * If more peers are needed in attnets and syncnets and column subnets, create SubnetDiscvQuery for each subnet */ -function requestAttnetPeers( +function requestSubnetPeers( connectedPeers: PeerInfo[], activeAttnets: RequestedSubnet[], activeSyncnets: RequestedSubnet[], - opts: PrioritizePeersOpts + samplingSubnets: ColumnSubnetId[], + opts: PrioritizePeersOpts, + metrics: NetworkCoreMetrics | null ): { attnetQueries: SubnetDiscvQuery[]; syncnetQueries: SubnetDiscvQuery[]; + columnSubnetQueries: ColumnSubnetQueries; dutiesByPeer: Map; } { const {targetSubnetPeers = TARGET_SUBNET_PEERS} = opts; @@ -202,6 +221,9 @@ function requestAttnetPeers( for (const {subnet, toSlot} of activeSyncnets) { const peersInSubnet = peersPerSubnet.get(subnet) ?? 
0; + // NOTE(review): this feeds syncnet peer counts into the column-subnet metric — confirm the intended loop; + // it could live in runPeerCountMetrics() of PeerManager, but is set here since the data is at hand + metrics?.peerCountPerSamplingColumnSubnet.set({subnet}, peersInSubnet); if (peersInSubnet < targetSubnetPeers) { // We need more peers syncnetQueries.push({subnet, toSlot, maxPeersToDiscover: targetSubnetPeers - peersInSubnet}); @@ -209,7 +231,26 @@ } } - return {attnetQueries, syncnetQueries, dutiesByPeer}; + // column subnets, do we need queries for more peers + const targetColumnSubnetPeers = opts.targetColumnSubnetPeers; + const columnSubnetQueries: ColumnSubnetQueries = new Map(); + const peersPerColumnSubnet = new Map(); + for (const peer of connectedPeers) { + const {custodySubnets} = peer; + for (const columnSubnet of custodySubnets) { + peersPerColumnSubnet.set(columnSubnet, 1 + (peersPerColumnSubnet.get(columnSubnet) ?? 0)); + } + } + + for (const columnSubnet of samplingSubnets) { + const peersInColumnSubnet = peersPerColumnSubnet.get(columnSubnet) ?? 0; + if (peersInColumnSubnet < targetColumnSubnetPeers) { + // We need more peers + columnSubnetQueries.set(columnSubnet, targetColumnSubnetPeers - peersInColumnSubnet); + } + } + + return {attnetQueries, syncnetQueries, columnSubnetQueries, dutiesByPeer}; } /** diff --git a/packages/cli/src/options/beaconNodeOptions/network.ts b/packages/cli/src/options/beaconNodeOptions/network.ts index 60268f4a2be4..148f35f71527 100644 --- a/packages/cli/src/options/beaconNodeOptions/network.ts +++ b/packages/cli/src/options/beaconNodeOptions/network.ts @@ -39,6 +39,7 @@ export type NetworkArgs = { "network.maxGossipTopicConcurrency"?: number; "network.useWorker"?: boolean; "network.maxYoungGenerationSizeMb"?: number; + "network.targetColumnSubnetPeers"?: number; /** @deprecated This option is deprecated and should be removed in next major release.
*/ "network.requestCountPeerLimit"?: number; @@ -160,6 +161,7 @@ export function parseArgs(args: NetworkArgs): IBeaconNodeOptions["network"] { maxGossipTopicConcurrency: args["network.maxGossipTopicConcurrency"], useWorker: args["network.useWorker"], maxYoungGenerationSizeMb: args["network.maxYoungGenerationSizeMb"], + targetColumnSubnetPeers: args["network.targetColumnSubnetPeers"] ?? defaultOptions.network.targetColumnSubnetPeers, }; } @@ -414,4 +416,12 @@ export const options: CliCommandOptions = { description: "Max size of young generation in megabytes. Defaults to 152mb", defaultDescription: String(defaultOptions.network.maxYoungGenerationSizeMb), }, + + "network.targetColumnSubnetPeers": { + type: "number", + hidden: true, + group: "network", + description: "Target number of peers per sampling column subnet", + defaultDescription: String(defaultOptions.network.targetColumnSubnetPeers), + }, }; diff --git a/packages/cli/test/unit/options/beaconNodeOptions.test.ts b/packages/cli/test/unit/options/beaconNodeOptions.test.ts index d74ae73b966f..26ebe7d11d9f 100644 --- a/packages/cli/test/unit/options/beaconNodeOptions.test.ts +++ b/packages/cli/test/unit/options/beaconNodeOptions.test.ts @@ -104,6 +104,7 @@ describe("options / beaconNodeOptions", () => { "network.maxGossipTopicConcurrency": 64, "network.useWorker": true, "network.maxYoungGenerationSizeMb": 152, + "network.targetColumnSubnetPeers": 12, "sync.isSingleNode": true, "sync.disableProcessAsChainSegment": true, @@ -215,6 +216,7 @@ describe("options / beaconNodeOptions", () => { maxGossipTopicConcurrency: 64, useWorker: true, maxYoungGenerationSizeMb: 152, + targetColumnSubnetPeers: 12, }, sync: { isSingleNode: true,