Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0f45ec5
feat: separate performance collection and distribution
vgorkavenko Oct 17, 2025
962a076
Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
vgorkavenko Oct 21, 2025
453effb
fix: lock
vgorkavenko Oct 21, 2025
485fe91
fix: black
vgorkavenko Oct 21, 2025
d9e5888
Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
vgorkavenko Oct 22, 2025
8d86ca7
fix: already processed epochs
vgorkavenko Oct 22, 2025
9e2c951
feat: ChainConverter
vgorkavenko Oct 23, 2025
6e95c01
refactor: types
vgorkavenko Oct 27, 2025
46decb1
feat: better logging
vgorkavenko Oct 27, 2025
39d50d2
feat: additional validation
vgorkavenko Oct 27, 2025
d689979
refactor: db
vgorkavenko Oct 28, 2025
fa5ed3c
feat: `epochs_demand`
vgorkavenko Oct 29, 2025
a5d07f7
fix: `missing_epochs_in`
vgorkavenko Nov 4, 2025
1653943
fix: logic, logging
vgorkavenko Nov 4, 2025
aa871b3
fix: csm.execute_module
vgorkavenko Nov 4, 2025
2a70199
feat: add `_post` for http_provider
vgorkavenko Nov 4, 2025
db10267
fix: SafeBorder inheritance issue
vgorkavenko Nov 4, 2025
0925e5f
fix: remove TODOs
vgorkavenko Nov 4, 2025
84b2e47
feat: add `PERFORMANCE_COLLECTOR_DB_CONNECTION_TIMEOUT`
vgorkavenko Nov 4, 2025
b08865a
feat: use finalized epoch if no demands
vgorkavenko Nov 7, 2025
baf8c83
fix: `validate_state`
vgorkavenko Nov 10, 2025
e72cff2
WIP: `test_csm_module.py`
vgorkavenko Nov 10, 2025
1d2fb60
WIP: `test_csm_module.py`
vgorkavenko Nov 10, 2025
e9758b0
fix: linter
vgorkavenko Nov 10, 2025
f932b99
fix: `define_epochs_to_process_range`
vgorkavenko Nov 11, 2025
13330ea
fix: `define_epochs_to_process_range`. Simple AI tests
vgorkavenko Nov 11, 2025
84ff68e
fix: remove `DEFAULT_EPOCHS_STEP_TO_COLLECT`
vgorkavenko Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 318 additions & 25 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ oz-merkle-tree = { git = "https://github.com/lidofinance/oz-merkle-tree", rev =
multiformats = "^0.3.1"
protobuf="^6.31.1"
dag-cbor="^0.3.3"
flask = "^3.0.0"
waitress = "^3.0.2"
pyroaring = "^1.0.3"

[tool.poetry.group.dev.dependencies]
base58 = "^2.1.1"
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#time-parameters
SLOTS_PER_HISTORICAL_ROOT = 2**13 # 8192
# https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/beacon-chain.md#sync-committee
SYNC_COMMITTEE_SIZE = 512
EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256
# https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#domain-types
DOMAIN_DEPOSIT_TYPE = bytes.fromhex("03000000") # 0x03000000
Expand Down
13 changes: 11 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from src.modules.csm.csm import CSOracle
from src.modules.ejector.ejector import Ejector
from src.providers.ipfs import IPFSProvider, Kubo, LidoIPFS, Pinata, Storacha
from src.modules.performance_collector.performance_collector import PerformanceCollector
from src.types import OracleModule
from src.utils.build import get_build_info
from src.utils.exception import IncompatibleException
Expand All @@ -29,6 +30,7 @@
LidoValidatorsProvider,
TransactionUtils,
)
from src.web3py.extensions.performance import PerformanceClientModule
from src.web3py.types import Web3
from decimal import getcontext

Expand Down Expand Up @@ -75,6 +77,9 @@ def main(module_name: OracleModule):
logger.info({'msg': 'Initialize IPFS providers.'})
ipfs = IPFS(web3, ipfs_providers(), retries=variables.HTTP_REQUEST_RETRY_COUNT_IPFS)

logger.info({'msg': 'Initialize Performance Collector client.'})
performance = PerformanceClientModule(variables.PERFORMANCE_COLLECTOR_URI)

logger.info({'msg': 'Check configured providers.'})
if Version(kac.get_status().appVersion) < constants.ALLOWED_KAPI_VERSION:
raise IncompatibleException(f'Incompatible KAPI version. Required >= {constants.ALLOWED_KAPI_VERSION}.')
Expand All @@ -89,12 +94,13 @@ def main(module_name: OracleModule):
'cc': lambda: cc, # type: ignore[dict-item]
'kac': lambda: kac, # type: ignore[dict-item]
'ipfs': lambda: ipfs, # type: ignore[dict-item]
'performance': lambda: performance, # type: ignore[dict-item]
})

logger.info({'msg': 'Initialize prometheus metrics.'})
init_metrics()

instance: Accounting | Ejector | CSOracle
instance: Accounting | Ejector | CSOracle | PerformanceCollector
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
Expand All @@ -104,10 +110,13 @@ def main(module_name: OracleModule):
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
elif module_name == OracleModule.PERFORMANCE_COLLECTOR:
instance = PerformanceCollector(web3)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')

instance.check_contract_configs()
if module_name != OracleModule.PERFORMANCE_COLLECTOR:
instance.check_contract_configs()

if variables.DAEMON:
instance.run_as_daemon()
Expand Down
8 changes: 8 additions & 0 deletions src/metrics/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ class Status(Enum):
buckets=requests_buckets,
)

PERFORMANCE_REQUESTS_DURATION = Histogram(
'performance_requests_duration',
'Duration of requests to Performance Collector API',
['endpoint', 'code', 'domain'],
namespace=PROMETHEUS_PREFIX,
buckets=requests_buckets,
)

KEYS_API_REQUESTS_DURATION = Histogram(
'keys_api_requests_duration',
'Duration of requests to Keys API',
Expand Down
202 changes: 126 additions & 76 deletions src/modules/csm/csm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,7 @@
CSM_CURRENT_FRAME_RANGE_R_EPOCH,
)
from src.metrics.prometheus.duration_meter import duration_meter
from src.modules.csm.checkpoint import (
FrameCheckpointProcessor,
FrameCheckpointsIterator,
MinStepIsNotReached,
)
from src.modules.csm.distribution import (
Distribution,
DistributionResult,
StrikesValidator,
)
from src.modules.csm.distribution import Distribution, DistributionResult, StrikesValidator
from src.modules.csm.helpers.last_report import LastReport
from src.modules.csm.log import FramePerfLog
from src.modules.csm.state import State
Expand All @@ -35,8 +26,11 @@
EpochNumber,
ReferenceBlockStamp,
SlotNumber,
ValidatorIndex,
)
from src.utils.cache import global_lru_cache as lru_cache
from src.utils.range import sequence
from src.utils.validator_state import is_active_validator
from src.utils.web3converter import Web3Converter
from src.web3py.extensions.lido_validators import NodeOperatorId
from src.web3py.types import Web3
Expand Down Expand Up @@ -77,20 +71,63 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecute
if not self._check_compatability(last_finalized_blockstamp):
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

collected = self.collect_data(last_finalized_blockstamp)
if not collected:
logger.info(
{"msg": "Data required for the report is not fully collected yet. Waiting for the next finalized epoch"}
)
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH
self.set_epochs_range_to_collect(last_finalized_blockstamp)

report_blockstamp = self.get_blockstamp_for_report(last_finalized_blockstamp)
if not report_blockstamp:
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

collected = self.collect_data()
if not collected:
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

self.process_report(report_blockstamp)
return ModuleExecuteDelay.NEXT_SLOT

@duration_meter()
def set_epochs_range_to_collect(self, blockstamp: BlockStamp):
    """Sync this module's desired epochs range with the Performance Collector.

    Migrates the local state to the current frame layout, then publishes the
    [l_epoch, r_epoch] demand to the collector — but only if the demand it
    already knows for this consumer differs.
    """
    consumer = self.__class__.__name__
    chain_converter = self.converter(blockstamp)

    l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
    desired_demand = (l_epoch, r_epoch)

    # Re-shape the local state to the current frame configuration before
    # announcing what data we need.
    self.state.migrate(l_epoch, r_epoch, chain_converter.frame_config.epochs_per_frame)
    self.state.log_progress()

    known_demand = self.w3.performance.get_epochs_demand().get(consumer)
    if known_demand == desired_demand:
        # Collector already tracks exactly this range — nothing to post.
        return

    logger.info({
        "msg": f"Updating {consumer} epochs demand for Performance Collector",
        "old": known_demand,
        "new": desired_demand
    })
    self.w3.performance.post_epochs_demand(consumer, l_epoch, r_epoch)

@duration_meter()
def collect_data(self) -> bool:
    """Ensure the report state is fulfilled with Performance Collector data.

    Returns True when every epoch of every frame has been processed into
    `self.state`; False when at least one frame's data range is not yet
    available in the collector.
    """
    logger.info({"msg": "Collecting data for the report from Performance Collector"})

    if self.state.is_fulfilled:
        # Everything already processed on a previous cycle.
        return True

    # Refuse to start fulfilling until the collector can serve every frame.
    for l_epoch, r_epoch in self.state.frames:
        if not self.w3.performance.is_range_available(l_epoch, r_epoch):
            logger.warning({
                "msg": "Performance data range is not available yet",
                "start_epoch": l_epoch,
                "end_epoch": r_epoch
            })
            return False
        logger.info({
            "msg": "Performance data range is available",
            "start_epoch": l_epoch,
            "end_epoch": r_epoch
        })

    self.fulfill_state()
    return self.state.is_fulfilled

@lru_cache(maxsize=1)
@duration_meter()
def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple:
Expand Down Expand Up @@ -162,66 +199,73 @@ def validate_state(self, blockstamp: ReferenceBlockStamp) -> None:

self.state.validate(l_epoch, r_epoch)

def collect_data(self, blockstamp: BlockStamp) -> bool:
    """Ongoing report data collection for the estimated reference slot"""

    # Returns True once the state holds performance data for the whole
    # epochs range; False when collection must continue on a later cycle.
    logger.info({"msg": "Collecting data for the report"})

    converter = self.converter(blockstamp)

    l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
    logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"})

    # NOTE: Finalized slot is the first slot of justifying epoch, so we need to take the previous. But if the first
    # slot of the justifying epoch is empty, blockstamp.slot_number will point to the slot where the last finalized
    # block was created. As a result, finalized_epoch in this case will be less than the actual number of the last
    # finalized epoch. As a result we can have a delay in frame finalization.
    finalized_epoch = EpochNumber(converter.get_epoch_by_slot(blockstamp.slot_number) - 1)

    report_blockstamp = self.get_blockstamp_for_report(blockstamp)

    if not report_blockstamp:
        logger.info({"msg": "No report blockstamp available, using pre-computed one for collecting data"})

    # The report reference epoch and the locally computed right edge disagree:
    # the range change is not yet visible at this finalized epoch — retry later.
    if report_blockstamp and report_blockstamp.ref_epoch != r_epoch:
        logger.warning(
            {
                "msg": f"Epochs range has been changed, but the change is not yet observed on finalized epoch {finalized_epoch}"
            }
        )
        return False

    if l_epoch > finalized_epoch:
        logger.info({"msg": "The starting epoch of the epochs range is not finalized yet"})
        return False

    # Re-shape the state to the current frame layout before processing.
    self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame)
    self.state.log_progress()

    if self.state.is_fulfilled:
        logger.info({"msg": "All epochs are already processed. Nothing to collect"})
        return True

    try:
        checkpoints = FrameCheckpointsIterator(
            converter,
            min(self.state.unprocessed_epochs),
            r_epoch,
            finalized_epoch,
        )
    except MinStepIsNotReached:
        # Not enough finalized epochs accumulated to form a checkpoint yet.
        return False

    processor = FrameCheckpointProcessor(self.w3.cc, self.state, converter, blockstamp)

    for checkpoint in checkpoints:
        # Abort if the epochs range moved while we were processing: the
        # prepared checkpoints would belong to a stale range.
        if self.get_epochs_range_to_process(self._receive_last_finalized_slot()) != (l_epoch, r_epoch):
            logger.info({"msg": "Checkpoints were prepared for an outdated epochs range, stop processing"})
            raise ValueError("Outdated checkpoint")
        processor.exec(checkpoint)
        # Reset BaseOracle cycle timeout to avoid timeout errors during long checkpoints processing
        self._reset_cycle_timeout()
    return self.state.is_fulfilled
def fulfill_state(self) -> None:
    """Populate `self.state` with per-epoch duty data from the Performance Collector.

    Walks every frame tracked by the state, fetches each still-unprocessed
    epoch from the collector, and records attestation, proposal and
    sync-committee duties, marking each epoch processed as it completes.
    """
    finalized_blockstamp = self._receive_last_finalized_slot()
    # Validator snapshot taken at the last finalized slot and reused for
    # every epoch below. NOTE(review): older epochs are judged against this
    # newer snapshot — presumably the activity check filters the mismatch;
    # verify against `is_active_validator` semantics.
    validators = self.w3.cc.get_validators(finalized_blockstamp)

    logger.info({
        "msg": "Starting state fulfillment",
        "total_frames": len(self.state.frames),
        "total_validators": len(validators)
    })

    for l_epoch, r_epoch in self.state.frames:
        logger.info({
            "msg": "Processing frame",
            "start_epoch": l_epoch,
            "end_epoch": r_epoch,
            "total_epochs": r_epoch - l_epoch + 1
        })

        for epoch in sequence(l_epoch, r_epoch):
            if epoch not in self.state.unprocessed_epochs:
                logger.debug({"msg": f"Epoch {epoch} is already processed"})
                continue

            logger.info({
                "msg": "Requesting performance data from collector",
                "epoch": epoch
            })
            epoch_data = self.w3.performance.get_epoch(epoch)
            if epoch_data is None:
                # Collector has no data for this epoch yet: skip it without
                # marking it processed, so it is retried on the next cycle.
                logger.warning({"msg": f"Epoch {epoch} is missing in Performance Collector"})
                continue

            # misses: validators that missed attestation; props: proposal
            # records; syncs: sync-committee records for this epoch.
            misses, props, syncs = epoch_data
            logger.info({
                "msg": "Performance data received",
                "epoch": epoch,
                "misses_count": len(misses),
                "proposals_count": len(props),
                "sync_duties_count": len(syncs)
            })

            # NOTE(review): membership tests against `misses` run once per
            # validator per epoch — assumes `misses` supports fast `in`
            # (e.g. a set/roaring bitmap); verify the provider's return type.
            for validator in validators:
                missed_att = validator.index in misses
                # `included_att` is the complement of `missed_att`.
                included_att = validator.index not in misses
                is_active = is_active_validator(validator, EpochNumber(epoch))
                # Sanity check: an inactive validator cannot have missed a duty.
                if not is_active and missed_att:
                    raise ValueError(f"Validator {validator.index} missed attestation in epoch {epoch}, but was not active")
                # NOTE(review): inactive validators not listed in `misses`
                # are recorded with included=True here — confirm the State
                # aggregation expects that.
                self.state.save_att_duty(EpochNumber(epoch), validator.index, included=included_att)

            blocks_in_epoch = 0

            for p in props:
                vid = ValidatorIndex(p.validator_index)
                self.state.save_prop_duty(EpochNumber(epoch), vid, included=bool(p.is_proposed))
                # `is_proposed` is counted as 0/1 via bool-int addition.
                blocks_in_epoch += p.is_proposed

            if blocks_in_epoch:
                # Each sync-committee member owes one duty per proposed block;
                # `missed_count` of them were missed, the rest fulfilled.
                # max(0, ...) guards against missed_count > blocks_in_epoch.
                for rec in syncs:
                    vid = ValidatorIndex(rec.validator_index)
                    fulfilled = max(0, blocks_in_epoch - rec.missed_count)
                    for _ in range(fulfilled):
                        self.state.save_sync_duty(EpochNumber(epoch), vid, included=True)
                    for _ in range(rec.missed_count):
                        self.state.save_sync_duty(EpochNumber(epoch), vid, included=False)

            self.state.add_processed_epoch(EpochNumber(epoch))
            self.state.log_progress()

def make_rewards_tree(self, shares: dict[NodeOperatorId, RewardsShares]) -> RewardsTree:
if not shares:
Expand Down Expand Up @@ -301,6 +345,12 @@ def get_epochs_range_to_process(self, blockstamp: BlockStamp) -> tuple[EpochNumb
CSM_CURRENT_FRAME_RANGE_L_EPOCH.set(l_epoch)
CSM_CURRENT_FRAME_RANGE_R_EPOCH.set(r_epoch)

logger.info({
"msg": "Epochs range for the report",
"l_epoch": l_epoch,
"r_epoch": r_epoch
})

return l_epoch, r_epoch

def converter(self, blockstamp: BlockStamp) -> Web3Converter:
Expand Down
Empty file.
Loading
Loading