Skip to content

Commit

Permalink
feat(cannon): Canonical Beacon Committees (#351)
Browse files Browse the repository at this point in the history
* feat(cannon): Canonical Beacon Committees

* feat(cannon): Canonical Beacon Committees

* Update deploy/migrations/clickhouse/045_canonical_beacon_committee.up.sql

Co-authored-by: Andrew Davis <[email protected]>
Signed-off-by: Sam Calder-Mason <[email protected]>

---------

Signed-off-by: Sam Calder-Mason <[email protected]>
Co-authored-by: Andrew Davis <[email protected]>
  • Loading branch information
samcm and Savid authored Jul 24, 2024
1 parent c0419c3 commit 5931041
Show file tree
Hide file tree
Showing 40 changed files with 1,934 additions and 1,374 deletions.
62 changes: 61 additions & 1 deletion deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ transforms:
canonical_beacon_block_withdrawal: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL"
canonical_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_V2" && .meta.client.additional_data.finalized_when_requested == true
canonical_beacon_proposer_duty: .event.name == "BEACON_API_ETH_V1_PROPOSER_DUTY" && .meta.client.additional_data.state_id == "finalized"
eth_v1_beacon_committee: .event.name == "BEACON_API_ETH_V1_BEACON_COMMITTEE"
canonical_beacon_committee: .event.name == "BEACON_API_ETH_V1_BEACON_COMMITTEE" && .meta.client.additional_data.state_id == "finalized"
eth_v1_beacon_committee: .event.name == "BEACON_API_ETH_V1_BEACON_COMMITTEE" && .meta.client.additional_data.state_id != "finalized"
eth_v1_proposer_duty: .event.name == "BEACON_API_ETH_V1_PROPOSER_DUTY" && .meta.client.additional_data.state_id == "head"
eth_v1_events_attestation_v2: .event.name == "BEACON_API_ETH_V1_EVENTS_ATTESTATION_V2"
eth_v1_events_attestation: .event.name == "BEACON_API_ETH_V1_EVENTS_ATTESTATION"
Expand Down Expand Up @@ -355,6 +356,7 @@ transforms:
- xatu_server_events_router.canonical_beacon_block_voluntary_exit
- xatu_server_events_router.canonical_beacon_block_withdrawal
- xatu_server_events_router.canonical_beacon_proposer_duty
- xatu_server_events_router.canonical_beacon_committee
- xatu_server_events_router.eth_v1_beacon_committee
- xatu_server_events_router.eth_v1_proposer_duty
- xatu_server_events_router.eth_v1_events_attestation
Expand Down Expand Up @@ -1815,6 +1817,44 @@ transforms:
.proposer_validator_index = .data.validator_index
.proposer_pubkey = .data.pubkey
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
canonical_beacon_committee_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_committee
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.slot = .data.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.validators = .data.validators
.committee_index = .data.index
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -2066,6 +2106,26 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_committee_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_committee_formatted
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_committee
batch:
max_bytes: 52428800
max_events: 1000000
timeout_secs: 1
buffer:
max_events: 1000000
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_block_clickhouse:
type: clickhouse
inputs:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS default.canonical_beacon_committee ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.canonical_beacon_committee_local ON cluster '{cluster}' SYNC;
53 changes: 53 additions & 0 deletions deploy/migrations/clickhouse/045_canonical_beacon_committee.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
CREATE TABLE default.canonical_beacon_committee_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the sentry received the event from a beacon node' CODEC(DoubleDelta, ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number in the beacon API committee payload' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The wall clock time when the slot started' CODEC(DoubleDelta, ZSTD(1)),
`committee_index` LowCardinality(String) COMMENT 'The committee index in the beacon API committee payload',
`validators` Array(UInt32) COMMENT 'The validator indices in the beacon API committee payload' CODEC(ZSTD(1)),
`epoch` UInt32 COMMENT 'The epoch number in the beacon API committee payload' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The wall clock time when the epoch started' CODEC(DoubleDelta, ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_id` Int32 COMMENT 'Ethereum network ID' CODEC(DoubleDelta, ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_consensus_version` LowCardinality(String) COMMENT 'Ethereum consensus client version that generated the event',
`meta_consensus_version_major` LowCardinality(String) COMMENT 'Ethereum consensus client major version that generated the event',
`meta_consensus_version_minor` LowCardinality(String) COMMENT 'Ethereum consensus client minor version that generated the event',
`meta_consensus_version_patch` LowCardinality(String) COMMENT 'Ethereum consensus client patch version that generated the event',
`meta_consensus_implementation` LowCardinality(String) COMMENT 'Ethereum consensus client implementation that generated the event',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
committee_index,
) COMMENT 'Contains canonical beacon API /eth/v1/beacon/committees data.';

CREATE TABLE default.canonical_beacon_committee ON CLUSTER '{cluster}' AS default.canonical_beacon_committee_local ENGINE = Distributed(
'{cluster}',
default,
canonical_beacon_committee_local,
cityHash64(
slot_start_date_time,
meta_network_name,
committee_index
)
);
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.13.15
github.com/ethpandaops/beacon v0.37.0
github.com/ethpandaops/beacon v0.38.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/go-co-op/gocron v1.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R
github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo=
github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.37.0 h1:T+F0IEjkSrevAbGA4zsqvqjnm4IRp+JKLsd8DyAO8ZQ=
github.com/ethpandaops/beacon v0.37.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/beacon v0.38.0 h1:sMFlq49t/PIrp7DlSWgM+OgPAyblvMeV+jr2AOW6ls0=
github.com/ethpandaops/beacon v0.38.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
18 changes: 18 additions & 0 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,24 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon,
clientMeta,
),
v1.NewBeaconCommitteeDeriver(
c.log,
&c.Config.Derivers.BeaconCommitteeConfig,
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V1_BEACON_COMMITTEE,
c.coordinatorClient,
wallclock,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
2,
),
c.beacon,
clientMeta,
),
}

c.eventDerivers = eventDerivers
Expand Down
Loading

0 comments on commit 5931041

Please sign in to comment.