fix: duty scheduler indices change & reorg bleeps #1725

Open
wants to merge 15 commits into base: stage
15 changes: 14 additions & 1 deletion logging/fields/fields.go
@@ -81,16 +81,17 @@ const (
FieldRole = "role"
FieldRound = "round"
FieldSlot = "slot"
FieldSlotTickerID = "slot_ticker_id"
FieldStartTimeUnixMilli = "start_time_unix_milli"
FieldSubmissionTime = "submission_time"
FieldTotalConsensusTime = "total_consensus_time"
FieldSubnets = "subnets"
FieldSyncOffset = "sync_offset"
FieldSyncResults = "sync_results"
FieldTargetNodeENR = "target_node_enr"
FieldToBlock = "to_block"
FieldTook = "took"
FieldTopic = "topic"
FieldTotalConsensusTime = "total_consensus_time"
FieldTxHash = "tx_hash"
FieldType = "type"
FieldUpdatedENRLocalNode = "updated_enr"
@@ -408,3 +409,15 @@ func Type(v any) zapcore.Field {
func FormatDuration(val time.Duration) string {
return strconv.FormatFloat(val.Seconds(), 'f', 5, 64)
}

func FormatSlotTickerID(epoch phase0.Epoch, slot phase0.Slot) string {
return fmt.Sprintf("e%v-s%v-#%v", epoch, slot, slot%32+1)
}

func FormatSlotTickerCommitteeID(period uint64, epoch phase0.Epoch, slot phase0.Slot) string {
return fmt.Sprintf("p%v-%s", period, FormatSlotTickerID(epoch, slot))
}

func SlotTickerID(val string) zap.Field {
return zap.String(FieldSlotTickerID, val)
}
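
For context, a minimal usage sketch of the new slot-ticker logging helpers (illustrative only, not part of the diff; the import path, logger setup, and epoch/slot values are assumed, and 32 slots per epoch is assumed for the position suffix):

package main

import (
	"github.com/attestantio/go-eth2-client/spec/phase0"
	"go.uber.org/zap"

	"github.com/ssvlabs/ssv/logging/fields" // assumed module path
)

func main() {
	logger := zap.NewExample()

	epoch := phase0.Epoch(1024)
	slot := phase0.Slot(1024*32 + 4) // 5th slot of epoch 1024, assuming 32 slots per epoch

	// "e1024-s32772-#5": epoch, slot, and the slot's 1-based position within the epoch.
	logger.Debug("🛠 ticker event", fields.SlotTickerID(fields.FormatSlotTickerID(epoch, slot)))

	// Sync-committee-style handlers can prefix the period: "p4-e1024-s32772-#5".
	logger.Info("duty tick", fields.SlotTickerID(fields.FormatSlotTickerCommitteeID(4, epoch, slot)))
}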
190 changes: 88 additions & 102 deletions operator/duties/attester.go
@@ -21,51 +21,27 @@ type AttesterHandler struct {
duties *dutystore.Duties[eth2apiv1.AttesterDuty]
fetchCurrentEpoch bool
fetchNextEpoch bool

firstRun bool
}

func NewAttesterHandler(duties *dutystore.Duties[eth2apiv1.AttesterDuty]) *AttesterHandler {
h := &AttesterHandler{
duties: duties,
duties: duties,
firstRun: true,
}
h.fetchCurrentEpoch = true
return h
}

func (h *AttesterHandler) Name() string {
return spectypes.BNRoleAttester.String()
}

// HandleDuties manages the duty lifecycle, handling different cases:
//
// On First Run:
// 1. Fetch duties for the current epoch.
// 2. If necessary, fetch duties for the next epoch.
// 3. Execute duties.
//
// On Re-org:
//
// If the previous dependent root changed:
// 1. Fetch duties for the current epoch.
// 2. Execute duties.
// If the current dependent root changed:
// 1. Execute duties.
// 2. If necessary, fetch duties for the next epoch.
//
// On Indices Change:
// 1. Execute duties.
// 2. ResetEpoch duties for the current epoch.
// 3. Fetch duties for the current epoch.
// 4. If necessary, fetch duties for the next epoch.
//
// On Ticker event:
// 1. Execute duties.
// 2. If necessary, fetch duties for the next epoch.
// HandleDuties manages the duty lifecycle
func (h *AttesterHandler) HandleDuties(ctx context.Context) {
h.logger.Info("starting duty handler")
defer h.logger.Info("duty handler exited")

h.fetchNextEpoch = true

next := h.ticker.Next()
for {
select {
@@ -75,79 +75,52 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) {
case <-next:
slot := h.ticker.Slot()
next = h.ticker.Next()
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr))

h.processExecution(currentEpoch, slot)
if h.indicesChanged {
h.duties.ResetEpoch(currentEpoch)
h.indicesChanged = false
}
h.processFetching(ctx, currentEpoch, slot)

slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
tickerID := fields.FormatSlotTickerID(epoch, slot)
h.logger.Debug("🛠 ticker event", fields.SlotTickerID(tickerID))

// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot.
// This allows us to set them up at a time when the beacon node should be less busy.
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 {
h.fetchNextEpoch = true
}

// last slot of epoch
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 {
h.duties.ResetEpoch(currentEpoch - 1)
if !h.network.PastAlanForkAtEpoch(epoch) {
if h.firstRun {
h.processFirstRun(ctx, epoch, slot)
}
h.processExecution(epoch, slot)
if h.indicesChanged {
h.processIndicesChange(epoch, slot)
}
h.processFetching(ctx, epoch, slot)
h.processSlotTransition(epoch, slot)
}

case reorgEvent := <-h.reorg:
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, reorgEvent.Slot, reorgEvent.Slot%32+1)
h.logger.Info("🔀 reorg event received", zap.String("epoch_slot_pos", buildStr), zap.Any("event", reorgEvent))

// reset current epoch duties
if reorgEvent.Previous {
h.duties.ResetEpoch(currentEpoch)
h.fetchCurrentEpoch = true
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
}
epoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot)
tickerID := fields.FormatSlotTickerID(epoch, reorgEvent.Slot)
h.logger.Info("🔀 reorg event received", fields.SlotTickerID(tickerID), zap.Any("event", reorgEvent))

h.processFetching(ctx, currentEpoch, reorgEvent.Slot)
} else if reorgEvent.Current {
// reset & re-fetch next epoch duties if in appropriate slot range,
// otherwise they will be fetched by the appropriate slot tick.
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
}
if !h.network.PastAlanForkAtEpoch(epoch) {
h.processReorg(ctx, epoch, reorgEvent)
}

case <-h.indicesChange:
slot := h.network.Beacon.EstimatedCurrentSlot()
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Info("🔁 indices change received", zap.String("epoch_slot_pos", buildStr))

h.indicesChanged = true
h.fetchCurrentEpoch = true
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
tickerID := fields.FormatSlotTickerID(epoch, slot)
h.logger.Info("🔁 indices change received", fields.SlotTickerID(tickerID))

// reset next epoch duties if in appropriate slot range
if h.shouldFetchNexEpoch(slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
if !h.network.PastAlanForkAtEpoch(epoch) {
h.indicesChanged = true
}
}
}
}

func (h *AttesterHandler) HandleInitialDuties(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, h.network.Beacon.SlotDurationSec()/2)
defer cancel()

slot := h.network.Beacon.EstimatedCurrentSlot()
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
func (h *AttesterHandler) processFirstRun(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
h.fetchCurrentEpoch = true
h.processFetching(ctx, epoch, slot)

if uint64(slot)%h.network.Beacon.SlotsPerEpoch() > h.network.Beacon.SlotsPerEpoch()/2-1 {
h.fetchNextEpoch = true
}
Comment on lines +96 to +98 (Contributor):
Can use if h.shouldFetchNexEpoch(... here ? Although it uses >= (would be nice to clarify which is the correct one).
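
To make the reviewer's point concrete, a standalone check (not part of the diff, assuming 32 slots per epoch) shows the two bounds differ only at the mid-point slot: processFirstRun's > bound first fires at position 16, while shouldFetchNexEpoch's >= bound first fires at position 15.

package main

import "fmt"

func main() {
	const slotsPerEpoch = uint64(32)
	for pos := uint64(14); pos <= 16; pos++ {
		inFirstRun := pos > slotsPerEpoch/2-1     // bound used in processFirstRun
		inShouldFetch := pos >= slotsPerEpoch/2-1 // bound used in shouldFetchNexEpoch
		fmt.Printf("pos=%d processFirstRun=%v shouldFetchNexEpoch=%v\n", pos, inFirstRun, inShouldFetch)
	}
	// Output: positions 14 and 16 agree; position 15 is covered only by shouldFetchNexEpoch.
}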

h.firstRun = false
}

func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
@@ -162,7 +111,7 @@ func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoc
h.fetchCurrentEpoch = false
}

if h.fetchNextEpoch && h.shouldFetchNexEpoch(slot) {
if h.fetchNextEpoch {
if err := h.fetchAndProcessDuties(ctx, epoch+1); err != nil {
h.logger.Error("failed to fetch duties for next epoch", zap.Error(err))
return
@@ -177,27 +126,64 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)
return
}

if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) {
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2)
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester))
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator))
}
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2)
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester))
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator))
}
}

h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute)
return
h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute)
}

func (h *AttesterHandler) processIndicesChange(epoch phase0.Epoch, slot phase0.Slot) {
h.duties.Reset(epoch)
h.fetchCurrentEpoch = true

// reset next epoch duties if in appropriate slot range
if h.shouldFetchNexEpoch(slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}

toExecute := make([]*spectypes.ValidatorDuty, 0, len(duties))
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toSpecDuty(d, spectypes.BNRoleAggregator))
h.indicesChanged = false
}

func (h *AttesterHandler) processReorg(ctx context.Context, epoch phase0.Epoch, reorgEvent ReorgEvent) {
// reset current epoch duties
if reorgEvent.Previous {
h.duties.Reset(epoch)
h.fetchCurrentEpoch = true
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}

h.processFetching(ctx, epoch, reorgEvent.Slot)
} else if reorgEvent.Current {
// reset & re-fetch next epoch duties if in appropriate slot range,
// otherwise they will be fetched by the appropriate slot tick.
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}
}
}

h.dutiesExecutor.ExecuteDuties(h.logger, toExecute)
func (h *AttesterHandler) processSlotTransition(epoch phase0.Epoch, slot phase0.Slot) {
slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()

// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot.
// This allows us to set them up at a time when the beacon node should be less busy.
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 {
h.fetchNextEpoch = true
}
Comment on lines +176 to +180 (Contributor):
Can use if h.shouldFetchNexEpoch(... here ? I guess applies to other Duties changed in this PR too.


// last slot of epoch
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 {
h.duties.Reset(epoch - 1)
}
Comment on lines +182 to +185 (Contributor):
Would be nice to have helper func similar to shouldFetchNexEpoch for uint64(slot)%slotsPerEpoch == slotsPerEpoch-1.

}
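
One possible shape for the helper suggested in the comment above (hypothetical; the name isLastSlotOfEpoch and its placement are illustrative and not part of this PR):

// Hypothetical helper, mirroring shouldFetchNexEpoch, for the "last slot of epoch" check.
func (h *AttesterHandler) isLastSlotOfEpoch(slot phase0.Slot) bool {
	slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()
	return uint64(slot)%slotsPerEpoch == slotsPerEpoch-1
}

With that in place, the last-slot branch of processSlotTransition would read as if h.isLastSlotOfEpoch(slot) { h.duties.Reset(epoch - 1) }.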

func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase0.Epoch) error {
@@ -309,5 +295,5 @@ func toBeaconCommitteeSubscription(duty *eth2apiv1.AttesterDuty, role spectypes.
}

func (h *AttesterHandler) shouldFetchNexEpoch(slot phase0.Slot) bool {
return uint64(slot)%h.network.Beacon.SlotsPerEpoch() > h.network.Beacon.SlotsPerEpoch()/2-2
return uint64(slot)%h.network.Beacon.SlotsPerEpoch() >= h.network.Beacon.SlotsPerEpoch()/2-1
}
Comment on lines 303 to 305 (Contributor):
Would be nice to have this clarifying comment (for the reason for why we are doing this) here:

			// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot.
			// This allows us to set them up at a time when the beacon node should be less busy.

also maybe fix typo shouldFetchNexEpoch -> shouldFetchNextEpoch
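
As a note on the boundary change itself: for integer slot math the old > slotsPerEpoch/2-2 bound and the new >= slotsPerEpoch/2-1 bound select the same slot positions (both reduce to position 15 and later with 32-slot epochs), so the change appears to be a readability cleanup rather than a behavior change. A quick standalone check under that assumption:

package main

import "fmt"

func main() {
	const slotsPerEpoch = uint64(32)
	mismatches := 0
	for pos := uint64(0); pos < slotsPerEpoch; pos++ {
		oldBound := pos > slotsPerEpoch/2-2  // previous shouldFetchNexEpoch condition
		newBound := pos >= slotsPerEpoch/2-1 // condition introduced in this PR
		if oldBound != newBound {
			mismatches++
		}
	}
	fmt.Println("mismatching positions:", mismatches) // prints 0
}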
