Skip to content

Commit

Permalink
chore(dot/parachain): improve Run method of subsystems (#4113)
Browse files Browse the repository at this point in the history
- I have updated the Run method of the subsystem interface to pass only necessary arguments.
- removed context, context cancel func, wait group and OverseerToSubSystem channel from the state struct of all the subsystems(example: candidateBacking{}, AvailabilityStoreSubsystem{}).
- improved Run method of some subsystems.
- removed return type from RegisterSubsystem method of overseer.
- because of these changes, some tests were failing, so I fixed those tests.
  • Loading branch information
axaysagathiya authored Aug 19, 2024
1 parent 9c51248 commit 7cdb52b
Show file tree
Hide file tree
Showing 20 changed files with 239 additions and 355 deletions.
160 changes: 72 additions & 88 deletions dot/parachain/availability-store/availability_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/parachain/chainapi"
Expand Down Expand Up @@ -75,12 +74,7 @@ var defaultPruningConfig = pruningConfig{

// AvailabilityStoreSubsystem is the struct that holds subsystem data for the availability store
type AvailabilityStoreSubsystem struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

subSystemToOverseer chan<- any
OverseerToSubSystem <-chan any
availabilityStore availabilityStore
finalizedBlockNumber parachaintypes.BlockNumber
knownUnfinalizedBlocks knownUnfinalizedBlocks
Expand All @@ -91,12 +85,7 @@ type AvailabilityStoreSubsystem struct {
func NewAvailabilityStoreSubsystem(db database.Database) *AvailabilityStoreSubsystem {
availabilityStore := NewAvailabilityStore(db)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

availabilityStoreSubsystem := &AvailabilityStoreSubsystem{
ctx: ctx,
cancel: cancel,
pruningConfig: defaultPruningConfig,
availabilityStore: *availabilityStore,
knownUnfinalizedBlocks: *newKnownUnfinalizedBlock(),
Expand Down Expand Up @@ -420,89 +409,85 @@ func branchesFromChunks(chunks [][]byte) (branches, error) {
}

// Run runs the availability store subsystem
func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, OverseerToSubsystem chan any,
SubsystemToOverseer chan any) {

av.wg.Add(1)
go av.processMessages()
func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, overseerToSubsystem <-chan any) {
for {
select {
case msg := <-overseerToSubsystem:
logger.Infof("received message %T, %v", msg, msg)
av.processMessage(msg)
case <-time.After(av.pruningConfig.pruningInterval):
av.pruneAll()
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.Errorf("ctx error: %v", err)
}
return
}
}
}

// Name returns the name of the availability store subsystem
func (*AvailabilityStoreSubsystem) Name() parachaintypes.SubSystemName {
return parachaintypes.AvailabilityStore
}

func (av *AvailabilityStoreSubsystem) processMessages() {
for {
select {
case msg := <-av.OverseerToSubSystem:
logger.Infof("received message %T, %v", msg, msg)
switch msg := msg.(type) {
case QueryAvailableData:
err := av.handleQueryAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle available data: %w", err)
}
case QueryDataAvailability:
err := av.handleQueryDataAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query data availability: %w", err)
}
case QueryChunk:
err := av.handleQueryChunk(msg)
if err != nil {
logger.Errorf("failed to handle query chunk: %w", err)
}
case QueryChunkSize:
err := av.handleQueryChunkSize(msg)
if err != nil {
logger.Errorf("failed to handle query chunk size: %w", err)
}
case QueryAllChunks:
err := av.handleQueryAllChunks(msg)
if err != nil {
logger.Errorf("failed to handle query all chunks: %w", err)
}
case QueryChunkAvailability:
err := av.handleQueryChunkAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query chunk availability: %w", err)
}
case StoreChunk:
err := av.handleStoreChunk(msg)
if err != nil {
logger.Errorf("failed to handle store chunk: %w", err)
}
case StoreAvailableData:
err := av.handleStoreAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle store available data: %w", err)
}
case parachaintypes.ActiveLeavesUpdateSignal:
err := av.ProcessActiveLeavesUpdateSignal(msg)
if err != nil {
logger.Errorf("failed to process active leaves update signal: %w", err)
}
case parachaintypes.BlockFinalizedSignal:
err := av.ProcessBlockFinalizedSignal(msg)
if err != nil {
logger.Errorf("failed to process block finalized signal: %w", err)
}
func (av *AvailabilityStoreSubsystem) processMessage(msg any) {
switch msg := msg.(type) {
case QueryAvailableData:
err := av.handleQueryAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle available data: %w", err)
}
case QueryDataAvailability:
err := av.handleQueryDataAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query data availability: %w", err)
}
case QueryChunk:
err := av.handleQueryChunk(msg)
if err != nil {
logger.Errorf("failed to handle query chunk: %w", err)
}
case QueryChunkSize:
err := av.handleQueryChunkSize(msg)
if err != nil {
logger.Errorf("failed to handle query chunk size: %w", err)
}
case QueryAllChunks:
err := av.handleQueryAllChunks(msg)
if err != nil {
logger.Errorf("failed to handle query all chunks: %w", err)
}
case QueryChunkAvailability:
err := av.handleQueryChunkAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query chunk availability: %w", err)
}
case StoreChunk:
err := av.handleStoreChunk(msg)
if err != nil {
logger.Errorf("failed to handle store chunk: %w", err)
}
case StoreAvailableData:
err := av.handleStoreAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle store available data: %w", err)
}
case parachaintypes.ActiveLeavesUpdateSignal:
err := av.ProcessActiveLeavesUpdateSignal(msg)
if err != nil {
logger.Errorf("failed to process active leaves update signal: %w", err)
}
case parachaintypes.BlockFinalizedSignal:
err := av.ProcessBlockFinalizedSignal(msg)
if err != nil {
logger.Errorf("failed to process block finalized signal: %w", err)
}

default:
if msg != nil {
// this error shouldn't happen, so we'll panic to catch it during development
panic(fmt.Sprintf("%s: %T", parachaintypes.ErrUnknownOverseerMessage.Error(), msg))
}
}
case <-av.ctx.Done():
if err := av.ctx.Err(); err != nil {
logger.Errorf("ctx error: %v", err)
}
av.wg.Done()
return
case <-time.After(av.pruningConfig.pruningInterval):
av.pruneAll()
default:
if msg != nil {
// this error shouldn't happen, so we'll panic to catch it during development
panic(fmt.Sprintf("%s: %T", parachaintypes.ErrUnknownOverseerMessage.Error(), msg))
}
}
}
Expand Down Expand Up @@ -928,8 +913,7 @@ func (av *AvailabilityStoreSubsystem) processPruneKey(key []byte) error {
}

func (av *AvailabilityStoreSubsystem) Stop() {
av.cancel()
av.wg.Wait()
logger.Infof("Stopping availability store subsystem")
}

func (av *AvailabilityStoreSubsystem) deleteUnfinalizedHeight(writer database.Writer,
Expand Down
12 changes: 7 additions & 5 deletions dot/parachain/availability-store/availability_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func newTestHarness(t *testing.T) *testHarness {

require.NoError(t, err)

availabilityStore.OverseerToSubSystem = harness.overseer.RegisterSubsystem(availabilityStore)
harness.overseer.RegisterSubsystem(availabilityStore)

return harness
}
Expand Down Expand Up @@ -1115,18 +1115,20 @@ func buildAvailableDataBranchesRoot(t *testing.T, numValidators uint32, availabl
}

func newTestOverseer() *testOverseer {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())

return &testOverseer{
ctx: ctx,
cancel: cancel,
subsystems: make(map[parachaintypes.Subsystem]chan any),
SubsystemsToOverseer: make(chan any),
}
}

type testOverseer struct {
ctx context.Context
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

subsystems map[parachaintypes.Subsystem]chan any
SubsystemsToOverseer chan any
Expand All @@ -1148,7 +1150,7 @@ func (to *testOverseer) Start() error {
for subsystem, overseerToSubSystem := range to.subsystems {
to.wg.Add(1)
go func(sub parachaintypes.Subsystem, overseerToSubSystem chan any) {
sub.Run(to.ctx, overseerToSubSystem, to.SubsystemsToOverseer)
sub.Run(to.ctx, overseerToSubSystem)
logger.Infof("subsystem %v stopped", sub)
to.wg.Done()
}(subsystem, overseerToSubSystem)
Expand Down
19 changes: 5 additions & 14 deletions dot/parachain/backing/candidate_backing.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ var (

// CandidateBacking represents the state of the subsystem responsible for managing candidate backing.
type CandidateBacking struct {
ctx context.Context
cancel context.CancelFunc

SubSystemToOverseer chan<- any
OverseerToSubSystem <-chan any
// State tracked for all relay-parents backing work is ongoing for. This includes
// all active leaves.
//
Expand Down Expand Up @@ -199,18 +195,15 @@ type StatementMessage struct {

// New creates a new CandidateBacking instance and initialises it with the provided overseer channel.
func New(overseerChan chan<- any) *CandidateBacking {
ctx, cancel := context.WithCancel(context.Background())
return &CandidateBacking{
ctx: ctx,
cancel: cancel,
SubSystemToOverseer: overseerChan,
perRelayParent: map[common.Hash]*perRelayParentState{},
perCandidate: map[parachaintypes.CandidateHash]*perCandidateState{},
perLeaf: map[common.Hash]*activeLeafState{},
}
}

func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan any, subSystemToOverseer chan any) {
func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem <-chan any) {
chRelayParentAndCommand := make(chan relayParentAndCommand, 1)

for {
Expand All @@ -219,26 +212,24 @@ func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan an
if err := cb.processValidatedCandidateCommand(rpAndCmd, chRelayParentAndCommand); err != nil {
logger.Errorf("processing validated candidated command: %s", err.Error())
}
case msg, ok := <-cb.OverseerToSubSystem:
case msg, ok := <-overseerToSubSystem:
if !ok {
return
}
if err := cb.processMessage(msg, chRelayParentAndCommand); err != nil {
logger.Errorf("processing message: %s", err.Error())
}
case <-cb.ctx.Done():
case <-ctx.Done():
close(chRelayParentAndCommand)
if err := cb.ctx.Err(); err != nil {
if err := ctx.Err(); err != nil {
logger.Errorf("ctx error: %s\n", err)
}
return
}
}
}

func (cb *CandidateBacking) Stop() {
cb.cancel()
}
func (cb *CandidateBacking) Stop() {}

func (*CandidateBacking) Name() parachaintypes.SubSystemName {
return parachaintypes.CandidateBacking
Expand Down
2 changes: 1 addition & 1 deletion dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func initBackingAndOverseerMock(t *testing.T) (*backing.CandidateBacking, *overs
overseerMock := overseer.NewMockableOverseer(t)

backing := backing.New(overseerMock.SubsystemsToOverseer)
backing.OverseerToSubSystem = overseerMock.RegisterSubsystem(backing)
overseerMock.RegisterSubsystem(backing)
backing.SubSystemToOverseer = overseerMock.GetSubsystemToOverseerChannel()

backing.Keystore = keystore.NewBasicKeystore("test", crypto.Sr25519Type)
Expand Down
Loading

0 comments on commit 7cdb52b

Please sign in to comment.