From 67e0fd492f015131503dc6d8fd563e9e29e1029a Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 5 Mar 2025 10:36:12 +0100 Subject: [PATCH] feat(sentry): add support for electra single attestation events (#448) * feat: upgrade beacon dependency to v0.49.0 feat(sentry): add support for electra single attestation events * fix(events_single_attestation.go): remove empty line in getData method --- go.mod | 2 +- go.sum | 4 +- .../eth/v1/events_single_attestation.go | 187 ++++++++++++++++++ pkg/sentry/sentry.go | 31 +++ 4 files changed, 221 insertions(+), 3 deletions(-) create mode 100644 pkg/sentry/event/beacon/eth/v1/events_single_attestation.go diff --git a/go.mod b/go.mod index 0d2decc3..0f087dc4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 github.com/creasty/defaults v1.8.0 github.com/ethereum/go-ethereum v1.15.0 - github.com/ethpandaops/beacon v0.48.0 + github.com/ethpandaops/beacon v0.49.0 github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 github.com/ethpandaops/ethwallclock v0.3.0 github.com/ferranbt/fastssz v0.1.4 diff --git a/go.sum b/go.sum index 69d0994d..f911d0ca 100644 --- a/go.sum +++ b/go.sum @@ -225,8 +225,8 @@ github.com/ethereum/go-ethereum v1.15.0 h1:LLb2jCPsbJZcB4INw+E/MgzUX5wlR6SdwXcv0 github.com/ethereum/go-ethereum v1.15.0/go.mod h1:4q+4t48P2C03sjqGvTXix5lEOplf5dz4CTosbjt5tGs= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/ethpandaops/beacon v0.48.0 h1:rnlTrDyaaIE2qS/7Xtde5qSv8Os39Vgj25gwKJGdNAU= -github.com/ethpandaops/beacon v0.48.0/go.mod h1:NklxAyo+7cmTORZb/sOYcU8sS9bpTgGCfGZr2ZUjdSM= +github.com/ethpandaops/beacon v0.49.0 h1:L0DhsCdImD6WUuzyyr5li+Qi0CZvYcu68y+CNbmMqQw= +github.com/ethpandaops/beacon v0.49.0/go.mod h1:NklxAyo+7cmTORZb/sOYcU8sS9bpTgGCfGZr2ZUjdSM= github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE= github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8= github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4= diff --git a/pkg/sentry/event/beacon/eth/v1/events_single_attestation.go b/pkg/sentry/event/beacon/eth/v1/events_single_attestation.go new file mode 100644 index 00000000..d069b0f3 --- /dev/null +++ b/pkg/sentry/event/beacon/eth/v1/events_single_attestation.go @@ -0,0 +1,187 @@ +package event + +import ( + "context" + "fmt" + "time" + + "github.com/attestantio/go-eth2-client/spec/electra" + "github.com/attestantio/go-eth2-client/spec/phase0" + xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/ethpandaops/xatu/pkg/sentry/ethereum" + "github.com/google/uuid" + ttlcache "github.com/jellydator/ttlcache/v3" + hashstructure "github.com/mitchellh/hashstructure/v2" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +type EventsSingleAttestation struct { + log logrus.FieldLogger + + now time.Time + + event *electra.SingleAttestation + beacon *ethereum.BeaconNode + duplicateCache *ttlcache.Cache[string, time.Time] + clientMeta *xatu.ClientMeta + id uuid.UUID +} + +func NewEventsSingleAttestation(log logrus.FieldLogger, event *electra.SingleAttestation, now time.Time, beacon *ethereum.BeaconNode, duplicateCache *ttlcache.Cache[string, time.Time], clientMeta *xatu.ClientMeta) (*EventsSingleAttestation, error) { + if event == nil { + return nil, fmt.Errorf("event is nil") + } + + return &EventsSingleAttestation{ + log: log.WithField("event", "BEACON_API_ETH_V1_EVENTS_ATTESTATION_V2/SINGLE"), + now: now, + event: event, + beacon: beacon, + duplicateCache: duplicateCache, + clientMeta: clientMeta, + id: uuid.New(), + }, nil +} + +func (e *EventsSingleAttestation) AttestationData() *phase0.AttestationData { + return e.event.Data +} + +func (e *EventsSingleAttestation) getData() (*xatuethv1.AttestationV2, error) { + attestation := e.event + if attestation == nil { + return nil, fmt.Errorf("electra attestation is nil") + } + + return &xatuethv1.AttestationV2{ + AggregationBits: "", + Data: &xatuethv1.AttestationDataV2{ + Slot: &wrapperspb.UInt64Value{Value: uint64(attestation.Data.Slot)}, + Index: &wrapperspb.UInt64Value{Value: uint64(attestation.Data.Index)}, + BeaconBlockRoot: xatuethv1.RootAsString(attestation.Data.BeaconBlockRoot), + Source: &xatuethv1.CheckpointV2{ + Epoch: &wrapperspb.UInt64Value{Value: uint64(attestation.Data.Source.Epoch)}, + Root: xatuethv1.RootAsString(attestation.Data.Source.Root), + }, + Target: &xatuethv1.CheckpointV2{ + Epoch: &wrapperspb.UInt64Value{Value: uint64(attestation.Data.Target.Epoch)}, + Root: xatuethv1.RootAsString(attestation.Data.Target.Root), + }, + }, + }, nil +} + +func (e *EventsSingleAttestation) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error) { + data, err := e.getData() + if err != nil { + return nil, err + } + + decoratedEvent := &xatu.DecoratedEvent{ + Event: &xatu.Event{ + Name: xatu.Event_BEACON_API_ETH_V1_EVENTS_ATTESTATION_V2, + DateTime: timestamppb.New(e.now), + Id: e.id.String(), + }, + Meta: &xatu.Meta{ + Client: e.clientMeta, + }, + Data: &xatu.DecoratedEvent_EthV1EventsAttestationV2{ + EthV1EventsAttestationV2: data, + }, + } + + additionalData, err := e.getAdditionalData(ctx) + if err != nil { + e.log.WithError(err).Error("Failed to get extra attestation data") + } else { + decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV1EventsAttestationV2{ + EthV1EventsAttestationV2: additionalData, + } + } + + return decoratedEvent, nil +} + +func (e *EventsSingleAttestation) ShouldIgnore(ctx context.Context) (bool, error) { + if err := e.beacon.Synced(ctx); err != nil { + return true, err + } + + hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) + if err != nil { + return true, err + } + + item, retrieved := e.duplicateCache.GetOrSet(fmt.Sprint(hash), e.now, ttlcache.WithTTL[string, time.Time](ttlcache.DefaultTTL)) + if retrieved { + e.log.WithFields(logrus.Fields{ + "hash": hash, + "time_since_first_item": time.Since(item.Value()), + }).Debug("Duplicate attestation event received") + + return true, nil + } + + return false, nil +} + +func (e *EventsSingleAttestation) getAdditionalData(_ context.Context) (*xatu.ClientMeta_AdditionalEthV1EventsAttestationV2Data, error) { + extra := &xatu.ClientMeta_AdditionalEthV1EventsAttestationV2Data{} + + attestationData := e.AttestationData() + + slot := attestationData.Slot + + attestionSlot := e.beacon.Metadata().Wallclock().Slots().FromNumber(uint64(slot)) + epoch := e.beacon.Metadata().Wallclock().Epochs().FromSlot(uint64(slot)) + + extra.Slot = &xatu.SlotV2{ + Number: &wrapperspb.UInt64Value{Value: attestionSlot.Number()}, + StartDateTime: timestamppb.New(attestionSlot.TimeWindow().Start()), + } + + extra.Epoch = &xatu.EpochV2{ + Number: &wrapperspb.UInt64Value{Value: epoch.Number()}, + StartDateTime: timestamppb.New(epoch.TimeWindow().Start()), + } + + extra.Propagation = &xatu.PropagationV2{ + SlotStartDiff: &wrapperspb.UInt64Value{ + //nolint:gosec // not concerned in reality + Value: uint64(e.now.Sub(attestionSlot.TimeWindow().Start()).Milliseconds()), + }, + } + + target := attestationData.Target + + // Build out the target section + targetEpoch := e.beacon.Metadata().Wallclock().Epochs().FromNumber(uint64(target.Epoch)) + extra.Target = &xatu.ClientMeta_AdditionalEthV1AttestationTargetV2Data{ + Epoch: &xatu.EpochV2{ + Number: &wrapperspb.UInt64Value{Value: targetEpoch.Number()}, + StartDateTime: timestamppb.New(targetEpoch.TimeWindow().Start()), + }, + } + + source := attestationData.Source + + // Build out the source section + sourceEpoch := e.beacon.Metadata().Wallclock().Epochs().FromNumber(uint64(source.Epoch)) + extra.Source = &xatu.ClientMeta_AdditionalEthV1AttestationSourceV2Data{ + Epoch: &xatu.EpochV2{ + Number: &wrapperspb.UInt64Value{Value: sourceEpoch.Number()}, + StartDateTime: timestamppb.New(sourceEpoch.TimeWindow().Start()), + }, + } + + extra.AttestingValidator = &xatu.AttestingValidatorV2{ + CommitteeIndex: &wrapperspb.UInt64Value{Value: uint64(e.event.CommitteeIndex)}, + Index: &wrapperspb.UInt64Value{Value: uint64(e.event.AttesterIndex)}, + } + + return extra, nil +} diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index 0d20638e..fc780d4f 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -18,6 +18,7 @@ import ( eth2v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/electra" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/beevik/ntp" "github.com/ethpandaops/beacon/pkg/beacon" @@ -279,6 +280,36 @@ func (s *Sentry) Start(ctx context.Context) error { s.log.Fatal("Unable to determine Ethereum network. Provide an override network name via ethereum.overrideNetworkName") } + s.beacon.Node().OnSingleAttestation(ctx, func(ctx context.Context, ev *electra.SingleAttestation) error { + now := time.Now().Add(s.clockDrift) + + meta, err := s.createNewClientMeta(ctx) + if err != nil { + return err + } + + event, err := v1.NewEventsSingleAttestation(s.log, ev, now, s.beacon, s.duplicateCache.BeaconETHV1EventsAttestation, meta) + if err != nil { + return err + } + + ignore, err := event.ShouldIgnore(ctx) + if err != nil { + return err + } + + if ignore { + return nil + } + + decoratedEvent, err := event.Decorate(ctx) + if err != nil { + return err + } + + return s.handleNewDecoratedEvent(ctx, decoratedEvent) + }) + s.beacon.Node().OnAttestation(ctx, func(ctx context.Context, ev *spec.VersionedAttestation) error { now := time.Now().Add(s.clockDrift)