-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: duty scheduler indices change & reorg bleeps #1725
base: stage
Are you sure you want to change the base?
Changes from 12 commits
adece06
a265022
33e76c8
caeeb87
276ab6d
2c15442
f52c444
c4694e8
00b3aa8
c7d6778
7edf712
6cee156
c142342
9fa010d
d337488
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,51 +21,27 @@ type AttesterHandler struct { | |
duties *dutystore.Duties[eth2apiv1.AttesterDuty] | ||
fetchCurrentEpoch bool | ||
fetchNextEpoch bool | ||
|
||
firstRun bool | ||
} | ||
|
||
func NewAttesterHandler(duties *dutystore.Duties[eth2apiv1.AttesterDuty]) *AttesterHandler { | ||
h := &AttesterHandler{ | ||
duties: duties, | ||
duties: duties, | ||
firstRun: true, | ||
} | ||
h.fetchCurrentEpoch = true | ||
return h | ||
} | ||
|
||
func (h *AttesterHandler) Name() string { | ||
return spectypes.BNRoleAttester.String() | ||
} | ||
|
||
// HandleDuties manages the duty lifecycle, handling different cases: | ||
// | ||
// On First Run: | ||
// 1. Fetch duties for the current epoch. | ||
// 2. If necessary, fetch duties for the next epoch. | ||
// 3. Execute duties. | ||
// | ||
// On Re-org: | ||
// | ||
// If the previous dependent root changed: | ||
// 1. Fetch duties for the current epoch. | ||
// 2. Execute duties. | ||
// If the current dependent root changed: | ||
// 1. Execute duties. | ||
// 2. If necessary, fetch duties for the next epoch. | ||
// | ||
// On Indices Change: | ||
// 1. Execute duties. | ||
// 2. ResetEpoch duties for the current epoch. | ||
// 3. Fetch duties for the current epoch. | ||
// 4. If necessary, fetch duties for the next epoch. | ||
// | ||
// On Ticker event: | ||
// 1. Execute duties. | ||
// 2. If necessary, fetch duties for the next epoch. | ||
// HandleDuties manages the duty lifecycle | ||
func (h *AttesterHandler) HandleDuties(ctx context.Context) { | ||
h.logger.Info("starting duty handler") | ||
defer h.logger.Info("duty handler exited") | ||
|
||
h.fetchNextEpoch = true | ||
|
||
next := h.ticker.Next() | ||
for { | ||
select { | ||
|
@@ -75,79 +51,52 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) { | |
case <-next: | ||
slot := h.ticker.Slot() | ||
next = h.ticker.Next() | ||
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot) | ||
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1) | ||
h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr)) | ||
|
||
h.processExecution(currentEpoch, slot) | ||
if h.indicesChanged { | ||
h.duties.ResetEpoch(currentEpoch) | ||
h.indicesChanged = false | ||
} | ||
h.processFetching(ctx, currentEpoch, slot) | ||
|
||
slotsPerEpoch := h.network.Beacon.SlotsPerEpoch() | ||
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) | ||
tickerID := fields.FormatSlotTickerID(epoch, slot) | ||
h.logger.Debug("🛠 ticker event", fields.SlotTickerID(tickerID)) | ||
|
||
// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot. | ||
// This allows us to set them up at a time when the beacon node should be less busy. | ||
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 { | ||
h.fetchNextEpoch = true | ||
} | ||
|
||
// last slot of epoch | ||
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 { | ||
h.duties.ResetEpoch(currentEpoch - 1) | ||
if !h.network.PastAlanForkAtEpoch(epoch) { | ||
if h.firstRun { | ||
h.processFirstRun(ctx, epoch, slot) | ||
} | ||
h.processExecution(epoch, slot) | ||
if h.indicesChanged { | ||
h.processIndicesChange(epoch, slot) | ||
} | ||
h.processFetching(ctx, epoch, slot) | ||
h.processSlotTransition(epoch, slot) | ||
} | ||
|
||
case reorgEvent := <-h.reorg: | ||
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot) | ||
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, reorgEvent.Slot, reorgEvent.Slot%32+1) | ||
h.logger.Info("🔀 reorg event received", zap.String("epoch_slot_pos", buildStr), zap.Any("event", reorgEvent)) | ||
|
||
// reset current epoch duties | ||
if reorgEvent.Previous { | ||
h.duties.ResetEpoch(currentEpoch) | ||
h.fetchCurrentEpoch = true | ||
if h.shouldFetchNexEpoch(reorgEvent.Slot) { | ||
h.duties.ResetEpoch(currentEpoch + 1) | ||
h.fetchNextEpoch = true | ||
} | ||
epoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot) | ||
tickerID := fields.FormatSlotTickerID(epoch, reorgEvent.Slot) | ||
h.logger.Info("🔀 reorg event received", fields.SlotTickerID(tickerID), zap.Any("event", reorgEvent)) | ||
|
||
h.processFetching(ctx, currentEpoch, reorgEvent.Slot) | ||
} else if reorgEvent.Current { | ||
// reset & re-fetch next epoch duties if in appropriate slot range, | ||
// otherwise they will be fetched by the appropriate slot tick. | ||
if h.shouldFetchNexEpoch(reorgEvent.Slot) { | ||
h.duties.ResetEpoch(currentEpoch + 1) | ||
h.fetchNextEpoch = true | ||
} | ||
if !h.network.PastAlanForkAtEpoch(epoch) { | ||
h.processReorg(ctx, epoch, reorgEvent) | ||
} | ||
|
||
case <-h.indicesChange: | ||
slot := h.network.Beacon.EstimatedCurrentSlot() | ||
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot) | ||
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1) | ||
h.logger.Info("🔁 indices change received", zap.String("epoch_slot_pos", buildStr)) | ||
|
||
h.indicesChanged = true | ||
h.fetchCurrentEpoch = true | ||
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) | ||
tickerID := fields.FormatSlotTickerID(epoch, slot) | ||
h.logger.Info("🔁 indices change received", fields.SlotTickerID(tickerID)) | ||
|
||
// reset next epoch duties if in appropriate slot range | ||
if h.shouldFetchNexEpoch(slot) { | ||
h.duties.ResetEpoch(currentEpoch + 1) | ||
h.fetchNextEpoch = true | ||
if !h.network.PastAlanForkAtEpoch(epoch) { | ||
h.indicesChanged = true | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (h *AttesterHandler) HandleInitialDuties(ctx context.Context) { | ||
ctx, cancel := context.WithTimeout(ctx, h.network.Beacon.SlotDurationSec()/2) | ||
defer cancel() | ||
|
||
slot := h.network.Beacon.EstimatedCurrentSlot() | ||
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) | ||
func (h *AttesterHandler) processFirstRun(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) { | ||
h.fetchCurrentEpoch = true | ||
h.processFetching(ctx, epoch, slot) | ||
|
||
if uint64(slot)%h.network.Beacon.SlotsPerEpoch() > h.network.Beacon.SlotsPerEpoch()/2-1 { | ||
h.fetchNextEpoch = true | ||
} | ||
h.firstRun = false | ||
} | ||
|
||
func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) { | ||
|
@@ -162,7 +111,7 @@ func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoc | |
h.fetchCurrentEpoch = false | ||
} | ||
|
||
if h.fetchNextEpoch && h.shouldFetchNexEpoch(slot) { | ||
if h.fetchNextEpoch { | ||
if err := h.fetchAndProcessDuties(ctx, epoch+1); err != nil { | ||
h.logger.Error("failed to fetch duties for next epoch", zap.Error(err)) | ||
return | ||
|
@@ -177,27 +126,64 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot) | |
return | ||
} | ||
|
||
if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) { | ||
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2) | ||
for _, d := range duties { | ||
if h.shouldExecute(d) { | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester)) | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator)) | ||
} | ||
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2) | ||
for _, d := range duties { | ||
if h.shouldExecute(d) { | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester)) | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator)) | ||
} | ||
} | ||
|
||
h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute) | ||
return | ||
h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute) | ||
} | ||
|
||
func (h *AttesterHandler) processIndicesChange(epoch phase0.Epoch, slot phase0.Slot) { | ||
h.duties.Reset(epoch) | ||
h.fetchCurrentEpoch = true | ||
|
||
// reset next epoch duties if in appropriate slot range | ||
if h.shouldFetchNexEpoch(slot) { | ||
h.duties.Reset(epoch + 1) | ||
h.fetchNextEpoch = true | ||
} | ||
|
||
toExecute := make([]*spectypes.ValidatorDuty, 0, len(duties)) | ||
for _, d := range duties { | ||
if h.shouldExecute(d) { | ||
toExecute = append(toExecute, h.toSpecDuty(d, spectypes.BNRoleAggregator)) | ||
h.indicesChanged = false | ||
} | ||
|
||
func (h *AttesterHandler) processReorg(ctx context.Context, epoch phase0.Epoch, reorgEvent ReorgEvent) { | ||
// reset current epoch duties | ||
if reorgEvent.Previous { | ||
h.duties.Reset(epoch) | ||
h.fetchCurrentEpoch = true | ||
if h.shouldFetchNexEpoch(reorgEvent.Slot) { | ||
h.duties.Reset(epoch + 1) | ||
h.fetchNextEpoch = true | ||
} | ||
|
||
h.processFetching(ctx, epoch, reorgEvent.Slot) | ||
} else if reorgEvent.Current { | ||
// reset & re-fetch next epoch duties if in appropriate slot range, | ||
// otherwise they will be fetched by the appropriate slot tick. | ||
if h.shouldFetchNexEpoch(reorgEvent.Slot) { | ||
h.duties.Reset(epoch + 1) | ||
h.fetchNextEpoch = true | ||
} | ||
} | ||
} | ||
|
||
h.dutiesExecutor.ExecuteDuties(h.logger, toExecute) | ||
func (h *AttesterHandler) processSlotTransition(epoch phase0.Epoch, slot phase0.Slot) { | ||
slotsPerEpoch := h.network.Beacon.SlotsPerEpoch() | ||
|
||
// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot. | ||
// This allows us to set them up at a time when the beacon node should be less busy. | ||
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 { | ||
h.fetchNextEpoch = true | ||
} | ||
Comment on lines
+176
to
+180
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can use |
||
|
||
// last slot of epoch | ||
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 { | ||
h.duties.Reset(epoch - 1) | ||
} | ||
Comment on lines
+182
to
+185
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to have helper |
||
} | ||
|
||
func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase0.Epoch) error { | ||
|
@@ -309,5 +295,5 @@ func toBeaconCommitteeSubscription(duty *eth2apiv1.AttesterDuty, role spectypes. | |
} | ||
|
||
func (h *AttesterHandler) shouldFetchNexEpoch(slot phase0.Slot) bool { | ||
return uint64(slot)%h.network.Beacon.SlotsPerEpoch() > h.network.Beacon.SlotsPerEpoch()/2-2 | ||
return uint64(slot)%h.network.Beacon.SlotsPerEpoch() >= h.network.Beacon.SlotsPerEpoch()/2-1 | ||
} | ||
Comment on lines
303
to
305
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to have this clarifying comment (for the reason for why we are doing this) here:
also maybe fix typo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use
if h.shouldFetchNexEpoch(...
here ? Although it uses>=
(would be nice to clarify which is the correct one).