From dabff423a6614fde41e0ccabe489a8302b8a1a55 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Tue, 24 Dec 2024 09:54:53 +0000 Subject: [PATCH] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution.go | 113 ++++++------------ 1 file changed, 36 insertions(+), 77 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 1fc803d4d5..53032d01d1 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -3,6 +3,7 @@ package statementdistribution import ( "context" "fmt" + "time" "github.com/ChainSafe/gossamer/internal/log" @@ -24,29 +25,44 @@ type MuxedMessage struct { func (s StatementDistribution) Run( ctx context.Context, overseerToSubSystem <-chan any, - v1RequesterChannel <-chan any, - v1CommChannel <-chan any, v2CommChannel <-chan any, receiverRespCh <-chan any, retryReqCh <-chan any, ) { - muxedChannel := FanIn( - ctx, - overseerToSubSystem, - v1RequesterChannel, - v1CommChannel, - v2CommChannel, - receiverRespCh, - retryReqCh, - ) + // Timer for reputation aggregator trigger + ticker := time.NewTicker(1 * time.Minute) // Adjust the duration as needed + defer ticker.Stop() for { select { - case muxedMsg := <-muxedChannel: - err := s.processMuxedMessage(muxedMsg) - if err != nil { - logger.Errorf("error processing muxed message: %w", err) + case msg, ok := <-overseerToSubSystem: + if ok { + err := s.processMessage(msg) + if err != nil { + logger.Errorf("error processing overseer message: %v", err) + } + } + case msg, ok := <-v2CommChannel: + if ok { + err := s.processMuxedMessage(MuxedMessage{Source: "V2Responder", Message: msg}) + if err != nil { + logger.Errorf("error processing V2 responder message: %v", err) + } + } + case msg, ok := <-receiverRespCh: + if ok { + err := s.processMuxedMessage(MuxedMessage{Source: "Receive_Response", Message: msg}) + if err != nil { + logger.Errorf("error processing receiver response message: %v", err) + } + } + case _, ok := <-retryReqCh: + if ok { + logger.Infof("received retry request, no action taken") } + case <-ticker.C: + // Trigger reputation aggregator logic + s.triggerReputationAggregator() case <-ctx.Done(): logger.Infof("shutting down: %v", ctx.Err()) return @@ -55,37 +71,25 @@ func (s StatementDistribution) Run( } func (s StatementDistribution) processMessage(msg any) error { - switch msg := msg.(type) { case statementedistributionmessages.Backed: // TODO #4171 case statementedistributionmessages.Share: // TODO #4170 - // case statementedistributionmessages.NetworkBridgeUpdate - // TODO #4172 this above case would need to wait until network bridge receiver side is merged case parachaintypes.ActiveLeavesUpdateSignal: return s.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: return s.ProcessBlockFinalizedSignal(msg) - default: return parachaintypes.ErrUnknownOverseerMessage } - return nil } func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error { switch muxedMsg.Source { case "SubsystemMsg": - // Use processMessage for messages from overseerToSubSystem return s.processMessage(muxedMsg.Message) - case "V1Requester": - // Handle legacy V1Requester messages - return nil - case "V1Responder": - // Handle legacy V1Responder messages - return nil case "V2Responder": // Handle V2Responder messages return nil @@ -93,7 +97,6 @@ func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error // Handle response messages return nil case "Retry_Request": - // Do nothing for retry requests logger.Infof("received retry request, no action taken") return nil default: @@ -116,53 +119,9 @@ func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes return nil } -func (s StatementDistribution) Stop() {} - -func FanIn( - ctx context.Context, - overseerChannel <-chan any, - v1RequesterChannel <-chan any, - v1CommChannel <-chan any, - v2CommChannel <-chan any, - receiverRespCh <-chan any, - retryReqCh <-chan any, -) <-chan MuxedMessage { - output := make(chan MuxedMessage) - - go func() { - defer close(output) - for { - select { - // On each case verify if the channel is open before send the message - case <-ctx.Done(): - return - case msg, ok := <-overseerChannel: - if ok { - output <- MuxedMessage{Source: "SubsystemMsg", Message: msg} - } - case msg, ok := <-v1RequesterChannel: - if ok { - output <- MuxedMessage{Source: "V1Requester", Message: msg} - } - case msg, ok := <-v1CommChannel: - if ok { - output <- MuxedMessage{Source: "V1Responder", Message: msg} - } - case msg, ok := <-v2CommChannel: - if ok { - output <- MuxedMessage{Source: "V2Responder", Message: msg} - } - case msg, ok := <-receiverRespCh: - if ok { - output <- MuxedMessage{Source: "Receive_Response", Message: msg} - } - case msg, ok := <-retryReqCh: - if ok { - output <- MuxedMessage{Source: "Retry_Request", Message: msg} - } - } - } - }() - - return output +func (s StatementDistribution) triggerReputationAggregator() { + // Implement the logic to send reputation changes + logger.Infof("triggering reputation aggregator logic") } + +func (s StatementDistribution) Stop() {}