Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4c80bc2
fix: missing pivot slot in block roots
vgorkavenko Jul 26, 2025
a11e169
fix: review
vgorkavenko Jul 28, 2025
d40d72b
feat: test
vgorkavenko Jul 28, 2025
0cef6e8
fix: prysm error on `get_sync_committee`
vgorkavenko Jul 28, 2025
3b07965
feat: add test> add call to check module
vgorkavenko Jul 28, 2025
cc1860c
fix: handle zero block roots
vgorkavenko Jul 28, 2025
2230144
fix: linter
vgorkavenko Jul 28, 2025
72de383
Merge pull request #736 from lidofinance/fix/prysm-error-on-sync-comm…
F4ever Jul 29, 2025
168a05d
feat: csm state versions
madlabman Jul 29, 2025
f895e8e
chore: update pr template
madlabman Jul 29, 2025
a90cbf6
fix: init _version
madlabman Jul 29, 2025
a4943cb
test: fix state tests
madlabman Jul 29, 2025
dfba269
chore: remove _consensus_version usage
madlabman Jul 29, 2025
124e655
refactor: extract CSM_STATE_VERSION
madlabman Jul 29, 2025
b805734
Merge pull request #737 from lidofinance/cache-version
F4ever Jul 29, 2025
ff2af8a
Merge pull request #735 from lidofinance/fix/csm-v2/missing-pivot-slo…
F4ever Jul 29, 2025
ba26976
fix: fetch used module operators keys and add sanity checks
madlabman Aug 7, 2025
3247dac
chore: use list comprehension
madlabman Aug 7, 2025
c6f98be
test: add duplicated keys test
madlabman Aug 7, 2025
7941ba0
Merge pull request #746 from lidofinance/fix-740-port
F4ever Aug 8, 2025
cad9900
chore(log): add missing log to future_withdrawals
chasingrainbows Aug 8, 2025
b39e00d
Merge pull request #752 from lidofinance/feature/add-future_withdrawa…
F4ever Aug 11, 2025
b9a7a57
Merge branch 'develop' into feat/vaults-pectra-ipfs-upgrade
hweawer Aug 26, 2025
9648918
feat: Integrate new web3py-multi-provider
hweawer Aug 26, 2025
ba365ad
feat: Fix stream consumption
hweawer Aug 26, 2025
d8e45f7
feat: reorder call to init_metrics
hweawer Aug 26, 2025
4a89f9b
Remove empty
hweawer Aug 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ Describe how you tested the changes:
## Checklist
- [ ] Documentation updated (if required)
- [ ] New tests added (if applicable)
- [ ] `CSM_STATE_VERSION` is bumped (if the new version affects data in the cache)
2,255 changes: 1,134 additions & 1,121 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pytest = "^7.2.1"
pytest-xdist = "^3.2.1"
more-itertools = "^10.1.0"
web3 = "^7.8.0"
web3-multi-provider = { version = "^2.2.1", extras = ["metrics"] }
web3-multi-provider = { version = "^2.2.2", extras = ["metrics"] }
json-stream = "^2.3.2"
oz-merkle-tree = { git = "https://github.com/lidofinance/oz-merkle-tree" }
multiformats = "^0.3.1"
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@
ALLOWED_KAPI_VERSION = Version('1.5.0')

GENESIS_VALIDATORS_ROOT = bytes([0] * 32) # all zeros for deposits
CSM_STATE_VERSION = 1
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def main(module_name: OracleModule):

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)
init_metrics()

logger.info({'msg': 'Initialize multi web3 provider.'})
web3 = Web3(FallbackProviderModule(
Expand Down Expand Up @@ -92,7 +93,6 @@ def main(module_name: OracleModule):
})

logger.info({'msg': 'Initialize prometheus metrics.'})
init_metrics()

instance: Accounting | Ejector | CSOracle
if module_name == OracleModule.ACCOUNTING:
Expand Down
8 changes: 8 additions & 0 deletions src/modules/checks/suites/consensus_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ def check_attestation_committees(web3: Web3, blockstamp):
assert web3.cc.get_attestation_committees(blockstamp, epoch), "consensus-client provide no attestation committees"


def check_sync_committee(web3: Web3, blockstamp):
    """Check that the consensus client is able to provide a sync committee."""
    # The epoch is derived from the blockstamp's slot and the SLOTS_PER_EPOCH
    # value reported by the consensus client's config spec.
    spec = web3.cc.get_config_spec()
    slots_in_epoch = spec.SLOTS_PER_EPOCH
    target_epoch = blockstamp.slot_number // slots_in_epoch
    sync_committee = web3.cc.get_sync_committee(blockstamp, target_epoch)
    assert sync_committee, "consensus-client provide no sync committee"


def check_block_attestations_and_sync(web3: Web3, blockstamp):
"""Check that consensus-client able to provide block attestations"""
assert web3.cc.get_block_attestations_and_sync(blockstamp.slot_number), "consensus-client provide no block attestations and sync"
25 changes: 22 additions & 3 deletions src/modules/csm/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
from threading import Lock
from typing import Iterable, Sequence

from hexbytes import HexBytes

from src import variables
from src.constants import SLOTS_PER_HISTORICAL_ROOT, EPOCHS_PER_SYNC_COMMITTEE_PERIOD
from src.metrics.prometheus.csm import CSM_UNPROCESSED_EPOCHS_COUNT, CSM_MIN_UNPROCESSED_EPOCH
from src.modules.csm.state import State
from src.modules.submodules.types import ZERO_HASH
from src.providers.consensus.client import ConsensusClient
from src.providers.consensus.types import SyncCommittee, SyncAggregate
from src.utils.blockstamp import build_blockstamp
Expand All @@ -21,6 +24,8 @@
from src.utils.types import hex_str_to_bytes
from src.utils.web3converter import Web3Converter

ZERO_BLOCK_ROOT = HexBytes(ZERO_HASH).to_0x_hex()

logger = logging.getLogger(__name__)
lock = Lock()

Expand Down Expand Up @@ -165,13 +170,27 @@ def exec(self, checkpoint: FrameCheckpoint) -> int:
def _get_block_roots(self, checkpoint_slot: SlotNumber):
logger.info({"msg": f"Get block roots for slot {checkpoint_slot}"})
# Checkpoint for us like a time point, that's why we use slot, not root.
br = self.cc.get_state_block_roots(checkpoint_slot)
# `s % 8192 = i` is the index where slot `s` will be located.
# If `s` is `checkpoint_slot -> state.slot`, then it cannot yet be in `block_roots`.
# So it is the index that will be overwritten in the next slot, i.e. the index of the oldest root.
pivot_index = checkpoint_slot % SLOTS_PER_HISTORICAL_ROOT
br = self.cc.get_state_block_roots(checkpoint_slot)
# Replace duplicated roots to None to mark missed slots
return [br[i] if i == pivot_index or br[i] != br[i - 1] else None for i in range(len(br))]
# The oldest root can be missing, so we need to check it and mark it as well as other missing slots
pivot_block_root = br[pivot_index]
slot_by_pivot_block_root = self.cc.get_block_header(pivot_block_root).data.header.message.slot
calculated_pivot_slot = max(checkpoint_slot - SLOTS_PER_HISTORICAL_ROOT, 0)
is_pivot_missing = slot_by_pivot_block_root != calculated_pivot_slot

# Replace duplicated roots with `None` to mark missing slots
br = [
br[i] if br[i] != ZERO_BLOCK_ROOT and (i == pivot_index or br[i] != br[i - 1])
else None
for i in range(len(br))
]
if is_pivot_missing:
br[pivot_index] = None

return br

def _select_block_roots(
self, block_roots: list[BlockRoot | None], duty_epoch: EpochNumber, checkpoint_slot: SlotNumber
Expand Down
4 changes: 1 addition & 3 deletions src/modules/csm/csm.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ def validate_state(self, blockstamp: ReferenceBlockStamp) -> None:
def collect_data(self, blockstamp: BlockStamp) -> bool:
"""Ongoing report data collection for the estimated reference slot"""

consensus_version = self.get_consensus_version(blockstamp)

logger.info({"msg": "Collecting data for the report"})

converter = self.converter(blockstamp)
Expand Down Expand Up @@ -189,7 +187,7 @@ def collect_data(self, blockstamp: BlockStamp) -> bool:
logger.info({"msg": "The starting epoch of the epochs range is not finalized yet"})
return False

self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame, consensus_version)
self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame)
self.state.log_progress()

if self.state.is_fulfilled:
Expand Down
2 changes: 1 addition & 1 deletion src/modules/csm/distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _get_ref_blockstamp_for_frame(
)

def _get_module_validators(self, blockstamp: ReferenceBlockStamp) -> ValidatorsByNodeOperator:
return self.w3.lido_validators.get_module_validators_by_node_operators(
return self.w3.lido_validators.get_used_module_validators_by_node_operators(
StakingModuleAddress(self.w3.csm.module.address), blockstamp
)

Expand Down
33 changes: 18 additions & 15 deletions src/modules/csm/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Self

from src import variables
from src.constants import CSM_STATE_VERSION
from src.types import EpochNumber, ValidatorIndex
from src.utils.range import sequence

Expand Down Expand Up @@ -84,15 +85,19 @@ class State:
_epochs_to_process: tuple[EpochNumber, ...]
_processed_epochs: set[EpochNumber]

_consensus_version: int
_version: int

EXTENSION = ".pkl"

def __init__(self) -> None:
self.data = {}
self._epochs_to_process = tuple()
self._processed_epochs = set()
self._consensus_version = 0
self._version = CSM_STATE_VERSION

EXTENSION = ".pkl"
@property
def version(self) -> int | None:
return getattr(self, "_version", None)

@classmethod
def load(cls) -> Self:
Expand Down Expand Up @@ -156,7 +161,6 @@ def clear(self) -> None:
self.data = {}
self._epochs_to_process = tuple()
self._processed_epochs.clear()
self._consensus_version = 0
assert self.is_empty

@lru_cache(variables.CSM_ORACLE_MAX_CONCURRENCY)
Expand Down Expand Up @@ -185,16 +189,15 @@ def add_processed_epoch(self, epoch: EpochNumber) -> None:
def log_progress(self) -> None:
    """Log how many of the epochs scheduled for processing have been processed so far."""
    processed_count = len(self._processed_epochs)
    total_count = len(self._epochs_to_process)
    logger.info({"msg": f"Processed {processed_count} of {total_count} epochs"})

def migrate(
self, l_epoch: EpochNumber, r_epoch: EpochNumber, epochs_per_frame: int, consensus_version: int
) -> None:
if self._consensus_version and consensus_version != self._consensus_version:
logger.warning(
{
"msg": f"Cache was built for consensus version {self._consensus_version}. "
f"Discarding data to migrate to consensus version {consensus_version}"
}
)
def migrate(self, l_epoch: EpochNumber, r_epoch: EpochNumber, epochs_per_frame: int) -> None:
if self.version != CSM_STATE_VERSION:
if self.version is not None:
logger.warning(
{
"msg": f"Cache was built with version={self.version}. "
f"Discarding data to migrate to cache version={CSM_STATE_VERSION}"
}
)
self.clear()

new_frames = self._calculate_frames(tuple(sequence(l_epoch, r_epoch)), epochs_per_frame)
Expand All @@ -205,7 +208,7 @@ def migrate(

self.find_frame.cache_clear()
self._epochs_to_process = tuple(sequence(l_epoch, r_epoch))
self._consensus_version = consensus_version
self._version = CSM_STATE_VERSION
self.commit()

def _migrate_frames_data(self, new_frames: list[Frame]):
Expand Down
1 change: 1 addition & 0 deletions src/modules/ejector/ejector.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def _get_total_expected_balance(self, vals_to_exit: list[Validator], blockstamp:
EJECTOR_MAX_WITHDRAWAL_EPOCH.set(withdrawal_epoch)

future_withdrawals = self._get_withdrawable_lido_validators_balance(withdrawal_epoch, blockstamp)
logger.info({'msg': 'Calculate future withdrawals sum.', 'value': future_withdrawals})
future_rewards = (withdrawal_epoch + epochs_to_sweep - blockstamp.ref_epoch) * rewards_speed_per_epoch
logger.info({'msg': 'Calculate future rewards.', 'value': future_rewards})

Expand Down
4 changes: 3 additions & 1 deletion src/modules/submodules/oracle_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from src.modules.submodules.exceptions import IsNotMemberException, IncompatibleOracleVersion, ContractVersionMismatch
from src.providers.http_provider import NotOkResponse
from src.providers.ipfs import IPFSError
from src.providers.keys.client import KeysOutdatedException
from src.providers.keys.client import KAPIInconsistentData, KeysOutdatedException
from src.utils.cache import clear_global_cache
from src.web3py.extensions.lido_validators import CountOfKeysDiffersException
from src.utils.blockstamp import build_blockstamp
Expand Down Expand Up @@ -102,6 +102,8 @@ def _cycle(self):
logger.error({'msg': ''.join(traceback.format_exception(error))})
except (NoSlotsAvailable, SlotNotFinalized, InconsistentData) as error:
logger.error({'msg': 'Inconsistent response from consensus layer node.', 'error': str(error)})
except KAPIInconsistentData as error:
logger.error({'msg': 'Inconsistent response from Keys API service', 'error': str(error)})
except KeysOutdatedException as error:
logger.error({'msg': 'Keys API service returns outdated data.', 'error': str(error)})
except CountOfKeysDiffersException as error:
Expand Down
40 changes: 32 additions & 8 deletions src/providers/consensus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,22 @@ def get_attestation_committees(

def get_sync_committee(self, blockstamp: BlockStamp, epoch: EpochNumber) -> SyncCommittee:
"""Spec: https://ethereum.github.io/beacon-APIs/#/Beacon/getEpochSyncCommittees"""
data, _ = self._get(
self.API_GET_SYNC_COMMITTEE,
path_params=(blockstamp.state_root,),
query_params={'epoch': epoch},
force_raise=self.__raise_on_prysm_error,
retval_validator=data_is_dict,
)
try:
data, _ = self._get(
self.API_GET_SYNC_COMMITTEE,
path_params=(blockstamp.state_root,),
query_params={'epoch': epoch},
force_raise=self.__raise_on_prysm_error,
retval_validator=data_is_dict,
)
except NotOkResponse as error:
if self.PRYSM_STATE_NOT_FOUND_ERROR in error.text:
data = self._get_sync_committee_with_prysm(
blockstamp,
epoch,
)
else:
raise error
return SyncCommittee.from_response(**data)

@list_of_dataclasses(ProposerDuties.from_response)
Expand Down Expand Up @@ -290,6 +299,21 @@ def _get_attestation_committees_with_prysm(
)
return data

def _get_sync_committee_with_prysm(
    self,
    blockstamp: BlockStamp,
    epoch: EpochNumber,
) -> dict:
    """
    Fetch the sync committee, addressing the state by slot number instead of state root.

    Works around the Prysm issue with lookups by state root:
    https://github.com/prysmaticlabs/prysm/issues/12053

    The response is validated with `data_is_dict`, so the returned payload is a single
    mapping (the caller unpacks it with `**data`), not a list of dicts.
    """
    data, _ = self._get(
        self.API_GET_SYNC_COMMITTEE,
        # Slot number is used as the state identifier in place of `blockstamp.state_root`.
        path_params=(blockstamp.slot_number,),
        query_params={'epoch': epoch},
        retval_validator=data_is_dict,
    )
    return data

def __raise_last_missed_slot_error(self, errors: list[Exception]) -> Exception | None:
"""
Prioritize NotOkResponse before other exceptions (ConnectionError, TimeoutError).
Expand All @@ -304,7 +328,7 @@ def __raise_last_missed_slot_error(self, errors: list[Exception]) -> Exception |

def _get_chain_id_with_provider(self, provider_index: int) -> int:
data, _ = self._get_without_fallbacks(
self.hosts[provider_index],
self.managers[provider_index],
self.API_GET_SPEC,
retval_validator=data_is_dict,
)
Expand Down
Loading
Loading