Skip to content

Commit

Permalink
beacon-client: attestation data request caching
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Oct 16, 2024
1 parent ec5e0ad commit 04f99aa
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 112 deletions.
106 changes: 95 additions & 11 deletions beacon/goclient/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,106 @@ func (gc *GoClient) AttesterDuties(ctx context.Context, epoch phase0.Epoch, vali
return resp.Data, nil
}

func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, spec.DataVersion, error) {
// GetAttestationData returns attestation data for a given slot (which is same for all 64 committeeIndex
// values that identify Ethereum committees chosen to attest on this slot).
// Note, committeeIndex is an optional parameter that will be used to set AttestationData.Index
// in the resulting data returned from this function.
// Note, result returned is meant to be read-only, it's not safe to modify it (because it will be
// accessed by multiple concurrent readers).
func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.CommitteeIndex) (
result *phase0.AttestationData,
version spec.DataVersion,
err error,
) {
// Final processing for result returned.
defer func() {
if err != nil {
// Nothing to process, just propagate error.
return
}

// Assign committeeIndex passed to GetAttestationData call, the rest of attestation data stays
// unchanged.
// Note, we cannot return result object directly here modifying its Index value because it
// would be unsynchronised concurrent write (since it's cached for other concurrent readers
// to access). Hence, we return shallow copy here. We don't need to return deep copy because
// the callers of GetAttestationData will only use this data to read it (they won't update it).
result = &phase0.AttestationData{
Slot: result.Slot,
Index: committeeIndex,
BeaconBlockRoot: result.BeaconBlockRoot,
Source: result.Source,
Target: result.Target,
}
}()

// Check cache.
cachedResult, ok := gc.attestationDataCache.Get(slot)
if ok {
return cachedResult, spec.DataVersionPhase0, nil
}

// Have to make beacon node request and cache the result.
attDataReqStart := time.Now()
resp, err := gc.client.AttestationData(gc.ctx, &api.AttestationDataOpts{
Slot: slot,
CommitteeIndex: committeeIndex,
})
result, err = func() (*phase0.AttestationData, error) {
// Requests with the same slot number must lock on the same mutex.
reqMu := &gc.attestationReqMuPool[int64(slot)%int64(len(gc.attestationReqMuPool))]
reqMu.Lock()
defer reqMu.Unlock()

// Prevent making more than 1 beacon node requests in case somebody has already made this
// request concurrently and succeeded.
cachedResult, ok := gc.attestationDataCache.Get(slot)
if ok {
return cachedResult, nil
}

resp, err := gc.client.AttestationData(gc.ctx, &api.AttestationDataOpts{
Slot: slot,
})
if err != nil {
return nil, fmt.Errorf("failed to get attestation data: %w", err)
}
if resp == nil {
return nil, fmt.Errorf("attestation data response is nil")
}

gc.attestationDataCache.Set(slot, resp.Data)
gc.recentAttestationSlot.Store(uint64(slot))

return resp.Data, nil
}()
metricsAttesterDataRequest.Observe(time.Since(attDataReqStart).Seconds())
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to get attestation data: %w", err)
}
if resp == nil {
return nil, DataVersionNil, fmt.Errorf("attestation data response is nil")
return nil, DataVersionNil, err
}

metricsAttesterDataRequest.Observe(time.Since(attDataReqStart).Seconds())
return result, spec.DataVersionPhase0, nil
}

// pruneStaleAttestationDataRunner will periodically prune attestationDataCache to keep it from growing
// perpetually.
func (gc *GoClient) pruneStaleAttestationDataRunner() {
pruneStaleAttestationData := func() {
// slotRetainCnt defines how many recent slots we want to preserve in attestation data cache.
slotRetainCnt := 5 * gc.network.SlotsPerEpoch()
gc.attestationDataCache.Range(func(slot phase0.Slot, data *phase0.AttestationData) bool {
if uint64(slot) < (gc.recentAttestationSlot.Load() - slotRetainCnt) {
gc.attestationDataCache.Delete(slot)
}
return true
})
}

return resp.Data, spec.DataVersionPhase0, nil
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-gc.ctx.Done():
return
case <-ticker.C:
pruneStaleAttestationData()
}
}
}

// SubmitAttestations implements Beacon interface
Expand Down
67 changes: 46 additions & 21 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

eth2client "github.com/attestantio/go-eth2-client"
Expand All @@ -17,13 +18,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging/fields"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/slotticker"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
"github.com/ssvlabs/ssv/utils/casts"
"github.com/ssvlabs/ssv/utils/hashmap"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -141,19 +142,40 @@ var _ NodeClientProvider = (*GoClient)(nil)

// GoClient implementing Beacon struct
type GoClient struct {
log *zap.Logger
ctx context.Context
network beaconprotocol.Network
client Client
nodeVersion string
nodeClient NodeClient
gasLimit uint64
operatorDataStore operatordatastore.OperatorDataStore
log *zap.Logger
ctx context.Context
network beaconprotocol.Network
client Client
nodeVersion string
nodeClient NodeClient
gasLimit uint64

operatorDataStore operatordatastore.OperatorDataStore

registrationMu sync.Mutex
registrationLastSlot phase0.Slot
registrationCache map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration
commonTimeout time.Duration
longTimeout time.Duration

// attestationReqMuPool helps us prevent the sending of multiple attestation data requests
// for the same slot number (to avoid doing unnecessary work).
// It's a pool of mutexes (not a single mutex) to allow for some parallelism when requests
// targeting different slots are made.
attestationReqMuPool [32]sync.Mutex
// attestationDataCache stores attestation data from Beacon node for a bunch of recently made
// requests (by slot number). It allows for requesting attestation data once per slot from
// Beacon node as well as always having/observing the same consistent data in any given slot
// (eg, Beacon node might return different merkle root for the same slot for different requests,
// and this can lead to undesirable effects for GoClient users).
// attestationDataCache is used concurrently, hence thread-safe map.
// Note, we cache attestation data by slot (and not slot+committee_index) because it's the same
// data across all 64 Ethereum committees assigned for each slot.
attestationDataCache *hashmap.Map[phase0.Slot, *phase0.AttestationData]
// recentAttestationSlot keeps track of recent (not necessarily the latest) slot attestation
// data was requested for.
recentAttestationSlot atomic.Uint64

commonTimeout time.Duration
longTimeout time.Duration
}

// New init new client and go-client instance
Expand Down Expand Up @@ -187,15 +209,17 @@ func New(
}

client := &GoClient{
log: logger,
ctx: opt.Context,
network: opt.Network,
client: httpClient.(*eth2clienthttp.Service),
gasLimit: opt.GasLimit,
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
commonTimeout: commonTimeout,
longTimeout: longTimeout,
log: logger,
ctx: opt.Context,
network: opt.Network,
client: httpClient.(*eth2clienthttp.Service),
gasLimit: opt.GasLimit,
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
attestationDataCache: hashmap.New[phase0.Slot, *phase0.AttestationData](),
recentAttestationSlot: atomic.Uint64{}, // 0 is appropriate starting value
commonTimeout: commonTimeout,
longTimeout: longTimeout,
}

nodeVersionResp, err := client.client.NodeVersion(opt.Context, &api.NodeVersionOpts{})
Expand All @@ -216,6 +240,7 @@ func New(
)

go client.registrationSubmitter(slotTickerProvider)
go client.pruneStaleAttestationDataRunner()

return client, nil
}
Expand Down
77 changes: 0 additions & 77 deletions protocol/v2/blockchain/beacon/duty_data.go

This file was deleted.

6 changes: 3 additions & 3 deletions protocol/v2/ssv/runner/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ import (
"github.com/prysmaticlabs/go-bitfield"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
"github.com/ssvlabs/ssv/protocol/v2/qbft/controller"
"github.com/ssvlabs/ssv/protocol/v2/ssv/runner/metrics"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -667,11 +666,12 @@ func (cr *CommitteeRunner) expectedPostConsensusRootsAndBeaconObjects(logger *za
func (cr *CommitteeRunner) executeDuty(logger *zap.Logger, duty spectypes.Duty) error {
start := time.Now()
slot := duty.DutySlot()
// We set committeeIndex to 0 for simplicity, there is no need to specify it exactly because
// all 64 Ethereum committees assigned to this slot will get the same data to attest for.
attData, _, err := cr.GetBeaconNode().GetAttestationData(slot, 0)
if err != nil {
return errors.Wrap(err, "failed to get attestation data")
}
//TODO committeeIndex is 0, is this correct?
logger = logger.With(
zap.Duration("attestation_data_time", time.Since(start)),
fields.Slot(slot),
Expand Down

0 comments on commit 04f99aa

Please sign in to comment.