Skip to content

Commit

Permalink
fix: (DutyScheduler) duties reset race condition (#1741)
Browse files Browse the repository at this point in the history
* fix: (DutyScheduler) duties reset race condition

* refactors

* fix missing lock

* fix test

* optimization
  • Loading branch information
moshe-blox authored Sep 17, 2024
1 parent 011c988 commit ce465c8
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 58 deletions.
8 changes: 6 additions & 2 deletions message/validation/genesis/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,9 @@ func Test_ValidateSSVMessage(t *testing.T) {
height := specqbft.Height(slot)

dutyStore := dutystore.New()
dutyStore.Proposer.Add(epoch, slot, validatorIndex+1, &eth2apiv1.ProposerDuty{}, true)
dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: validatorIndex + 1, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
validator := New(netCfg, WithNodeStorage(ns), WithDutyStore(dutyStore)).(*messageValidator)

validSignedMessage := spectestingutils.TestingProposalMessageWithHeight(ks.Shares[1], 1, height)
Expand All @@ -872,7 +874,9 @@ func Test_ValidateSSVMessage(t *testing.T) {
require.ErrorContains(t, err, ErrNoDuty.Error())

dutyStore = dutystore.New()
dutyStore.Proposer.Add(epoch, slot, validatorIndex, &eth2apiv1.ProposerDuty{}, true)
dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: validatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
validator = New(netCfg, WithNodeStorage(ns), WithDutyStore(dutyStore)).(*messageValidator)
timeToWait, err = validator.waitAfterSlotStart(spectypes.BNRoleProposer)
require.NoError(t, err)
Expand Down
48 changes: 35 additions & 13 deletions message/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ func Test_ValidateSSVMessage(t *testing.T) {
epoch := phase0.Epoch(1)
slot := netCfg.Beacon.FirstSlotAtEpoch(epoch)

dutyStore.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
dutyStore.Proposer.Add(epoch, slot+4, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
dutyStore.Proposer.Add(epoch, slot+8, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
dutyStore.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
{Slot: slot + 4, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
{Slot: slot + 8, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})

role := spectypes.RoleAggregator
identifier := spectypes.NewMsgID(netCfg.DomainType(), ks.ValidatorPK.Serialize(), role)
Expand All @@ -589,7 +591,9 @@ func Test_ValidateSSVMessage(t *testing.T) {
slot := netCfg.Beacon.FirstSlotAtEpoch(epoch)

ds := dutystore.New()
ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex+1, &eth2apiv1.ProposerDuty{}, true)
ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: shares.active.ValidatorIndex + 1, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)

identifier := spectypes.NewMsgID(netCfg.DomainType(), ks.ValidatorPK.Serialize(), spectypes.RoleProposer)
Expand All @@ -600,7 +604,9 @@ func Test_ValidateSSVMessage(t *testing.T) {
require.ErrorContains(t, err, ErrNoDuty.Error())

ds = dutystore.New()
ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
validator = New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)
_, err = validator.handleSignedSSVMessage(signedSSVMessage, topicID, netCfg.Beacon.GetSlotStartTime(slot))
require.NoError(t, err)
Expand Down Expand Up @@ -728,8 +734,12 @@ func Test_ValidateSSVMessage(t *testing.T) {
subtestName := fmt.Sprintf("%v/%v", message.RunnerRoleToString(role), message.PartialMsgTypeToString(msgType))
t.Run(subtestName, func(t *testing.T) {
ds := dutystore.New()
ds.Proposer.Add(spectestingutils.TestingDutyEpoch, spectestingutils.TestingDutySlot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, &eth2apiv1.SyncCommitteeDuty{}, true)
ds.Proposer.Set(spectestingutils.TestingDutyEpoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: spectestingutils.TestingDutySlot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{
{ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.SyncCommitteeDuty{}, InCommittee: true},
})
ds.VoluntaryExit.AddDuty(spectestingutils.TestingDutySlot, phase0.BLSPubKey(shares.active.ValidatorPubKey))

validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)
Expand Down Expand Up @@ -805,8 +815,12 @@ func Test_ValidateSSVMessage(t *testing.T) {
subtestName := fmt.Sprintf("%v/%v", message.RunnerRoleToString(role), message.PartialMsgTypeToString(msgType))
t.Run(subtestName, func(t *testing.T) {
ds := dutystore.New()
ds.Proposer.Add(spectestingutils.TestingDutyEpoch, spectestingutils.TestingDutySlot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, &eth2apiv1.SyncCommitteeDuty{}, true)
ds.Proposer.Set(spectestingutils.TestingDutyEpoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: spectestingutils.TestingDutySlot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{
{ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.SyncCommitteeDuty{}, InCommittee: true},
})

validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)

Expand Down Expand Up @@ -1001,8 +1015,12 @@ func Test_ValidateSSVMessage(t *testing.T) {
slot := netCfg.Beacon.FirstSlotAtEpoch(epoch)

ds := dutystore.New()
ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, &eth2apiv1.SyncCommitteeDuty{}, true)
ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{
{ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.SyncCommitteeDuty{}, InCommittee: true},
})

validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)

Expand Down Expand Up @@ -1322,8 +1340,12 @@ func Test_ValidateSSVMessage(t *testing.T) {
slot := netCfg.Beacon.FirstSlotAtEpoch(epoch)

ds := dutystore.New()
ds.Proposer.Add(epoch, slot, shares.active.ValidatorIndex, &eth2apiv1.ProposerDuty{}, true)
ds.SyncCommittee.Add(0, shares.active.ValidatorIndex, &eth2apiv1.SyncCommitteeDuty{}, true)
ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
{Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.ProposerDuty{}, InCommittee: true},
})
ds.SyncCommittee.Set(0, []dutystore.StoreSyncCommitteeDuty{
{ValidatorIndex: shares.active.ValidatorIndex, Duty: &eth2apiv1.SyncCommitteeDuty{}, InCommittee: true},
})

validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator)

Expand Down
14 changes: 8 additions & 6 deletions operator/duties/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) {
h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr))

h.processExecution(currentEpoch, slot)
if h.indicesChanged {
h.duties.ResetEpoch(currentEpoch)
h.indicesChanged = false
}
h.processFetching(ctx, currentEpoch, slot)

slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()
Expand Down Expand Up @@ -129,7 +125,6 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) {
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Info("🔁 indices change received", zap.String("epoch_slot_pos", buildStr))

h.indicesChanged = true
h.fetchCurrentEpoch = true

// reset next epoch duties if in appropriate slot range
Expand Down Expand Up @@ -215,10 +210,17 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase
}

specDuties := make([]*spectypes.ValidatorDuty, 0, len(duties))
storeDuties := make([]dutystore.StoreDuty[eth2apiv1.AttesterDuty], 0, len(duties))
for _, d := range duties {
h.duties.Add(epoch, d.Slot, d.ValidatorIndex, d, true)
storeDuties = append(storeDuties, dutystore.StoreDuty[eth2apiv1.AttesterDuty]{
Slot: d.Slot,
ValidatorIndex: d.ValidatorIndex,
Duty: d,
InCommittee: true,
})
specDuties = append(specDuties, h.toSpecDuty(d, spectypes.BNRoleAttester))
}
h.duties.Set(epoch, storeDuties)

h.logger.Debug("🗂 got duties",
fields.Count(len(duties)),
Expand Down
41 changes: 21 additions & 20 deletions operator/duties/dutystore/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@ import (
)

type Duty interface {
eth2apiv1.AttesterDuty | eth2apiv1.ProposerDuty | eth2apiv1.SyncCommitteeDuty
eth2apiv1.AttesterDuty | eth2apiv1.ProposerDuty
}

type dutyDescriptor[D Duty] struct {
duty *D
inCommittee bool
type StoreDuty[D Duty] struct {
Slot phase0.Slot
ValidatorIndex phase0.ValidatorIndex
Duty *D
InCommittee bool
}

type Duties[D Duty] struct {
mu sync.RWMutex
m map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D]
m map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D]
}

func NewDuties[D Duty]() *Duties[D] {
return &Duties[D]{
m: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D]),
m: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D]),
}
}

Expand All @@ -43,8 +45,8 @@ func (d *Duties[D]) CommitteeSlotDuties(epoch phase0.Epoch, slot phase0.Slot) []

var duties []*D
for _, descriptor := range descriptorMap {
if descriptor.inCommittee {
duties = append(duties, descriptor.duty)
if descriptor.InCommittee {
duties = append(duties, descriptor.Duty)
}
}

Expand All @@ -70,23 +72,22 @@ func (d *Duties[D]) ValidatorDuty(epoch phase0.Epoch, slot phase0.Slot, validato
return nil
}

return descriptor.duty
return descriptor.Duty
}

func (d *Duties[D]) Add(epoch phase0.Epoch, slot phase0.Slot, validatorIndex phase0.ValidatorIndex, duty *D, inCommittee bool) {
func (d *Duties[D]) Set(epoch phase0.Epoch, duties []StoreDuty[D]) {
mapped := make(map[phase0.Slot]map[phase0.ValidatorIndex]StoreDuty[D])
for _, duty := range duties {
if _, ok := mapped[duty.Slot]; !ok {
mapped[duty.Slot] = make(map[phase0.ValidatorIndex]StoreDuty[D])
}
mapped[duty.Slot][duty.ValidatorIndex] = duty
}

d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.m[epoch]; !ok {
d.m[epoch] = make(map[phase0.Slot]map[phase0.ValidatorIndex]dutyDescriptor[D])
}
if _, ok := d.m[epoch][slot]; !ok {
d.m[epoch][slot] = make(map[phase0.ValidatorIndex]dutyDescriptor[D])
}
d.m[epoch][slot][validatorIndex] = dutyDescriptor[D]{
duty: duty,
inCommittee: inCommittee,
}
d.m[epoch] = mapped
}

func (d *Duties[D]) ResetEpoch(epoch phase0.Epoch) {
Expand Down
32 changes: 18 additions & 14 deletions operator/duties/dutystore/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

type StoreSyncCommitteeDuty struct {
ValidatorIndex phase0.ValidatorIndex
Duty *eth2apiv1.SyncCommitteeDuty
InCommittee bool
}

type SyncCommitteeDuties struct {
mu sync.RWMutex
m map[uint64]map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty]
m map[uint64]map[phase0.ValidatorIndex]StoreSyncCommitteeDuty
}

func NewSyncCommitteeDuties() *SyncCommitteeDuties {
return &SyncCommitteeDuties{
m: make(map[uint64]map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty]),
m: make(map[uint64]map[phase0.ValidatorIndex]StoreSyncCommitteeDuty),
}
}

Expand All @@ -29,8 +35,8 @@ func (d *SyncCommitteeDuties) CommitteePeriodDuties(period uint64) []*eth2apiv1.

var duties []*eth2apiv1.SyncCommitteeDuty
for _, descriptor := range descriptorMap {
if descriptor.inCommittee {
duties = append(duties, descriptor.duty)
if descriptor.InCommittee {
duties = append(duties, descriptor.Duty)
}
}

Expand All @@ -51,21 +57,19 @@ func (d *SyncCommitteeDuties) Duty(period uint64, validatorIndex phase0.Validato
return nil
}

return descriptor.duty
return descriptor.Duty
}

func (d *SyncCommitteeDuties) Add(period uint64, validatorIndex phase0.ValidatorIndex, duty *eth2apiv1.SyncCommitteeDuty, inCommittee bool) {
func (d *SyncCommitteeDuties) Set(period uint64, duties []StoreSyncCommitteeDuty) {
mapped := make(map[phase0.ValidatorIndex]StoreSyncCommitteeDuty)
for _, duty := range duties {
mapped[duty.ValidatorIndex] = duty
}

d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.m[period]; !ok {
d.m[period] = make(map[phase0.ValidatorIndex]dutyDescriptor[eth2apiv1.SyncCommitteeDuty])
}

d.m[period][validatorIndex] = dutyDescriptor[eth2apiv1.SyncCommitteeDuty]{
duty: duty,
inCommittee: inCommittee,
}
d.m[period] = mapped
}

func (d *SyncCommitteeDuties) Reset(period uint64) {
Expand Down
9 changes: 8 additions & 1 deletion operator/duties/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,18 @@ func (h *ProposerHandler) fetchAndProcessDuties(ctx context.Context, epoch phase
h.duties.ResetEpoch(epoch)

specDuties := make([]*spectypes.ValidatorDuty, 0, len(duties))
storeDuties := make([]dutystore.StoreDuty[eth2apiv1.ProposerDuty], 0, len(duties))
for _, d := range duties {
_, inCommitteeDuty := selfIndicesSet[d.ValidatorIndex]
h.duties.Add(epoch, d.Slot, d.ValidatorIndex, d, inCommitteeDuty)
storeDuties = append(storeDuties, dutystore.StoreDuty[eth2apiv1.ProposerDuty]{
Slot: d.Slot,
ValidatorIndex: d.ValidatorIndex,
Duty: d,
InCommittee: inCommitteeDuty,
})
specDuties = append(specDuties, h.toSpecDuty(d, spectypes.BNRoleProposer))
}
h.duties.Set(epoch, storeDuties)

h.logger.Debug("📚 got duties",
fields.Count(len(duties)),
Expand Down
9 changes: 7 additions & 2 deletions operator/duties/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, period
return fmt.Errorf("failed to fetch sync committee duties: %w", err)
}

h.duties.Reset(period)
storeDuties := make([]dutystore.StoreSyncCommitteeDuty, 0, len(duties))
for _, d := range duties {
_, inCommitteeDuty := inCommitteeIndicesSet[d.ValidatorIndex]
h.duties.Add(period, d.ValidatorIndex, d, inCommitteeDuty)
storeDuties = append(storeDuties, dutystore.StoreSyncCommitteeDuty{
ValidatorIndex: d.ValidatorIndex,
Duty: d,
InCommittee: inCommitteeDuty,
})
}
h.duties.Set(period, storeDuties)

h.prepareDutiesResultLog(period, duties, start)

Expand Down

0 comments on commit ce465c8

Please sign in to comment.