implement validator custody with column refill mechanism #7127

Open
wants to merge 8 commits into base: column-syncer
3 changes: 2 additions & 1 deletion beacon_chain/beacon_node.nim
@@ -28,7 +28,7 @@ import
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./spec/signatures_batch,
./sync/[sync_manager, request_manager, sync_types],
./sync/[sync_manager, request_manager, sync_types, validator_custody],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool,
keystore_management],
@@ -95,6 +95,7 @@ type
eventBus*: EventBus
vcProcess*: Process
requestManager*: RequestManager
validatorCustody*: ValidatorCustodyRef
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
untrustedManager*: SyncManager[Peer, PeerId]
10 changes: 9 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
@@ -22,7 +22,7 @@ import
./spec/[
deposit_snapshots, engine_authentication, weak_subjectivity,
peerdas_helpers],
./sync/[sync_protocol, light_client_protocol, sync_overseer],
./sync/[sync_protocol, light_client_protocol, sync_overseer, validator_custody],
./validators/[keystore_management, beacon_validators],
"."/[
beacon_node, beacon_node_light_client, deposits,
@@ -585,6 +585,9 @@ proc initFullNode(
dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
validatorCustody = ValidatorCustodyRef.init(node.network, dag, supernode,
getLocalHeadSlot, custody_columns_set, getBeaconTime,
(proc(): bool = syncManager.inProgress), dataColumnQuarantine)

# As per EIP 7594, the BN is now categorised into a
# `Fullnode` and a `Supernode`, the fullnodes custodies a
@@ -645,6 +648,7 @@ proc initFullNode(
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = requestManager
node.validatorCustody = validatorCustody
node.syncManager = syncManager
node.backfiller = backfiller
node.untrustedManager = untrustedManager
@@ -1322,8 +1326,11 @@ func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
subnets + node.getNextSyncCommitteeSubnets(epoch)

func readCustodyGroupSubnets(node: BeaconNode): uint64 =
let vcus_count = node.dataColumnQuarantine.custody_columns.lenu64
if node.config.peerdasSupernode:
node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS.uint64
elif vcus_count > node.dag.cfg.CUSTODY_REQUIREMENT.uint64:
vcus_count
else:
node.dag.cfg.CUSTODY_REQUIREMENT.uint64

@@ -2277,6 +2284,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} =
if node.network.getBeaconTime().slotOrZero.epoch >=
node.network.cfg.FULU_FORK_EPOCH:
node.requestManager.switchToColumnLoop()
node.validatorCustody.start()
node.syncOverseer.start()

waitFor node.updateGossipStatus(wallSlot)
10 changes: 4 additions & 6 deletions beacon_chain/spec/peerdas_helpers.nim
@@ -486,13 +486,11 @@ proc verify_data_column_sidecar_kzg_proofs*(sidecar: DataColumnSidecar):

ok()

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/das-core.md#validator-custody
func get_validators_custody_requirement*(cfg: RuntimeConfig, state: fulu.BeaconState,
validator_indices: openArray[ValidatorIndex]):
# https://github.com/ethereum/consensus-specs/blob/v1.5.0/specs/fulu/validator.md#validator-custody
func get_validators_custody_requirement*(cfg: RuntimeConfig,
hstate: ForkyHashedBeaconState,
total_node_balance: Gwei):
uint64 =
var total_node_balance: Gwei
for index in validator_indices:
total_node_balance += state.balances[index]
let count = total_node_balance div BALANCE_PER_ADDITIONAL_CUSTODY_GROUP
min(max(count.uint64, cfg.VALIDATOR_CUSTODY_REQUIREMENT),
cfg.NUMBER_OF_CUSTODY_GROUPS.uint64)
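
As a worked example of the new helper's clamp (assuming the mainnet preset values VALIDATOR_CUSTODY_REQUIREMENT = 8, BALANCE_PER_ADDITIONAL_CUSTODY_GROUP = 32 ETH and NUMBER_OF_CUSTODY_GROUPS = 128): a node whose attached validators hold 2048 ETH gets 2048 / 32 = 64 custody groups; 96 ETH computes 3 but is clamped up to the floor of 8; anything above 4096 ETH is capped at 128.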
6 changes: 3 additions & 3 deletions beacon_chain/sync/request_manager.nim
@@ -42,7 +42,7 @@ const
DATA_COLUMN_GOSSIP_WAIT_TIME_NS = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.

POLL_INTERVAL = 1.seconds
POLL_INTERVAL* = 1.seconds

type
BlockVerifierFn = proc(
@@ -169,7 +169,7 @@ func checkResponseSubset(idList: seq[BlobIdentifier],
return false
true

func checkColumnResponse(idList: seq[DataColumnsByRootIdentifier],
func checkColumnResponse*(idList: seq[DataColumnsByRootIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
for colresp in columns:
let block_root =
@@ -260,7 +260,7 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
if not(isNil(peer)):
rman.network.peerPool.release(peer)

func cmpSidecarIndexes(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
func cmpSidecarIndexes*(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
cmp(x[].index, y[].index)

proc fetchBlobsFromNetwork(self: RequestManager,
240 changes: 240 additions & 0 deletions beacon_chain/sync/validator_custody.nim
@@ -0,0 +1,240 @@
# beacon_chain
# Copyright (c) 2018-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import chronos, chronicles
import ssz_serialization/[proofs, types]
import
../validators/action_tracker,
../spec/[beaconstate, forks, network, helpers, peerdas_helpers],
../networking/eth2_network,
../consensus_object_pools/blockchain_dag,
../consensus_object_pools/block_dag,
../consensus_object_pools/data_column_quarantine,
"."/[request_manager, sync_manager, sync_protocol]

from std/algorithm import sort
from std/sequtils import toSeq
from ../beacon_clock import GetBeaconTimeFn

logScope: topics = "validator_custody"

const
PARALLEL_REFILL_REQUESTS = 32

type
InhibitFn = proc: bool {.gcsafe, raises: [].}

ValidatorCustody* = object
network*: Eth2Node
dag*: ChainDAGRef
supernode*: bool
getLocalHeadSlot*: GetSlotCallback
older_column_set*: HashSet[ColumnIndex]
newer_column_set*: HashSet[ColumnIndex]
global_refill_list*: HashSet[DataColumnIdentifier]
requested_columns*: seq[DataColumnsByRootIdentifier]
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
dataColumnQuarantine: ref DataColumnQuarantine
validatorCustodyLoopFuture: Future[void].Raising([CancelledError])

ValidatorCustodyRef* = ref ValidatorCustody

proc init*(T: type ValidatorCustodyRef, network: Eth2Node,
dag: ChainDAGRef,
supernode: bool,
getLocalHeadSlotCb: GetSlotCallback,
older_column_set: HashSet[ColumnIndex],
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
dataColumnQuarantine: ref DataColumnQuarantine): ValidatorCustodyRef =
let localHeadSlot = getLocalHeadSlotCb
(ValidatorCustodyRef)(
network: network,
dag: dag,
supernode: supernode,
getLocalHeadSlot: getLocalHeadSlotCb,
older_column_set: older_column_set,
getBeaconTime: getBeaconTime,
inhibit: inhibit,
dataColumnQuarantine: dataColumnQuarantine)

proc detectNewValidatorCustody(vcus: ValidatorCustodyRef, cache: var StateCache): seq[ColumnIndex] =
var
diff_set: HashSet[ColumnIndex]
withState(vcus.dag.headState):
when consensusFork >= ConsensusFork.Fulu:
let total_node_balance =
get_total_active_balance(forkyState.data, cache)
let vcustody =
vcus.dag.cfg.get_validators_custody_requirement(forkyState, total_node_balance)

let
newer_columns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
vcus.network.nodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
vcustody))
newer_column_set = newer_columns.toHashSet()
Contributor:

I'm not sure why

func resolve_columns_from_custody_groups*(node_id: NodeId,
custody_group_count: CustodyIndex):
seq[ColumnIndex] =
let
custody_groups = node_id.get_custody_groups(custody_group_count)
var flattened =
newSeqOfCap[ColumnIndex](COLUMNS_PER_GROUP * custody_groups.len)
for group in custody_groups:
for index in compute_columns_for_custody_group(group):
flattened.add index
flattened

doesn't just always construct in-place with incl or similar and then return a HashSet[ColumnIndex], rather than this seq[ColumnIndex] which, as far as I can tell, every single user of resolve_columns_from_custody_groups() converts to a HashSet before using or storing:

custody_columns_set =
node.network.nodeId.resolve_column_sets_from_custody_groups(
max(SAMPLES_PER_SLOT.uint64,
localCustodyGroups))

calls only the wrapper function, to get its HashSet[ColumnIndex].

RequestManager* = object
network*: Eth2Node
supernode*: bool
custody_columns_set: HashSet[ColumnIndex]

stores a HashSet[ColumnIndex], not a seq[ColumnIndex], which it gets from the nimbus_beacon_node code.

remoteCustodyColumns =
remoteNodeId.resolve_column_sets_from_custody_groups(
max(SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
for local_column in rman.custody_columns_set:
if local_column notin remoteCustodyColumns:
return false

uses the resolve_column_sets_from_custody_groups() wrapper again.

And this code here doesn't use resolve_column_sets_from_custody_groups() but does the same thing. So it seems like in general resolve_columns_from_custody_groups() should natively return a HashSet and avoid the redundant copying, allocating, conversions, et cetera.

At that point, one wouldn't need a separate

func resolve_column_sets_from_custody_groups*(node_id: NodeId,
custody_group_count: CustodyIndex):
HashSet[ColumnIndex] =
node_id.resolve_columns_from_custody_groups(custody_group_count).toHashSet()

at all, because all those HashSet[ColumnIndex] users would just be able to call resolve_columns_from_custody_groups() directly.
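
A minimal sketch of that suggestion (not code from this PR), assuming std/sets is imported and reusing the same get_custody_groups / compute_columns_for_custody_group helpers as the current seq-returning version:

func resolve_columns_from_custody_groups*(node_id: NodeId,
                                          custody_group_count: CustodyIndex):
                                          HashSet[ColumnIndex] =
  ## Hypothetical variant: build the set in place with `incl` and never
  ## materialise an intermediate seq.
  let custody_groups = node_id.get_custody_groups(custody_group_count)
  var columns = initHashSet[ColumnIndex]()
  for group in custody_groups:
    for index in compute_columns_for_custody_group(group):
      columns.incl index
  columns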

Contributor:

custody_columns =
router[].network.cfg.resolve_columns_from_custody_groups(
router[].network.node_id,
max(SAMPLES_PER_SLOT.uint64,
metadata))
var final_columns: seq[DataColumnSidecar]
for dc in dataColumns:
if dc.index in custody_columns:
final_columns.add dc

doesn't use the HashSet but probably should, and it wouldn't particularly be more expensive if it were constructed natively rather than converted from the seq.

Contributor Author:

you are looking at the wrong branch, i think there are significant use cases of resolve_columns_from_custody_groups, and hashset conversion is only applicable to most syncers/refillers/backfillers where lookup is more frequent

Contributor:

> you are looking at the wrong branch

Yes, this is true -- those were mostly from the stable branch, so e.g., resolve_column_sets_from_custody_groups isn't present in either column-syncer or vcus, so that part isn't relevant/accurate.

That said:

> i think there are significant use cases of resolve_columns_from_custody_groups, and hashset conversion is only applicable to most syncers/refillers/backfillers where lookup is more frequent

Looking again specifically at the current state of the column-syncer and vcus branches:

In column-syncer:

Doesn't use HashSet, probably should, only cares interface-wise that `in` works:

let
metadata = router[].network.metadata.custody_group_count
custody_columns =
router[].network.cfg.resolve_columns_from_custody_groups(
router[].network.node_id,
max(SAMPLES_PER_SLOT.uint64,
metadata))
var final_columns: seq[DataColumnSidecar]
for dc in dataColumns:
if dc.index in custody_columns:
final_columns.add dc

Immediately converts to HashSet via toHashSet:

let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
rman.cfg.resolve_columns_from_custody_groups(
remoteNodeId,
max(rman.cfg.SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
remoteSet = remoteCustodyColumns.toHashSet()
for local_column in rman.custody_columns_set:
if local_column notin remoteSet:
return false

Immediately converts to HashSet via toHashSet for RequestManager initialization purposes, but also uses dataColumnQuarantine[].custody_columns with the seq:

dataColumnQuarantine[].custody_columns =
dag.cfg.resolve_columns_from_custody_groups(
node.network.nodeId,
max(dag.cfg.SAMPLES_PER_SLOT.uint64,
localCustodyGroups))
let
custody_columns_set =
dataColumnQuarantine[].custody_columns.toHashSet()

https://github.com/status-im/nimbus-eth2/blob/083ed100bab70e07331f0c3e9692b1b5d3eed412/beacon_chain/consensus_object_pools/data_column_quarantine.nim from column-syncer itself uses the custody_columns field in a few ways:

for col_idx in quarantine.custody_columns:


for idx in quarantine.custody_columns:

if collectedColumns.len >= (quarantine.custody_columns.len div 2):

if collectedColumns.len == quarantine.custody_columns.len:

The field is marked for export, and probably shouldn't be, because that's mainly there to let it be initialized from nimbus_beacon_node, but nothing reads from it later outside that module.

Anyway, it does two things: it iterates (in a way where order doesn't appear to matter, i.e. HashSet's arbitrary ordering is fine) and it checks length, either of which works fine with a HashSet. And that HashSet is constructed anyway, a line later in nimbus_beacon_node.

That's all the usage in column-syncer. On to vcus (this PR):

message_router's usage is identical:

let
metadata = router[].network.metadata.custody_group_count
custody_columns =
router[].network.cfg.resolve_columns_from_custody_groups(
router[].network.node_id,
max(SAMPLES_PER_SLOT.uint64,
metadata))
var final_columns: seq[DataColumnSidecar]
for dc in dataColumns:
if dc.index in custody_columns:
final_columns.add dc

request_manager's usage is identical:

let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
rman.cfg.resolve_columns_from_custody_groups(
remoteNodeId,
max(rman.cfg.SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
remoteSet = remoteCustodyColumns.toHashSet()
for local_column in rman.custody_columns_set:
if local_column notin remoteSet:
return false

nimbus_beacon_node's usage is identical:

dataColumnQuarantine[].custody_columns =
dag.cfg.resolve_columns_from_custody_groups(
node.network.nodeId,
max(dag.cfg.SAMPLES_PER_SLOT.uint64,
localCustodyGroups))
let
custody_columns_set =
dataColumnQuarantine[].custody_columns.toHashSet()

The data column quarantine usage is the same: https://github.com/status-im/nimbus-eth2/blob/083ed100bab70e07331f0c3e9692b1b5d3eed412/beacon_chain/consensus_object_pools/data_column_quarantine.nim so, iteration in a way where exact order shouldn't matter and length-checking, both easy and reasonably efficient with the HashSet that's constructed anyway.

But vcus adds validator_custody usage.

Immediately converts via toHashSet:

let
newer_columns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
vcus.network.nodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
vcustody))
newer_column_set = newer_columns.toHashSet()

Doesn't use a HashSet, but probably should:

let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
remoteNodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
for request_item in vcus.requested_columns:
var colIds: seq[ColumnIndex]
for cindex in request_item.indices:
let lookup = DataColumnIdentifier(block_root: request_item.block_root,
index: cindex)
if lookup notin vcus.global_refill_list and cindex in remoteCustodyColumns:
colIds.add cindex

Contributor:

Deliberately putting this in a separate comment:

func resolve_columns_from_custody_groups*(cfg: RuntimeConfig, node_id: NodeId,
custody_group_count: CustodyIndex):
seq[ColumnIndex] =
let
custody_groups = node_id.get_custody_groups(custody_group_count)
var flattened =
newSeqOfCap[ColumnIndex](COLUMNS_PER_GROUP * custody_groups.len)
for group in custody_groups:
for index in compute_columns_for_custody_group(cfg, group):
flattened.add index
flattened

could just as easily directly construct the HashSet that most of those use cases either already use or should use, and never construct the seq at all. Then no repeated conversions to it, no issues with some places using the seq (and incurring O(n) search times) and some places not, et cetera.

The data column quarantine itself doesn't really benefit, but it's not hurt either, as far as I can tell.
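
Under that assumption the toHashSet call sites simply disappear; for example, the request_manager check quoted above could read (sketch only):

let
  remoteNodeId = fetchNodeIdFromPeerId(peer)
  # remoteCustodyColumns would now already be a HashSet[ColumnIndex]
  remoteCustodyColumns =
    rman.cfg.resolve_columns_from_custody_groups(
      remoteNodeId,
      max(rman.cfg.SAMPLES_PER_SLOT.uint64,
          remoteCustodyGroupCount))
for local_column in rman.custody_columns_set:
  if local_column notin remoteCustodyColumns:
    return false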

Contributor:

https://nim-lang.org/docs/packedsets.html provides another alternative; I'm not that familiar with its performance tradeoffs.
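
For reference, a minimal std/packedsets sketch; the caveat (an assumption on my part) is that PackedSet wants an ordinal element type, so ColumnIndex values would need to be kept in, or converted to, a narrow type such as uint16:

import std/packedsets

# Sparse bit set keyed by small ordinals; incl / len / `in` mirror HashSet usage.
var columns = initPackedSet[uint16]()
for idx in [0'u16, 7, 42]:
  columns.incl idx
assert 7'u16 in columns
assert columns.len == 3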

Contributor Author:

this can be tackled in the column-syncer branch with a subsequent rebase

Contributor:

ok


# update data column quarantine custody requirements
vcus.dataColumnQuarantine[].custody_columns =
newer_columns

# check which custody set is larger
if newer_column_set.len > vcus.older_column_set.len:
diff_set = newer_column_set.difference(vcus.older_column_set)
vcus.newer_column_set = newer_column_set

toSeq(diff_set)

proc makeRefillList(vcus: ValidatorCustodyRef, diff: seq[ColumnIndex]) =
let
slot = vcus.getLocalHeadSlot()

let dataColumnRefillEpoch = (slot.epoch -
vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1)
if slot.is_epoch() and dataColumnRefillEpoch >= vcus.dag.cfg.FULU_FORK_EPOCH:
var blocks: array[SLOTS_PER_EPOCH.int, BlockId]
let startIndex = vcus.dag.getBlockRange(
dataColumnRefillEpoch.start_slot, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1))
for i in startIndex..<SLOTS_PER_EPOCH:
let blck = vcus.dag.getForkedBlock(blocks[int(i)]).valueOr: continue
withBlck(blck):
# No need to check for fork version, as this loop is triggered post Fulu
let entry1 =
DataColumnsByRootIdentifier(block_root: blocks[int(i)].root,
indices: DataColumnIndices.init(diff))
vcus.requested_columns.add entry1
for column in vcus.newer_column_set:
let entry2 =
DataColumnIdentifier(block_root: blocks[int(i)].root,
index: ColumnIndex(column))
vcus.global_refill_list.incl(entry2)

proc checkIntersectingCustody(vcus: ValidatorCustodyRef,
peer: Peer): seq[DataColumnsByRootIdentifier] =
var columnList: seq[DataColumnsByRootIdentifier]

# Fetch the remote custody count
let remoteCustodyGroupCount =
peer.lookupCgcFromPeer()

# Extract remote peer's nodeID from peerID
# Fetch custody columns from remote peer
let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
remoteNodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
for request_item in vcus.requested_columns:
var colIds: seq[ColumnIndex]
for cindex in request_item.indices:
let lookup = DataColumnIdentifier(block_root: request_item.block_root,
index: cindex)
if lookup notin vcus.global_refill_list and cindex in remoteCustodyColumns:
Contributor:

How often is this cindex in RemoteCustodyColumns expected to occur? Each one is an O(n) lookup, in a for cindex in request_items.indices loop, due to

func resolve_columns_from_custody_groups*(node_id: NodeId,
custody_group_count: CustodyIndex):
seq[ColumnIndex] =
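
One way to avoid the repeated O(n) membership tests without touching the helper, sketched against the current seq-returning API, is to hoist a single toHashSet conversion above the loop, as the request_manager snippet further up already does:

let remoteSet = remoteCustodyColumns.toHashSet()  # one O(n) conversion
for request_item in vcus.requested_columns:
  var colIds: seq[ColumnIndex]
  for cindex in request_item.indices:
    let lookup = DataColumnIdentifier(block_root: request_item.block_root,
                                      index: cindex)
    if lookup notin vcus.global_refill_list and cindex in remoteSet:
      colIds.add cindex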

Contributor Author:

3dc502b

Contributor:

How does this commit affect checkIntersectingCustody()?

colIds.add cindex
columnList.add DataColumnsByRootIdentifier(block_root: request_item.block_root,
indices: DataColumnIndices.init(colIds))

columnList

proc refillDataColumnsFromNetwork(vcus: ValidatorCustodyRef)
{.async: (raises: [CancelledError]).} =
var peer = await vcus.network.peerPool.acquire()
let colIdList = vcus.checkIntersectingCustody(peer)
try:
if colIdList.len > 0:
debug "Requesting data columns by root for refill", peer = peer,
columns = shortLog(colIdList), peer_score = peer.getScore()
let columns =
await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList)
if columns.isOk:
var ucolumns = columns.get().asSeq()
ucolumns.sort(cmpSidecarIndexes)
if not checkColumnResponse(colIdList, ucolumns):
debug "Response to columns by root is not a subset",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return
for col in ucolumns:
let
block_root =
hash_tree_root(col[].signed_block_header.message)
exclude =
DataColumnIdentifier(block_root: block_root,
index: col[].index)
vcus.global_refill_list.excl(exclude)
# write new columns to database, no need of BlockVerifier
# in this scenario as the columns historically did pass DA,
# and did meet the historical custody requirements
vcus.dag.db.putDataColumnSidecar(col[])

else:
debug "Data columns by root request not done, peer doesn't have custody column",
peer = peer, columns = shortLog(colIdList), err = columns.error()
peer.updateScore(PeerScoreNoValues)

finally:
if not(isNil(peer)):
vcus.network.peerPool.release(peer)

proc validatorCustodyColumnLoop(
vcus: ValidatorCustodyRef) {.async: (raises: [CancelledError]).} =
var cache = StateCache()
while true:
let diff = vcus.detectNewValidatorCustody(cache)

await sleepAsync(POLL_INTERVAL)
if diff.len == 0:
# Validator custody same as previous interval
continue

if vcus.inhibit():
continue

vcus.makeRefillList(diff)

if vcus.global_refill_list.len != 0:
debug "Requesting detected missing data columns for refill",
columns = shortLog(vcus.requested_columns)
let start = SyncMoment.now(0)
var workers:
array[PARALLEL_REFILL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REFILL_REQUESTS:
workers[i] = vcus.refillDataColumnsFromNetwork()

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(vcus.global_refill_list)))

debug "Validator custody backfill tick",
backfill_speed = speed(start, finish)

else:
## Done with column refilling
## hence now advertise the updated cgc count
## in ENR and metadata.
if vcus.older_column_set.len != vcus.newer_column_set.len:
# Newer cgc count can also drop from previous if validators detach
vcus.network.loadCgcnetMetadataAndEnr(CgcCount vcus.newer_column_set.lenu64)
# Make the newer set older
vcus.older_column_set = vcus.newer_column_set
# Clear the newer for future validator custody detection
vcus.newer_column_set.clear()

proc start*(vcus: ValidatorCustodyRef) =
## Start Validator Custody detection loop
vcus.validatorCustodyLoopFuture = vcus.validatorCustodyColumnLoop()

proc stop*(vcus: ValidatorCustodyRef) =
## Stop validator custody detection loop.
if not(isNil(vcus.validatorCustodyLoopFuture)):
vcus.validatorCustodyLoopFuture.cancelSoon()