diff --git a/message/validation/genesis/validation_test.go b/message/validation/genesis/validation_test.go index 3d4d10d511..82585fbe02 100644 --- a/message/validation/genesis/validation_test.go +++ b/message/validation/genesis/validation_test.go @@ -850,7 +850,9 @@ func Test_ValidateSSVMessage(t *testing.T) { height := specqbft.Height(slot) dutyStore := dutystore.New() - dutyStore.Proposer.Add(epoch, slot, validatorIndex+1, ð2apiv1.ProposerDuty{}, true) + dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: validatorIndex + 1, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) validator := New(netCfg, WithNodeStorage(ns), WithDutyStore(dutyStore)).(*messageValidator) validSignedMessage := spectestingutils.TestingProposalMessageWithHeight(ks.Shares[1], 1, height) @@ -872,7 +874,9 @@ func Test_ValidateSSVMessage(t *testing.T) { require.ErrorContains(t, err, ErrNoDuty.Error()) dutyStore = dutystore.New() - dutyStore.Proposer.Add(epoch, slot, validatorIndex, ð2apiv1.ProposerDuty{}, true) + dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: validatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) validator = New(netCfg, WithNodeStorage(ns), WithDutyStore(dutyStore)).(*messageValidator) timeToWait, err = validator.waitAfterSlotStart(spectypes.BNRoleProposer) require.NoError(t, err) diff --git a/message/validation/validation_test.go b/message/validation/validation_test.go index 7d084d43ce..9935bf3a4a 100644 --- a/message/validation/validation_test.go +++ b/message/validation/validation_test.go @@ -562,9 +562,11 @@ func Test_ValidateSSVMessage(t *testing.T) { epoch := phase0.Epoch(1) slot := netCfg.Beacon.FirstSlotAtEpoch(epoch) - dutyStore.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - dutyStore.Proposer.Add(epoch, slot+4, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - dutyStore.Proposer.Add(epoch, slot+8, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) + dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + {Slot: slot + 4, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + {Slot: slot + 8, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) role := spectypes.RoleAggregator identifier := spectypes.NewMsgID(netCfg.DomainType(), ks.ValidatorPK.Serialize(), role) @@ -589,7 +591,9 @@ func Test_ValidateSSVMessage(t *testing.T) { slot := netCfg.Beacon.FirstSlotAtEpoch(epoch) ds := dutystore.New() - ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex+1, ð2apiv1.ProposerDuty{}, true) + ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex + 1, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) identifier := spectypes.NewMsgID(netCfg.DomainType(), ks.ValidatorPK.Serialize(), spectypes.RoleProposer) @@ -600,7 +604,9 @@ func Test_ValidateSSVMessage(t *testing.T) { require.ErrorContains(t, err, ErrNoDuty.Error()) ds = dutystore.New() - ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) + ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) validator = New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) _, err = validator.handleSignedSSVMessage(signedSSVMessage, topicID, netCfg.Beacon.GetSlotStartTime(slot)) require.NoError(t, err) @@ -728,8 +734,12 @@ func Test_ValidateSSVMessage(t *testing.T) { subtestName := fmt.Sprintf("%v/%v", message.RunnerRoleToString(role), message.PartialMsgTypeToString(msgType)) t.Run(subtestName, func(t *testing.T) { ds := dutystore.New() - ds.Proposer.Add(spectestingutils.TestingDutyEpoch, spectestingutils.TestingDutySlot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, ð2apiv1.SyncCommitteeDuty{}, true) + ds.Proposer.Set(spectestingutils.TestingDutyEpoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: spectestingutils.TestingDutySlot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) + ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{ + {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, + }) ds.VoluntaryExit.AddDuty(spectestingutils.TestingDutySlot, phase0.BLSPubKey(shares.active.ValidatorPubKey)) validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) @@ -805,8 +815,12 @@ func Test_ValidateSSVMessage(t *testing.T) { subtestName := fmt.Sprintf("%v/%v", message.RunnerRoleToString(role), message.PartialMsgTypeToString(msgType)) t.Run(subtestName, func(t *testing.T) { ds := dutystore.New() - ds.Proposer.Add(spectestingutils.TestingDutyEpoch, spectestingutils.TestingDutySlot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, ð2apiv1.SyncCommitteeDuty{}, true) + ds.Proposer.Set(spectestingutils.TestingDutyEpoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: spectestingutils.TestingDutySlot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) + ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{ + {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, + }) validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) @@ -1001,8 +1015,12 @@ func Test_ValidateSSVMessage(t *testing.T) { slot := netCfg.Beacon.FirstSlotAtEpoch(epoch) ds := dutystore.New() - ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, ð2apiv1.SyncCommitteeDuty{}, true) + ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) + ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{ + {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, + }) validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) @@ -1322,8 +1340,12 @@ func Test_ValidateSSVMessage(t *testing.T) { slot := netCfg.Beacon.FirstSlotAtEpoch(epoch) ds := dutystore.New() - ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, ð2apiv1.ProposerDuty{}, true) - ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, ð2apiv1.SyncCommitteeDuty{}, true) + ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, + }) + ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{ + {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, + }) validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) diff --git a/operator/duties/attester.go b/operator/duties/attester.go index cc3971f9ea..396ba70cd2 100644 --- a/operator/duties/attester.go +++ b/operator/duties/attester.go @@ -80,10 +80,6 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) { h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr)) h.processExecution(currentEpoch, slot) - if h.indicesChanged { - h.duties.ResetEpoch(currentEpoch) - h.indicesChanged = false - } h.processFetching(ctx, currentEpoch, slot) slotsPerEpoch := h.network.Beacon.SlotsPerEpoch() @@ -129,7 +125,6 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) { buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1) h.logger.Info("🔁 indices change received", zap.String("epoch_slot_pos", buildStr)) - h.indicesChanged = true h.fetchCurrentEpoch = true // reset next epoch duties if in appropriate slot range @@ -215,10 +210,17 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase } specDuties := make([]*spectypes.ValidatorDuty, 0, len(duties)) + storeDuties := make([]dutystore.StoreDuty[eth2apiv1.AttesterDuty], 0, len(duties)) for _, d := range duties { - h.duties.Add(epoch, d.Slot, d.ValidatorIndex, d, true) + storeDuties = append(storeDuties, dutystore.StoreDuty[eth2apiv1.AttesterDuty]{ + Slot: d.Slot, + ValidatorIndex: d.ValidatorIndex, + Duty: d, + InCommittee: true, + }) specDuties = append(specDuties, h.toSpecDuty(d, spectypes.BNRoleAttester)) } + h.duties.Set(epoch, storeDuties) h.logger.Debug("🗂 got duties", fields.Count(len(duties)), diff --git a/operator/duties/dutystore/duties.go b/operator/duties/dutystore/duties.go index 50fd0d7e22..175d230ca0 100644 --- a/operator/duties/dutystore/duties.go +++ b/operator/duties/dutystore/duties.go @@ -8,22 +8,24 @@ import ( ) type Duty interface { - eth2apiv1.AttesterDuty | eth2apiv1.ProposerDuty | eth2apiv1.SyncCommitteeDuty + eth2apiv1.AttesterDuty | eth2apiv1.ProposerDuty } -type dutyDescriptor[D Duty] struct { - duty *D - inCommittee bool +type StoreDuty[D Duty] struct { + Slot phase0.Slot + ValidatorIndex phase0.ValidatorIndex + Duty *D + InCommittee bool } type Duties[D Duty] struct { mu sync.RWMutex - m map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D] + m map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D] } func NewDuties[D Duty]() *Duties[D] { return &Duties[D]{ - m: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D]), + m: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D]), } } @@ -43,8 +45,8 @@ func (d *Duties[D]) CommitteeSlotDuties(epoch phase0.Epoch, slot phase0.Slot) [] var duties []*D for _, descriptor := range descriptorMap { - if descriptor.inCommittee { - duties = append(duties, descriptor.duty) + if descriptor.InCommittee { + duties = append(duties, descriptor.Duty) } } @@ -70,23 +72,22 @@ func (d *Duties[D]) ValidatorDuty(epoch phase0.Epoch, slot phase0.Slot, validato return nil } - return descriptor.duty + return descriptor.Duty } -func (d *Duties[D]) Add(epoch phase0.Epoch, slot phase0.Slot, validatorIndex phase0.ValidatorIndex, duty *D, inCommittee bool) { +func (d *Duties[D]) Set(epoch phase0.Epoch, duties []StoreDuty[D]) { + mapped := make(map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D]) + for _, duty := range duties { + if _, ok := mapped[duty.Slot]; !ok { + mapped[duty.Slot] = make(map[phase0.ValidatorIndex]StoreDuty[D]) + } + mapped[duty.Slot][duty.ValidatorIndex] = duty + } + d.mu.Lock() defer d.mu.Unlock() - if _, ok := d.m[epoch]; !ok { - d.m[epoch] = make(map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D]) - } - if _, ok := d.m[epoch][slot]; !ok { - d.m[epoch][slot] = make(map[phase0.ValidatorIndex]dutyDescriptor[D]) - } - d.m[epoch][slot][validatorIndex] = dutyDescriptor[D]{ - duty: duty, - inCommittee: inCommittee, - } + d.m[epoch] = mapped } func (d *Duties[D]) ResetEpoch(epoch phase0.Epoch) { diff --git a/operator/duties/dutystore/sync_committee.go b/operator/duties/dutystore/sync_committee.go index 0ae13041c7..c6a28c999e 100644 --- a/operator/duties/dutystore/sync_committee.go +++ b/operator/duties/dutystore/sync_committee.go @@ -7,14 +7,20 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" ) +type StoreSyncCommitteeDuty struct { + ValidatorIndex phase0.ValidatorIndex + Duty *eth2apiv1.SyncCommitteeDuty + InCommittee bool +} + type SyncCommitteeDuties struct { mu sync.RWMutex - m map[uint64]map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty] + m map[uint64]map[phase0.ValidatorIndex]StoreSyncCommitteeDuty } func NewSyncCommitteeDuties() *SyncCommitteeDuties { return &SyncCommitteeDuties{ - m: make(map[uint64]map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty]), + m: make(map[uint64]map[phase0.ValidatorIndex]StoreSyncCommitteeDuty), } } @@ -29,8 +35,8 @@ func (d *SyncCommitteeDuties) CommitteePeriodDuties(period uint64) []*eth2apiv1. var duties []*eth2apiv1.SyncCommitteeDuty for _, descriptor := range descriptorMap { - if descriptor.inCommittee { - duties = append(duties, descriptor.duty) + if descriptor.InCommittee { + duties = append(duties, descriptor.Duty) } } @@ -51,21 +57,19 @@ func (d *SyncCommitteeDuties) Duty(period uint64, validatorIndex phase0.Validato return nil } - return descriptor.duty + return descriptor.Duty } -func (d *SyncCommitteeDuties) Add(period uint64, validatorIndex phase0.ValidatorIndex, duty *eth2apiv1.SyncCommitteeDuty, inCommittee bool) { +func (d *SyncCommitteeDuties) Set(period uint64, duties []StoreSyncCommitteeDuty) { + mapped := make(map[phase0.ValidatorIndex]StoreSyncCommitteeDuty) + for _, duty := range duties { + mapped[duty.ValidatorIndex] = duty + } + d.mu.Lock() defer d.mu.Unlock() - if _, ok := d.m[period]; !ok { - d.m[period] = make(map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty]) - } - - d.m[period][validatorIndex] = dutyDescriptor[eth2apiv1.SyncCommitteeDuty]{ - duty: duty, - inCommittee: inCommittee, - } + d.m[period] = mapped } func (d *SyncCommitteeDuties) Reset(period uint64) { diff --git a/operator/duties/proposer.go b/operator/duties/proposer.go index 8056737a3c..dfeb2cfea4 100644 --- a/operator/duties/proposer.go +++ b/operator/duties/proposer.go @@ -181,11 +181,18 @@ func (h *ProposerHandler) fetchAndProcessDuties(ctx context.Context, epoch phase h.duties.ResetEpoch(epoch) specDuties := make([]*spectypes.ValidatorDuty, 0, len(duties)) + storeDuties := make([]dutystore.StoreDuty[eth2apiv1.ProposerDuty], 0, len(duties)) for _, d := range duties { _, inCommitteeDuty := selfIndicesSet[d.ValidatorIndex] - h.duties.Add(epoch, d.Slot, d.ValidatorIndex, d, inCommitteeDuty) + storeDuties = append(storeDuties, dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ + Slot: d.Slot, + ValidatorIndex: d.ValidatorIndex, + Duty: d, + InCommittee: inCommitteeDuty, + }) specDuties = append(specDuties, h.toSpecDuty(d, spectypes.BNRoleProposer)) } + h.duties.Set(epoch, storeDuties) h.logger.Debug("📚 got duties", fields.Count(len(duties)), diff --git a/operator/duties/sync_committee.go b/operator/duties/sync_committee.go index a596106fb6..13f2776e16 100644 --- a/operator/duties/sync_committee.go +++ b/operator/duties/sync_committee.go @@ -218,11 +218,16 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, period return fmt.Errorf("failed to fetch sync committee duties: %w", err) } - h.duties.Reset(period) + storeDuties := make([]dutystore.StoreSyncCommitteeDuty, 0, len(duties)) for _, d := range duties { _, inCommitteeDuty := inCommitteeIndicesSet[d.ValidatorIndex] - h.duties.Add(period, d.ValidatorIndex, d, inCommitteeDuty) + storeDuties = append(storeDuties, dutystore.StoreSyncCommitteeDuty{ + ValidatorIndex: d.ValidatorIndex, + Duty: d, + InCommittee: inCommitteeDuty, + }) } + h.duties.Set(period, storeDuties) h.prepareDutiesResultLog(period, duties, start)