Skip to content

Commit

Permalink
feat(statement-distribution):Implement FanIn approach to statement di…
Browse files Browse the repository at this point in the history
…stribution
  • Loading branch information
DanielDDHM committed Dec 24, 2024
1 parent 25771c5 commit dabff42
Showing 1 changed file with 36 additions and 77 deletions.
113 changes: 36 additions & 77 deletions dot/parachain/statement-distribution/statement_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package statementdistribution
import (
"context"
"fmt"
"time"

"github.com/ChainSafe/gossamer/internal/log"

Expand All @@ -24,29 +25,44 @@ type MuxedMessage struct {
func (s StatementDistribution) Run(
ctx context.Context,
overseerToSubSystem <-chan any,
v1RequesterChannel <-chan any,
v1CommChannel <-chan any,
v2CommChannel <-chan any,
receiverRespCh <-chan any,
retryReqCh <-chan any,
) {
muxedChannel := FanIn(
ctx,
overseerToSubSystem,
v1RequesterChannel,
v1CommChannel,
v2CommChannel,
receiverRespCh,
retryReqCh,
)
// Timer for reputation aggregator trigger
ticker := time.NewTicker(1 * time.Minute) // Adjust the duration as needed
defer ticker.Stop()

for {
select {
case muxedMsg := <-muxedChannel:
err := s.processMuxedMessage(muxedMsg)
if err != nil {
logger.Errorf("error processing muxed message: %w", err)
case msg, ok := <-overseerToSubSystem:
if ok {
err := s.processMessage(msg)
if err != nil {
logger.Errorf("error processing overseer message: %v", err)
}
}
case msg, ok := <-v2CommChannel:
if ok {
err := s.processMuxedMessage(MuxedMessage{Source: "V2Responder", Message: msg})
if err != nil {
logger.Errorf("error processing V2 responder message: %v", err)
}
}
case msg, ok := <-receiverRespCh:
if ok {
err := s.processMuxedMessage(MuxedMessage{Source: "Receive_Response", Message: msg})
if err != nil {
logger.Errorf("error processing receiver response message: %v", err)
}
}
case _, ok := <-retryReqCh:
if ok {
logger.Infof("received retry request, no action taken")
}
case <-ticker.C:
// Trigger reputation aggregator logic
s.triggerReputationAggregator()
case <-ctx.Done():
logger.Infof("shutting down: %v", ctx.Err())
return
Expand All @@ -55,45 +71,32 @@ func (s StatementDistribution) Run(
}

func (s StatementDistribution) processMessage(msg any) error {

switch msg := msg.(type) {
case statementedistributionmessages.Backed:
// TODO #4171
case statementedistributionmessages.Share:
// TODO #4170
// case statementedistributionmessages.NetworkBridgeUpdate
// TODO #4172 this above case would need to wait until network bridge receiver side is merged
case parachaintypes.ActiveLeavesUpdateSignal:
return s.ProcessActiveLeavesUpdateSignal(msg)
case parachaintypes.BlockFinalizedSignal:
return s.ProcessBlockFinalizedSignal(msg)

default:
return parachaintypes.ErrUnknownOverseerMessage
}

return nil
}

func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error {
switch muxedMsg.Source {
case "SubsystemMsg":
// Use processMessage for messages from overseerToSubSystem
return s.processMessage(muxedMsg.Message)
case "V1Requester":
// Handle legacy V1Requester messages
return nil
case "V1Responder":
// Handle legacy V1Responder messages
return nil
case "V2Responder":
// Handle V2Responder messages
return nil
case "Receive_Response":
// Handle response messages
return nil
case "Retry_Request":
// Do nothing for retry requests
logger.Infof("received retry request, no action taken")
return nil
default:
Expand All @@ -116,53 +119,9 @@ func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes
return nil
}

func (s StatementDistribution) Stop() {}

func FanIn(
ctx context.Context,
overseerChannel <-chan any,
v1RequesterChannel <-chan any,
v1CommChannel <-chan any,
v2CommChannel <-chan any,
receiverRespCh <-chan any,
retryReqCh <-chan any,
) <-chan MuxedMessage {
output := make(chan MuxedMessage)

go func() {
defer close(output)
for {
select {
// On each case verify if the channel is open before send the message
case <-ctx.Done():
return
case msg, ok := <-overseerChannel:
if ok {
output <- MuxedMessage{Source: "SubsystemMsg", Message: msg}
}
case msg, ok := <-v1RequesterChannel:
if ok {
output <- MuxedMessage{Source: "V1Requester", Message: msg}
}
case msg, ok := <-v1CommChannel:
if ok {
output <- MuxedMessage{Source: "V1Responder", Message: msg}
}
case msg, ok := <-v2CommChannel:
if ok {
output <- MuxedMessage{Source: "V2Responder", Message: msg}
}
case msg, ok := <-receiverRespCh:
if ok {
output <- MuxedMessage{Source: "Receive_Response", Message: msg}
}
case msg, ok := <-retryReqCh:
if ok {
output <- MuxedMessage{Source: "Retry_Request", Message: msg}
}
}
}
}()

return output
func (s StatementDistribution) triggerReputationAggregator() {
// Implement the logic to send reputation changes
logger.Infof("triggering reputation aggregator logic")
}

func (s StatementDistribution) Stop() {}

0 comments on commit dabff42

Please sign in to comment.