Skip to content

Commit

Permalink
compress MessageCounts from 48 bytes to 1
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Nov 22, 2024
1 parent 55c7525 commit 8fbb447
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 88 deletions.
5 changes: 2 additions & 3 deletions message/validation/consensus_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ func (mv *messageValidator) validateQBFTLogic(
}

// Rule: Peer must send only 1 proposal, 1 prepare, 1 commit, and 1 round-change per round
limits := maxMessageCounts()
if err := signerState.MessageCounts.ValidateConsensusMessage(signedSSVMessage, consensusMessage, limits); err != nil {
if err := signerState.SeenMsgTypes.ValidateConsensusMessage(signedSSVMessage, consensusMessage); err != nil {
return err
}
}
Expand Down Expand Up @@ -349,7 +348,7 @@ func (mv *messageValidator) processSignerState(
signerState.SeenSigners[quorum.ToBitMask()] = struct{}{}
}

return signerState.MessageCounts.RecordConsensusMessage(signedSSVMessage, consensusMessage)
return signerState.SeenMsgTypes.RecordConsensusMessage(signedSSVMessage, consensusMessage)
}

func (mv *messageValidator) validateJustifications(message *specqbft.Message) error {
Expand Down
136 changes: 92 additions & 44 deletions message/validation/message_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,66 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"
)

// MessageCounts tracks the number of various message types received for validation.
type MessageCounts struct {
PreConsensus int
Proposal int
Prepare int
Commit int
RoundChange int
PostConsensus int
}

// String provides a formatted representation of the MessageCounts.
func (c *MessageCounts) String() string {
// Bit positions of the individual message types inside the SeenMsgTypes bitset.
// Each index is used as the shift amount when a message type is recorded or queried,
// so the order of these constants defines the bit layout.
const (
preConsensusIdx = iota
proposalIdx
prepareIdx
commitIdx
roundChangeIdx
postConsensusIdx
)

// SeenMsgTypes tracks which message types have already been received, for validation.
// The flags are stored as a bitset with one bit per message type. A single bit per
// type is sufficient because the limit for every message type is 1.
type SeenMsgTypes struct {
v uint8 // bitset of seen message types; wrapped into a struct to avoid incorrect usage
}

This comment has been minimized.

Copy link
@moshe-blox

moshe-blox Nov 27, 2024

Contributor

this is a very impressive optimization!

however is it really significant in practice to justify the added complexity?

lets say with 50k validators and 5 roles, we'd have about 250k records times 8 bytes times 6 fields = 12mb

that is a lot!

however we can reduce that to 1.5mb if we use byte instead of int (8 bytes) with very minimal code changes from before

also, getting/setting bits is more CPU-intensive than integers (which includes bytes), and i think we value CPU usage a bit more than 1.5mb savings

i'm not saying we should do it yet, just asking WDYT?

This comment has been minimized.

Copy link
@nkryuchkov

nkryuchkov Nov 27, 2024

Author Contributor

@moshe-blox

however is it really significant in practice to justify the added complexity?

I didn't measure how much memory this particular change reduced, but the whole PR reduced quite a lot (see PR description). I don't think it added a lot of complexity because it's documented and has all the needed methods, so we likely won't need to touch it.

lets say with 50k validators and 5 roles, we'd have about 250k records times 8 bytes times 6 fields = 12mb

We also store this data for each slot and signer, so before the PR it's up to 50000(validators)*5(roles)*13(max signers)*34(slots)*8(uint64)*6(fields) ~= 5.3 GB just for message counts. After, it's 48 times less: up to ~100 MB

however we can reduce that to 1.5mb if we use byte instead of int (8 bytes) with very minimal code changes from before

It's uint8 now, not int, so it has the same size as byte:

// byte is an alias for uint8 and is equivalent to uint8 in all ways. It is
// used, by convention, to distinguish byte values from 8-bit unsigned
// integer values.
type byte = uint8

also, getting/setting bits is more CPU-intensive than integers (which includes bytes), and i think we value CPU usage a bit more than 1.5mb savings

No, 1 byte fits in the smallest register of any CPU (e.g. al), and bit operations are the fastest for CPUs. E.g. recording a commit message as seen is just one 2-byte CPU instruction ORL $8, CX, and checking for a commit message is just two instructions: TESTB $8, AX; SETNE AX.

You can check it here:
https://godbolt.org/z/ovb7MjchY

A struct with 6 fields by 8 bytes cannot be faster: it doesn't fit a CPU register, and the compiler generates a lot of instructions for doing anything with it.

This comment has been minimized.

Copy link
@nkryuchkov

nkryuchkov Nov 27, 2024

Author Contributor

A struct with 6 fields by 8 bytes cannot be faster: it doesn't fit a CPU register, and the compiler generates a lot of instructions for doing anything with it.

Actually, I think I overestimated "a lot of" instructions: it would just be loading a field from the stack into a CPU register, making changes, and storing it back. It would likely be a bit slower, but the difference would never be noticeable.


// String provides a formatted representation of the SeenMsgTypes.
// Each message type is rendered as 1 when its limit has been reached and 0 otherwise.
func (c *SeenMsgTypes) String() string {
// b2i converts a boolean flag into 1 (true) or 0 (false) for printing.
b2i := func(b bool) int {
if b {
return 1
}
return 0
}

return fmt.Sprintf("pre-consensus: %v, proposal: %v, prepare: %v, commit: %v, round change: %v, post-consensus: %v",
c.PreConsensus,
c.Proposal,
c.Prepare,
c.Commit,
c.RoundChange,
c.PostConsensus,
b2i(c.reachedPreConsensusLimit()),
b2i(c.reachedProposalLimit()),
b2i(c.reachedPrepareLimit()),
b2i(c.reachedCommitLimit()),
b2i(c.reachedRoundChangeLimit()),
b2i(c.reachedPostConsensusLimit()),
)
}

// ValidateConsensusMessage checks if the provided consensus message exceeds the set limits.
// Returns an error if the message type exceeds its respective count limit.
func (c *MessageCounts) ValidateConsensusMessage(signedSSVMessage *spectypes.SignedSSVMessage, msg *specqbft.Message, limits MessageCounts) error {
func (c *SeenMsgTypes) ValidateConsensusMessage(signedSSVMessage *spectypes.SignedSSVMessage, msg *specqbft.Message) error {
switch msg.MsgType {
case specqbft.ProposalMsgType:
if c.Proposal >= limits.Proposal {
if c.reachedProposalLimit() {
err := ErrDuplicatedMessage
err.got = fmt.Sprintf("proposal, having %v", c.String())
return err
}
case specqbft.PrepareMsgType:
if c.Prepare >= limits.Prepare {
if c.reachedPrepareLimit() {
err := ErrDuplicatedMessage
err.got = fmt.Sprintf("prepare, having %v", c.String())
return err
}
case specqbft.CommitMsgType:
if len(signedSSVMessage.OperatorIDs) == 1 {
if c.Commit >= limits.Commit {
if c.reachedCommitLimit() {
err := ErrDuplicatedMessage
err.got = fmt.Sprintf("commit, having %v", c.String())
return err
}
}
case specqbft.RoundChangeMsgType:
if c.RoundChange >= limits.RoundChange {
if c.reachedRoundChangeLimit() {
err := ErrDuplicatedMessage

err.got = fmt.Sprintf("round change, having %v", c.String())
Expand All @@ -71,16 +83,16 @@ func (c *MessageCounts) ValidateConsensusMessage(signedSSVMessage *spectypes.Sig

// ValidatePartialSignatureMessage checks if the provided partial signature message exceeds the set limits.
// Returns an error if the message type exceeds its respective count limit.
func (c *MessageCounts) ValidatePartialSignatureMessage(m *spectypes.PartialSignatureMessages, limits MessageCounts) error {
func (c *SeenMsgTypes) ValidatePartialSignatureMessage(m *spectypes.PartialSignatureMessages) error {
switch m.Type {
case spectypes.RandaoPartialSig, spectypes.SelectionProofPartialSig, spectypes.ContributionProofs, spectypes.ValidatorRegistrationPartialSig, spectypes.VoluntaryExitPartialSig:
if c.PreConsensus >= limits.PreConsensus {
if c.reachedPreConsensusLimit() {
err := ErrInvalidPartialSignatureTypeCount
err.got = fmt.Sprintf("pre-consensus, having %v", c.String())
return err
}
case spectypes.PostConsensusPartialSig:
if c.PostConsensus >= limits.PostConsensus {
if c.reachedPostConsensusLimit() {
err := ErrInvalidPartialSignatureTypeCount
err.got = fmt.Sprintf("post-consensus, having %v", c.String())
return err
Expand All @@ -93,45 +105,81 @@ func (c *MessageCounts) ValidatePartialSignatureMessage(m *spectypes.PartialSign
}

// RecordConsensusMessage records the provided consensus message according to its type.
// A commit message is recorded only when it carries exactly one operator ID.
// It returns an error when the message type is not proposal, prepare, commit or round change.
func (c *MessageCounts) RecordConsensusMessage(signedSSVMessage *spectypes.SignedSSVMessage, msg *specqbft.Message) error {
func (c *SeenMsgTypes) RecordConsensusMessage(signedSSVMessage *spectypes.SignedSSVMessage, msg *specqbft.Message) error {
switch msg.MsgType {
case specqbft.ProposalMsgType:
c.Proposal++
c.recordProposal()
case specqbft.PrepareMsgType:
c.Prepare++
c.recordPrepare()
case specqbft.CommitMsgType:
// NOTE(review): commits with several operator IDs are not recorded — presumably
// they are aggregated messages; confirm against the caller.
if len(signedSSVMessage.OperatorIDs) == 1 {
c.Commit++
c.recordCommit()
}
case specqbft.RoundChangeMsgType:
c.RoundChange++
c.recordRoundChange()
default:
return fmt.Errorf("unexpected signed message type") // should be checked before
}
return nil
}

// RecordPartialSignatureMessage records the provided partial signature message according to its type.
// Randao, selection proof, contribution proof, validator registration and voluntary exit
// signatures are all recorded as pre-consensus; PostConsensusPartialSig is recorded as post-consensus.
// It returns an error for any other partial signature type.
func (c *MessageCounts) RecordPartialSignatureMessage(messages *spectypes.PartialSignatureMessages) error {
func (c *SeenMsgTypes) RecordPartialSignatureMessage(messages *spectypes.PartialSignatureMessages) error {
switch messages.Type {
case spectypes.RandaoPartialSig, spectypes.SelectionProofPartialSig, spectypes.ContributionProofs, spectypes.ValidatorRegistrationPartialSig, spectypes.VoluntaryExitPartialSig:
c.PreConsensus++
c.recordPreConsensus()
case spectypes.PostConsensusPartialSig:
c.PostConsensus++
c.recordPostConsensus()
default:
return fmt.Errorf("unexpected partial signature message type") // should be checked before
}
return nil
}

// maxMessageCounts is the maximum number of acceptable messages from a signer within a slot & round.
func maxMessageCounts() MessageCounts {
return MessageCounts{
PreConsensus: 1,
Proposal: 1,
Prepare: 1,
Commit: 1,
RoundChange: 1,
PostConsensus: 1,
}
// recordPreConsensus marks a pre-consensus message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordPreConsensus() {
	const mask = 1 << preConsensusIdx
	c.v = c.v | mask
}

// recordProposal marks a proposal message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordProposal() {
	const mask = 1 << proposalIdx
	c.v = c.v | mask
}

// recordPrepare marks a prepare message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordPrepare() {
	const mask = 1 << prepareIdx
	c.v = c.v | mask
}

// recordCommit marks a commit message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordCommit() {
	const mask = 1 << commitIdx
	c.v = c.v | mask
}

// recordRoundChange marks a round-change message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordRoundChange() {
	const mask = 1 << roundChangeIdx
	c.v = c.v | mask
}

// recordPostConsensus marks a post-consensus message as seen by setting its bit.
// Setting an already-set bit leaves the state unchanged.
func (c *SeenMsgTypes) recordPostConsensus() {
	const mask = 1 << postConsensusIdx
	c.v = c.v | mask
}

// reachedPreConsensusLimit reports whether the pre-consensus bit is set,
// i.e. a pre-consensus message has already been recorded.
func (c *SeenMsgTypes) reachedPreConsensusLimit() bool {
	return (c.v>>preConsensusIdx)&1 == 1
}

// reachedProposalLimit reports whether the proposal bit is set,
// i.e. a proposal message has already been recorded.
func (c *SeenMsgTypes) reachedProposalLimit() bool {
	return (c.v>>proposalIdx)&1 == 1
}

// reachedPrepareLimit reports whether the prepare bit is set,
// i.e. a prepare message has already been recorded.
func (c *SeenMsgTypes) reachedPrepareLimit() bool {
	return (c.v>>prepareIdx)&1 == 1
}

// reachedCommitLimit reports whether the commit bit is set,
// i.e. a commit message has already been recorded.
func (c *SeenMsgTypes) reachedCommitLimit() bool {
	return (c.v>>commitIdx)&1 == 1
}

// reachedRoundChangeLimit reports whether the round-change bit is set,
// i.e. a round-change message has already been recorded.
func (c *SeenMsgTypes) reachedRoundChangeLimit() bool {
	return (c.v>>roundChangeIdx)&1 == 1
}

// reachedPostConsensusLimit reports whether the post-consensus bit is set,
// i.e. a post-consensus message has already been recorded.
func (c *SeenMsgTypes) reachedPostConsensusLimit() bool {
	return (c.v>>postConsensusIdx)&1 == 1
}
5 changes: 2 additions & 3 deletions message/validation/partial_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ func (mv *messageValidator) validatePartialSigMessagesByDutyLogic(
// - 1 SelectionProofPartialSig and 1 PostConsensusPartialSig for Sync committee contribution
// - 1 ValidatorRegistrationPartialSig for Validator Registration
// - 1 VoluntaryExitPartialSig for Voluntary Exit
limits := maxMessageCounts()
if err := signerState.MessageCounts.ValidatePartialSignatureMessage(partialSignatureMessages, limits); err != nil {
if err := signerState.SeenMsgTypes.ValidatePartialSignatureMessage(partialSignatureMessages); err != nil {
return err
}
}
Expand Down Expand Up @@ -239,7 +238,7 @@ func (mv *messageValidator) updatePartialSignatureState(
stateBySlot.Set(messageSlot, messageEpoch, signerState)
}

return signerState.MessageCounts.RecordPartialSignatureMessage(partialSignatureMessages)
return signerState.SeenMsgTypes.RecordPartialSignatureMessage(partialSignatureMessages)
}

func (mv *messageValidator) validPartialSigMsgType(msgType spectypes.PartialSigMsgType) bool {
Expand Down
8 changes: 4 additions & 4 deletions message/validation/signer_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
// SignerState represents the state of a signer, including its start time, slot, round,
// message counts, proposal data, and the number of duties performed in the current epoch.
type SignerState struct {
Slot phase0.Slot // index stores slot modulo, so we also need to store slot here
Round specqbft.Round
MessageCounts MessageCounts
Slot phase0.Slot // index stores slot modulo, so we also need to store slot here
Round specqbft.Round
SeenMsgTypes SeenMsgTypes
// Storing pointer to byte array instead of slice to reduce memory consumption when we don't need the hash.
// A nil slice could be an alternative, but it'd consume more memory, and we'd need to cast [32]byte returned by sha256.Sum256() to slice.
HashedProposalData *[32]byte
Expand All @@ -30,7 +30,7 @@ func NewSignerState(slot phase0.Slot, round specqbft.Round) *SignerState {
func (s *SignerState) Reset(slot phase0.Slot, round specqbft.Round) {
s.Slot = slot
s.Round = round
s.MessageCounts = MessageCounts{}
s.SeenMsgTypes = SeenMsgTypes{}
s.HashedProposalData = nil
s.SeenSigners = nil // lazy init on demand to reduce mem consumption
}
Loading

0 comments on commit 8fbb447

Please sign in to comment.