Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use subnet request mechanism for subnet sampling strategy #7274

Open
wants to merge 5 commits into
base: peerDAS
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {RegistryMetricCreator} from "../../metrics/utils/registryMetricCreator.js";
import {SubnetType} from "../metadata.js";
import {DiscoveredPeerStatus} from "../peers/discover.js";
import {DiscoveredPeerStatus, NotDialReason} from "../peers/discover.js";
import {PeerRequestedSubnetType} from "../peers/peerManager.js";
import {SubnetSource} from "../subnets/attnetsService.js";

export type NetworkCoreMetrics = ReturnType<typeof createNetworkCoreMetrics>;
Expand Down Expand Up @@ -31,6 +32,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Histogram of current count of long lived attnets of connected peers",
buckets: [0, 4, 16, 32, 64],
}),
peerColumnSubnetCount: register.histogram({
name: "lodestar_peer_column_subnet_count",
help: "Histogram of current count of column subnets of connected peers",
buckets: [0, 4, 8, 16, 32, 64, 128],
}),
peerScoreByClient: register.histogram<{client: string}>({
name: "lodestar_app_peer_score",
help: "Current peer score at lodestar app side",
Expand Down Expand Up @@ -90,12 +96,12 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Prioritization results total peers count requested to disconnect",
labelNames: ["reason"],
}),
peersRequestedSubnetsToQuery: register.gauge<{type: SubnetType}>({
peersRequestedSubnetsToQuery: register.gauge<{type: PeerRequestedSubnetType}>({
name: "lodestar_peers_requested_total_subnets_to_query",
help: "Prioritization results total subnets to query and discover peers in",
labelNames: ["type"],
}),
peersRequestedSubnetsPeerCount: register.gauge<{type: SubnetType}>({
peersRequestedSubnetsPeerCount: register.gauge<{type: PeerRequestedSubnetType}>({
name: "lodestar_peers_requested_total_subnets_peers_count",
help: "Prioritization results total peers in subnets to query and discover peers in",
labelNames: ["type"],
Expand All @@ -105,6 +111,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "network.reportPeer count by reason",
labelNames: ["reason"],
}),
peerCountPerSamplingColumnSubnet: register.gauge<{subnet: number}>({
name: "lodestar_peer_count_per_sampling_column_subnet",
help: "Current count of peers per sampling column subnet",
labelNames: ["subnet"],
}),
peerManager: {
heartbeatDuration: register.histogram({
name: "lodestar_peer_manager_heartbeat_duration_seconds",
Expand All @@ -127,11 +138,19 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Current peers to connect count from discoverPeers requests",
labelNames: ["type"],
}),
subnetColumnPeersToConnect: register.gauge({
name: "lodestar_discovery_subnet_column_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
}),
subnetsToConnect: register.gauge<{type: SubnetType}>({
name: "lodestar_discovery_subnets_to_connect",
help: "Current subnets to connect count from discoverPeers requests",
labelNames: ["type"],
}),
columnSubnetsToConnect: register.gauge({
name: "lodestar_discovery_column_subnets_to_connect",
help: "Current column subnets to connect count from discoverPeers requests",
}),
cachedENRsSize: register.gauge({
name: "lodestar_discovery_cached_enrs_size",
help: "Current size of the cachedENRs Set",
Expand All @@ -155,6 +174,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Total count of status results of PeerDiscovery.onDiscovered() function",
labelNames: ["status"],
}),
notDialReason: register.gauge<{reason: NotDialReason}>({
name: "lodestar_discovery_not_dial_reason_total_count",
help: "Total count of not dial reasons",
labelNames: ["reason"],
}),
dialAttempts: register.gauge({
name: "lodestar_discovery_total_dial_attempts",
help: "Total dial attempts by peer discovery",
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ export const defaultNetworkOptions: NetworkOptions = {
beaconAttestationBatchValidation: true,
// This will enable the light client server by default
disableLightClientServer: false,
// for PeerDAS, this is the same to TARGET_SUBNET_PEERS, should reavaluate after devnets
targetColumnSubnetPeers: 6,
};
157 changes: 95 additions & 62 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {BeaconConfig} from "@lodestar/config";
import {pruneSetToMax, sleep} from "@lodestar/utils";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {LoggerNode} from "@lodestar/logger/node";
import {ssz} from "@lodestar/types";
import {ColumnIndex} from "@lodestar/types";
import {bytesToInt} from "@lodestar/utils";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {Libp2p} from "../interface.js";
Expand All @@ -18,6 +18,7 @@ import {NodeId, computeNodeId} from "../subnets/interface.js";
import {getDataColumnSubnets} from "../../util/dataColumns.js";
import {deserializeEnrSubnets, zeroAttnets, zeroSyncnets} from "./utils/enrSubnetsDeserialize.js";
import {IPeerRpcScoreStore, ScoreState} from "./score/index.js";
import {ColumnSubnetId} from "./peersData.js";

/** Max number of cached ENRs after discovering a good peer */
const MAX_CACHED_ENRS = 100;
Expand All @@ -30,8 +31,6 @@ export type PeerDiscoveryOpts = {
discv5: LodestarDiscv5Opts;
connectToDiscv5Bootnodes?: boolean;
// experimental flags for debugging
onlyConnectToBiggerDataNodes?: boolean;
onlyConnectToMinimalCustodyOverlapNodes?: boolean;
};

export type PeerDiscoveryModules = {
Expand Down Expand Up @@ -62,6 +61,11 @@ export enum DiscoveredPeerStatus {
no_multiaddrs = "no_multiaddrs",
}

export enum NotDialReason {
not_contain_requested_subnet_sampling_columns = "not_contain_requested_subnet_sampling_columns",
not_contain_requested_attnet_syncnet_subnets = "not_contain_requested_attnet_syncnet_subnets",
}

type UnixMs = number;
/**
* Maintain peersToConnect to avoid having too many topic peers at some point.
Expand All @@ -80,12 +84,17 @@ export type SubnetDiscvQueryMs = {
maxPeersToDiscover: number;
};

/**
* A map of column subnet id to maxPeersToDiscover
*/
type ColumnSubnetQueries = Map<ColumnSubnetId, number>;

type CachedENR = {
peerId: PeerId;
multiaddrTCP: Multiaddr;
subnets: Record<SubnetType, boolean[]>;
addedUnixMs: number;
custodySubnetCount: number;
peerCustodySubnets: number[];
};

/**
Expand All @@ -95,29 +104,24 @@ type CachedENR = {
export class PeerDiscovery {
readonly discv5: Discv5Worker;
private libp2p: Libp2p;
private nodeId: NodeId;
private sampleSubnets: number[];
private peerRpcScores: IPeerRpcScoreStore;
private metrics: NetworkCoreMetrics | null;
private logger: LoggerNode;
private config: BeaconConfig;
private cachedENRs = new Map<PeerIdStr, CachedENR>();
private peerIdToCustodySubnetCount = new Map<PeerIdStr, number>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
private subnetRequests: Record<SubnetType, Map<number, SubnetRequestInfo>> = {
attnets: new Map(),
syncnets: new Map(),
};
// map of column subnet to max number of peers to connect
private columnSubnetRequests: Map<number, number>;

/** The maximum number of peers we allow (exceptions for subnet peers) */
private maxPeers: number;
private discv5StartMs: number;
private discv5FirstQueryDelayMs: number;

private connectToDiscv5BootnodesOnStart: boolean | undefined = false;
private onlyConnectToBiggerDataNodes: boolean | undefined = false;
private onlyConnectToMinimalCustodyOverlapNodes: boolean | undefined = false;

constructor(modules: PeerDiscoveryModules, opts: PeerDiscoveryOpts, discv5: Discv5Worker) {
const {libp2p, peerRpcScores, metrics, logger, config, nodeId} = modules;
Expand All @@ -127,20 +131,12 @@ export class PeerDiscovery {
this.logger = logger;
this.config = config;
this.discv5 = discv5;
this.nodeId = nodeId;
// we will only connect to peers that can provide us custody
this.sampleSubnets = getDataColumnSubnets(
nodeId,
Math.max(config.CUSTODY_REQUIREMENT, config.NODE_CUSTODY_REQUIREMENT, config.SAMPLES_PER_SLOT)
);
this.columnSubnetRequests = new Map();

this.maxPeers = opts.maxPeers;
this.discv5StartMs = 0;
this.discv5StartMs = Date.now();
this.discv5FirstQueryDelayMs = opts.discv5FirstQueryDelayMs;
this.connectToDiscv5BootnodesOnStart = opts.connectToDiscv5Bootnodes;
this.onlyConnectToBiggerDataNodes = opts.onlyConnectToBiggerDataNodes;
this.onlyConnectToMinimalCustodyOverlapNodes = opts.onlyConnectToMinimalCustodyOverlapNodes;

this.libp2p.addEventListener("peer:discovery", this.onDiscoveredPeer);
this.discv5.on("discovered", this.onDiscoveredENR);
Expand All @@ -167,6 +163,13 @@ export class PeerDiscovery {
metrics.discovery.cachedENRsSize.addCollect(() => {
metrics.discovery.cachedENRsSize.set(this.cachedENRs.size);
metrics.discovery.peersToConnect.set(this.peersToConnect);

// PeerDAS metrics
const columnSubnetsToConnect = Array.from(this.columnSubnetRequests.values());
const subnetColumnPeersToConnect = columnSubnetsToConnect.reduce((acc, elem) => acc + elem, 0);
metrics.discovery.subnetColumnPeersToConnect.set(subnetColumnPeersToConnect);
metrics.discovery.columnSubnetsToConnect.set(columnSubnetsToConnect.filter((elem) => elem > 0).length);

for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
const subnetPeersToConnect = Array.from(this.subnetRequests[type].values()).reduce(
(acc, {peersToConnect}) => acc + peersToConnect,
Expand Down Expand Up @@ -200,7 +203,11 @@ export class PeerDiscovery {
/**
* Request to find peers, both on specific subnets and in general
*/
discoverPeers(peersToConnect: number, subnetRequests: SubnetDiscvQueryMs[] = []): void {
discoverPeers(
peersToConnect: number,
columnSubnetRequests: ColumnSubnetQueries,
subnetRequests: SubnetDiscvQueryMs[] = []
): void {
const subnetsToDiscoverPeers: SubnetDiscvQueryMs[] = [];
const cachedENRsToDial = new Map<PeerIdStr, CachedENR>();
// Iterate in reverse to consider first the most recent ENRs
Expand All @@ -226,15 +233,40 @@ export class PeerDiscovery {

this.peersToConnect += peersToConnect;

// starting from PeerDAS, we need to prioritize column subnet peers first in order to have stable subnet sampling
const columnSubnetsToDiscover = new Set<ColumnIndex>();
let columnPeersToDiscover = 0;
column: for (const [subnet, maxPeersToConnect] of columnSubnetRequests) {
let cachedENRsInColumn = 0;
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.peerCustodySubnets.includes(subnet)) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInColumn >= maxPeersToConnect) {
continue column;
}
}

const columnPeersToConnect = Math.max(maxPeersToConnect - cachedENRsInColumn, 0);
this.columnSubnetRequests.set(subnet, columnPeersToConnect);
columnSubnetsToDiscover.add(subnet);
columnPeersToDiscover += columnPeersToConnect;
}
}

subnet: for (const subnetRequest of subnetRequests) {
// Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet
let cachedENRsInSubnet = 0;
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) {
continue subnet;
// only dial attnet/syncnet peers if subnet sampling peers are stable
if (columnPeersToDiscover === 0) {
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) {
continue subnet;
}
}
}
}
Expand Down Expand Up @@ -282,6 +314,8 @@ export class PeerDiscovery {
peersToConnect,
peersAvailableToDial: cachedENRsToDial.size,
subnetsToDiscover: subnetsToDiscoverPeers.length,
columnSubnetsToDiscover: columnSubnetsToDiscover.size,
columnPeersToDiscover,
shouldRunFindRandomNodeQuery,
});
}
Expand Down Expand Up @@ -336,12 +370,9 @@ export class PeerDiscovery {

const attnets = zeroAttnets;
const syncnets = zeroSyncnets;
const custodySubnetCount = this.peerIdToCustodySubnetCount.get(id.toString());
if (custodySubnetCount === undefined) {
this.logger.warn("onDiscoveredPeer with unknown custodySubnetCount assuming 4", {peerId: id.toString()});
}
const custodySubnetCount = this.config.CUSTODY_REQUIREMENT;

const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount ?? 4);
const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount);
this.logger.debug("Discovered peer via libp2p", {peer: prettyPrintPeerId(id), status});
this.metrics?.discovery.discoveredStatus.inc({status});
};
Expand Down Expand Up @@ -376,13 +407,16 @@ export class PeerDiscovery {
// never throw and treat too long or too short bitfields as zero-ed
const attnets = attnetsBytes ? deserializeEnrSubnets(attnetsBytes, ATTESTATION_SUBNET_COUNT) : zeroAttnets;
const syncnets = syncnetsBytes ? deserializeEnrSubnets(syncnetsBytes, SYNC_COMMITTEE_SUBNET_COUNT) : zeroSyncnets;
const custodySubnetCount = custodySubnetCountBytes
? bytesToInt(custodySubnetCountBytes, "be")
: this.config.CUSTODY_REQUIREMENT;
this.peerIdToCustodySubnetCount.set(peerId.toString(), custodySubnetCount);

const status = this.handleDiscoveredPeer(peerId, multiaddrTCP, attnets, syncnets, custodySubnetCount);
this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status});
const custodySubnetCount = custodySubnetCountBytes ? bytesToInt(custodySubnetCountBytes, "be") : undefined;

const status = this.handleDiscoveredPeer(
peerId,
multiaddrTCP,
attnets,
syncnets,
custodySubnetCount ?? this.config.CUSTODY_REQUIREMENT
);
this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status, custodySubnetCount});
this.metrics?.discovery.discoveredStatus.inc({status});
};

Expand Down Expand Up @@ -424,7 +458,7 @@ export class PeerDiscovery {
multiaddrTCP,
subnets: {attnets, syncnets},
addedUnixMs: Date.now(),
custodySubnetCount,
peerCustodySubnets: getDataColumnSubnets(nodeId, custodySubnetCount),
};

// Only dial peer if necessary
Expand All @@ -445,33 +479,31 @@ export class PeerDiscovery {
}

private shouldDialPeer(peer: CachedENR): boolean {
const nodeId = computeNodeId(peer.peerId);
const peerCustodySubnetCount = peer.custodySubnetCount;
const peerCustodySubnets = getDataColumnSubnets(nodeId, peerCustodySubnetCount);

const matchingSubnetsNum = this.sampleSubnets.reduce(
(acc, elem) => acc + (peerCustodySubnets.includes(elem) ? 1 : 0),
0
);
const hasAllColumns = matchingSubnetsNum === this.sampleSubnets.length;
const hasMinCustodyMatchingColumns = matchingSubnetsNum >= Math.max(this.config.CUSTODY_REQUIREMENT);

this.logger.warn("peerCustodySubnets", {
peerId: peer.peerId.toString(),
peerNodeId: toHexString(nodeId),
hasAllColumns,
peerCustodySubnetCount,
peerCustodySubnets: peerCustodySubnets.join(" "),
sampleSubnets: this.sampleSubnets.join(" "),
nodeId: `${toHexString(this.nodeId)}`,
});
if (this.onlyConnectToBiggerDataNodes && !hasAllColumns) {
return false;
// starting from PeerDAS fork, we need to make sure we have stable subnet sampling peers first
// given CUSTODY_REQUIREMENT = 4 and 100 peers, we have 400 custody columns from peers
// with NUMBER_OF_COLUMNS = 128, we have 400 / 128 = 3.125 peers per column in average
// it would not be hard to find TARGET_SUBNET_PEERS(6) peers per SAMPLES_PER_SLOT(8) columns
// after some first heartbeats, we should have no more column requested, then go with conditions of prior forks
let hasMatchingColumn = false;
let columnRequestCount = 0;
const {peerCustodySubnets} = peer;
for (const [column, peersToConnect] of this.columnSubnetRequests.entries()) {
if (peersToConnect <= 0) {
this.columnSubnetRequests.delete(column);
} else if (peerCustodySubnets.includes(column)) {
this.columnSubnetRequests.set(column, Math.max(0, peersToConnect - 1));
hasMatchingColumn = true;
columnRequestCount += peersToConnect;
}
}
if (this.onlyConnectToMinimalCustodyOverlapNodes && !hasMinCustodyMatchingColumns) {

// if subnet sampling peers are not stable and this peer is not in the requested columns, ignore it
if (columnRequestCount > 0 && !hasMatchingColumn) {
this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_subnet_sampling_columns});
return false;
}

// logics up to Deneb fork
for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
for (const [subnet, {toUnixMs, peersToConnect}] of this.subnetRequests[type].entries()) {
if (toUnixMs < Date.now() || peersToConnect === 0) {
Expand All @@ -497,6 +529,7 @@ export class PeerDiscovery {
return true;
}

this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_attnet_syncnet_subnets});
return false;
}

Expand Down
Loading
Loading