-
Notifications
You must be signed in to change notification settings - Fork 271
implement validator custody with column refill mechanism #7127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: column-syncer
Are you sure you want to change the base?
Changes from all commits
276b659
bb88918
7244d4b
d59af67
3dc502b
100e9c7
5097f6f
ae8ed88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,240 @@ | ||||||||
# beacon_chain | ||||||||
# Copyright (c) 2018-2025 Status Research & Development GmbH | ||||||||
# Licensed and distributed under either of | ||||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). | ||||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). | ||||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms. | ||||||||
|
||||||||
{.push raises: [].} | ||||||||
|
||||||||
import chronos, chronicles | ||||||||
import ssz_serialization/[proofs, types] | ||||||||
import | ||||||||
../validators/action_tracker, | ||||||||
../spec/[beaconstate, forks, network, helpers, peerdas_helpers], | ||||||||
../networking/eth2_network, | ||||||||
../consensus_object_pools/blockchain_dag, | ||||||||
../consensus_object_pools/block_dag, | ||||||||
../consensus_object_pools/data_column_quarantine, | ||||||||
"."/[request_manager, sync_manager, sync_protocol] | ||||||||
|
||||||||
from std/algorithm import sort | ||||||||
from std/sequtils import toSeq | ||||||||
from ../beacon_clock import GetBeaconTimeFn | ||||||||
|
||||||||
logScope: topics = "validator_custody" | ||||||||
|
||||||||
const
  # Number of concurrent columns-by-root refill requests issued per loop tick.
  PARALLEL_REFILL_REQUESTS = 32

type
  # Polled before issuing refill requests; when it returns true the custody
  # loop skips the current tick.
  InhibitFn = proc: bool {.gcsafe, raises: [].}

  ValidatorCustody* = object
    network*: Eth2Node                 # peer pool + req/resp handle
    dag*: ChainDAGRef                  # source of head state and historical blocks
    supernode*: bool                   # NOTE(review): stored but not read in this module
    getLocalHeadSlot*: GetSlotCallback # returns the current local head slot
    older_column_set*: HashSet[ColumnIndex]  # custody columns from the previous interval
    newer_column_set*: HashSet[ColumnIndex]  # custody columns detected this interval
    global_refill_list*: HashSet[DataColumnIdentifier]   # outstanding (root, index) pairs to fetch
    requested_columns*: seq[DataColumnsByRootIdentifier] # per-block index lists to request
    getBeaconTime: GetBeaconTimeFn     # NOTE(review): stored but not read in this module
    inhibit: InhibitFn
    dataColumnQuarantine: ref DataColumnQuarantine  # quarantine whose custody set gets updated
    validatorCustodyLoopFuture: Future[void].Raising([CancelledError])  # running detection loop

  ValidatorCustodyRef* = ref ValidatorCustody
|
||||||||
proc init*(T: type ValidatorCustodyRef, network: Eth2Node,
           dag: ChainDAGRef,
           supernode: bool,
           getLocalHeadSlotCb: GetSlotCallback,
           older_column_set: HashSet[ColumnIndex],
           getBeaconTime: GetBeaconTimeFn,
           inhibit: InhibitFn,
           dataColumnQuarantine: ref DataColumnQuarantine): ValidatorCustodyRef =
  ## Create a new validator custody tracker.
  ##
  ## `older_column_set` seeds the custody set the detection loop compares
  ## against; the remaining collection fields start empty and are populated
  ## by `detectNewValidatorCustody` / `makeRefillList`.
  # Fix: the original bound the callback to a dead local
  # (`let localHeadSlot = getLocalHeadSlotCb`) without calling or using it;
  # that unused binding is removed.
  (ValidatorCustodyRef)(
    network: network,
    dag: dag,
    supernode: supernode,
    getLocalHeadSlot: getLocalHeadSlotCb,
    older_column_set: older_column_set,
    getBeaconTime: getBeaconTime,
    inhibit: inhibit,
    dataColumnQuarantine: dataColumnQuarantine)
|
||||||||
proc detectNewValidatorCustody(vcus: ValidatorCustodyRef, cache: var StateCache): seq[ColumnIndex] =
  ## Recompute this node's custody column set from the head state and return
  ## the columns newly added relative to `older_column_set` (empty when
  ## custody did not grow). Also refreshes the quarantine's custody columns.
  var
    diff_set: HashSet[ColumnIndex]
  withState(vcus.dag.headState):
    when consensusFork >= ConsensusFork.Fulu:
      # Validator custody requirement scales with the total active balance
      # (see get_validators_custody_requirement).
      let total_node_balance =
        get_total_active_balance(forkyState.data, cache)
      let vcustody =
        vcus.dag.cfg.get_validators_custody_requirement(forkyState, total_node_balance)

      let
        # Custody can never fall below the SAMPLES_PER_SLOT sampling floor.
        newer_columns =
          vcus.dag.cfg.resolve_columns_from_custody_groups(
            vcus.network.nodeId,
            max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
                vcustody))
        newer_column_set = newer_columns.toHashSet()

      # update data column quarantine custody requirements
      vcus.dataColumnQuarantine[].custody_columns =
        newer_columns

      # check which custody set is larger
      # NOTE(review): only a strictly larger set triggers a diff; a
      # same-size-but-different set yields an empty diff and leaves
      # `newer_column_set` untouched — confirm this is intended.
      if newer_column_set.len > vcus.older_column_set.len:
        diff_set = newer_column_set.difference(vcus.older_column_set)
        vcus.newer_column_set = newer_column_set

  toSeq(diff_set)
|
||||||||
proc makeRefillList(vcus: ValidatorCustodyRef, diff: seq[ColumnIndex]) =
  ## At an epoch boundary, enumerate the blocks of the oldest epoch still
  ## inside the blob/column retention window and record which newly-custodied
  ## columns must be (re)fetched for them: `requested_columns` gets one
  ## by-root entry per block, `global_refill_list` one identifier per
  ## (block, custody column) pair.
  let
    slot = vcus.getLocalHeadSlot()

  # Fix: guard against uint64 wraparound. For early epochs the subtraction
  # below would underflow, and the huge wrapped value would then wrongly
  # satisfy the `>= FULU_FORK_EPOCH` comparison.
  if slot.epoch <= vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS:
    return

  let dataColumnRefillEpoch = (slot.epoch -
    vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1)
  if slot.is_epoch() and dataColumnRefillEpoch >= vcus.dag.cfg.FULU_FORK_EPOCH:
    var blocks: array[SLOTS_PER_EPOCH.int, BlockId]
    let startIndex = vcus.dag.getBlockRange(
      dataColumnRefillEpoch.start_slot, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1))
    for i in startIndex..<SLOTS_PER_EPOCH:
      let blck = vcus.dag.getForkedBlock(blocks[int(i)]).valueOr: continue
      withBlck(blck):
        # No need to check for fork version, as this loop is triggered post Fulu
        let byRootEntry =
          DataColumnsByRootIdentifier(block_root: blocks[int(i)].root,
                                      indices: DataColumnIndices.init(diff))
        vcus.requested_columns.add byRootEntry
        for column in vcus.newer_column_set:
          let refillEntry =
            DataColumnIdentifier(block_root: blocks[int(i)].root,
                                 index: ColumnIndex(column))
          vcus.global_refill_list.incl(refillEntry)
|
||||||||
proc checkIntersectingCustody(vcus: ValidatorCustodyRef,
                              peer: Peer): seq[DataColumnsByRootIdentifier] =
  ## Intersect the outstanding refill wishlist with what `peer` custodies and
  ## return, per block root, the column indices worth requesting from it.
  var columnList: seq[DataColumnsByRootIdentifier]

  # Fetch the remote custody count
  let remoteCustodyGroupCount =
    peer.lookupCgcFromPeer()

  # Extract remote peer's nodeID from peerID
  # Fetch custody columns from remote peer
  let
    remoteNodeId = fetchNodeIdFromPeerId(peer)
    remoteCustodyColumns =
      vcus.dag.cfg.resolve_columns_from_custody_groups(
        remoteNodeId,
        max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
            remoteCustodyGroupCount))
  for request_item in vcus.requested_columns:
    var colIds: seq[ColumnIndex]
    for cindex in request_item.indices:
      let lookup = DataColumnIdentifier(block_root: request_item.block_root,
                                        index: cindex)
      # NOTE(review): `notin global_refill_list` looks inverted — the refill
      # list holds columns still outstanding, so requesting only those NOT in
      # it would skip the whole wishlist on the first pass. Confirm intent.
      if lookup notin vcus.global_refill_list and cindex in remoteCustodyColumns:
        colIds.add cindex
    # Fix: only emit an entry when there is at least one intersecting column;
    # the original appended an identifier with an empty index list for every
    # requested root, producing useless by-root request entries.
    if colIds.len > 0:
      columnList.add DataColumnsByRootIdentifier(block_root: request_item.block_root,
                                                 indices: DataColumnIndices.init(colIds))

  columnList
|
||||||||
proc refillDataColumnsFromNetwork(vcus: ValidatorCustodyRef)
    {.async: (raises: [CancelledError]).} =
  ## Acquire one peer from the pool, request the intersecting subset of the
  ## refill wishlist via columns-by-root, persist validated responses to the
  ## database and drop them from `global_refill_list`. The peer is always
  ## released in the `finally` block.
  var peer = await vcus.network.peerPool.acquire()
  # NOTE(review): computed outside the `try`; safe only because module-wide
  # `{.push raises: [].}` means this call cannot raise past the release.
  let colIdList = vcus.checkIntersectingCustody(peer)
  try:
    if colIdList.len > 0:
      debug "Requesting data columns by root for refill", peer = peer,
        columns = shortLog(colIdList), peer_score = peer.getScore()
      let columns =
        await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList)
      if columns.isOk:
        var ucolumns = columns.get().asSeq()
        ucolumns.sort(cmpSidecarIndexes)
        # A response that is not a subset of what was requested is a protocol
        # violation: penalize the peer and bail out.
        if not checkColumnResponse(colIdList, ucolumns):
          debug "Response to columns by root is not a subset",
            peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
          peer.updateScore(PeerScoreBadResponse)
          return
        for col in ucolumns:
          let
            block_root =
              hash_tree_root(col[].signed_block_header.message)
            exclude =
              DataColumnIdentifier(block_root: block_root,
                                   index: col[].index)
          # Received: remove from the outstanding wishlist.
          vcus.global_refill_list.excl(exclude)
          # write new columns to database, no need of BlockVerifier
          # in this scenario as the columns historically did pass DA,
          # and did meet the historical custody requirements
          vcus.dag.db.putDataColumnSidecar(col[])

      else:
        debug "Data columns by root request not done, peer doesn't have custody column",
          peer = peer, columns = shortLog(colIdList), err = columns.error()
        peer.updateScore(PeerScoreNoValues)

  finally:
    if not(isNil(peer)):
      vcus.network.peerPool.release(peer)
|
||||||||
proc validatorCustodyColumnLoop(
    vcus: ValidatorCustodyRef) {.async: (raises: [CancelledError]).} =
  ## Periodically detect validator custody growth; on growth, build the
  ## refill wishlist and fan out PARALLEL_REFILL_REQUESTS by-root workers.
  ## Once the wishlist drains, advertise the updated cgc count in ENR and
  ## metadata and roll the newer custody set into the older one.
  var cache = StateCache()
  while true:
    let diff = vcus.detectNewValidatorCustody(cache)

    await sleepAsync(POLL_INTERVAL)
    if diff.len == 0:
      # Validator custody same as previous interval
      continue

    # External hold-off (e.g. while syncing) — skip this tick.
    if vcus.inhibit():
      continue

    vcus.makeRefillList(diff)

    if vcus.global_refill_list.len != 0:
      debug "Requesting detected missing data columns for refill",
        columns = shortLog(vcus.requested_columns)
      let start = SyncMoment.now(0)
      var workers:
        array[PARALLEL_REFILL_REQUESTS, Future[void].Raising([CancelledError])]
      for i in 0..<PARALLEL_REFILL_REQUESTS:
        workers[i] = vcus.refillDataColumnsFromNetwork()

      await allFutures(workers)
      let finish = SyncMoment.now(uint64(len(vcus.global_refill_list)))

      debug "Validator custody backfill tick",
        backfill_speed = speed(start, finish)

    else:
      ## Done with column refilling
      ## hence now advertise the updated cgc count
      ## in ENR and metadata.
      if vcus.older_column_set.len != vcus.newer_column_set.len:
        # Newer cgc count can also drop from previous if validators detach
        vcus.network.loadCgcnetMetadataAndEnr(CgcCount vcus.newer_column_set.lenu64)
      # Make the newer set older
      vcus.older_column_set = vcus.newer_column_set
      # Clear the newer for future validator custody detection
      vcus.newer_column_set.clear()
|
||||||||
proc start*(vcus: ValidatorCustodyRef) =
  ## Start the validator custody detection loop and retain its future so
  ## `stop` can cancel it later.
  vcus.validatorCustodyLoopFuture = vcus.validatorCustodyColumnLoop()
|
||||||||
proc stop*(vcus: ValidatorCustodyRef) =
  ## Stop the validator custody detection loop (no-op when never started).
  ## (Original comment said "Request Manager's loop" — copy/paste from
  ## request_manager.)
  if not(isNil(vcus.validatorCustodyLoopFuture)):
    vcus.validatorCustodyLoopFuture.cancelSoon()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why
nimbus-eth2/beacon_chain/spec/peerdas_helpers.nim
Lines 73 to 85 in d2f2338
doesn't just always construct in-place with
incl
or similar and then return aHashSet[ColumnIndex]
rather than thisseq[ColumnIndex]
which as far as I can tell, every single user ofresolve_columns_from_custody_groups()
converts to aHashSet
before using or storing:nimbus-eth2/beacon_chain/nimbus_beacon_node.nim
Lines 425 to 428 in d2f2338
calls only the wrapper function, to get its
HashSet[ColumnIndex]
.nimbus-eth2/beacon_chain/sync/request_manager.nim
Lines 65 to 68 in d2f2338
stores a
HashSet[ColumnIndex]
, not aseq[ColumnIndex]
, which it gets from thenimbus_beacon_node
code.nimbus-eth2/beacon_chain/sync/request_manager.nim
Lines 352 to 359 in d2f2338
uses the
resolve_column_sets_from_custody_groups()
wrapper again.And this code here doesn't use
resolve_column_sets_from_custody_groups()
but does the same thing. So it seems like in generalresolve_columns_from_custody_groups()
should natively return aHashSet
and avoid the redundant copying, allocating, conversions, et cetera.At that point, one wouldn't need a separate
nimbus-eth2/beacon_chain/spec/peerdas_helpers.nim
Lines 87 to 91 in d2f2338
at all, because all those
HashSet[ColumnIndex]
users would just be able to callresolve_columns_from_custody_groups()
directly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nimbus-eth2/beacon_chain/validators/message_router.nim
Lines 191 to 200 in 566257e
doesn't use the
HashSet
but probably should, and it wouldn't particularly be more expensive if it were constructed natively rather than converted from theseq
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are looking at the wrong branch; I think there are significant use cases of
resolve_columns_from_custody_groups
, and HashSet conversion is only applicable to most syncers/refillers/backfillers where lookup is more frequent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is true -- those were mostly from the
stable
branch, so e.g.,resolve_column_sets_from_custody_groups
isn't present in eithercolumn-syncer
orvcus
, so that part isn't relevant/accurate.That said:
Looking again specifically at the current state of the
column-syncer
andvcus
branches:In
column-syncer
:Doesn't use
HashSet
, probably should, only cares interface-wise thatin
works:nimbus-eth2/beacon_chain/validators/message_router.nim
Lines 189 to 200 in 566257e
Immediately converts to
HashSet
viatoHashSet
:nimbus-eth2/beacon_chain/sync/request_manager.nim
Lines 345 to 355 in 566257e
Immediately converts to
HashSet
viatoHashSet
forRequestManager
initialization purposes, but also usesdataColumnQuarantine[].custody_columns
with theseq
:nimbus-eth2/beacon_chain/nimbus_beacon_node.nim
Lines 414 to 422 in 566257e
https://github.com/status-im/nimbus-eth2/blob/083ed100bab70e07331f0c3e9692b1b5d3eed412/beacon_chain/consensus_object_pools/data_column_quarantine.nim from
column-syncer
itself usescustody_columns
field in a few ways:nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 87 in 083ed10
nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 99 in 083ed10
nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 116 in 083ed10
nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 155 in 083ed10
nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 160 in 083ed10
nimbus-eth2/beacon_chain/consensus_object_pools/data_column_quarantine.nim
Line 169 in 083ed10
The field is marked for export, and probably shouldn't be, because that's mainly there to let it be initialized from
nimbus_beacon_node
, but nothing reads from it later outside that module.Anyway, it does two things, iterates, in a way it's unclear that order matters (i.e.
HashSet
arbitrary ordering is ok) and checks length, either of which is fine in aHashSet
. And thisHashSet
is constructed anyway, a line later innimbus_beacon_node
.That's all the usage in
column-syncer
. On tovcus
(this PR):message_router
's usage is identical:nimbus-eth2/beacon_chain/validators/message_router.nim
Lines 189 to 200 in 3dc502b
request_manager
's usage is identical:nimbus-eth2/beacon_chain/sync/request_manager.nim
Lines 345 to 355 in 3dc502b
nimbus_beacon_node
's usage is identical:nimbus-eth2/beacon_chain/nimbus_beacon_node.nim
Lines 426 to 434 in 3dc502b
The data column quarantine usage is the same: https://github.com/status-im/nimbus-eth2/blob/083ed100bab70e07331f0c3e9692b1b5d3eed412/beacon_chain/consensus_object_pools/data_column_quarantine.nim so, iteration in a way where exact order shouldn't matter and length-checking, both easy and reasonably efficient with the
HashSet
that's constructed anyway.But
vcus
addsvalidator_custody
usage.Immediately converts via
toHashSet
:nimbus-eth2/beacon_chain/sync/validator_custody.nim
Lines 79 to 85 in 3dc502b
Doesn't use a
HashSet
, but probably should:nimbus-eth2/beacon_chain/sync/validator_custody.nim
Lines 133 to 146 in 3dc502b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deliberately putting this in a separate comment:
nimbus-eth2/beacon_chain/spec/peerdas_helpers.nim
Lines 119 to 131 in 083ed10
could just as easily directly construct the
HashSet
that most of those use case either already use or should use, and never construct theseq
at all. Then no repeatedly converting to it, no issues with some places using theseq
(and incurring O(n) search times) and some places not, et cetera.The data column quarantine itself doesn't really benefit, but it's not hurt either that I can tell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://nim-lang.org/docs/packedsets.html provides another alternative, I'm not that familiar with its performance tradeoffs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be tackled in the
column-syncer
branch with a subsequent rebaseThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok