refactor ValidationHost to Host, and validationWorkerPool to workerPool
test(dot/parachain/backing): cannot second multiple candidates per relay parent without prospective parachains (#4134)

- Wrote a test to ensure it is impossible to second multiple candidates per relay parent when prospective parachain mode is inactive (async backing is not supported).
    - Received a candidate backing message to second a candidate and ensured the candidate is seconded.
    - Received another candidate backing message to second a candidate with the same relay parent and ensured this candidate is rejected.
- Wrote some helper functions to reuse code.
- Added a function to stop the mockable overseer and wait for ongoing processes to finish before exiting the test.
- Added an extra check to the mockable overseer to handle expected overseer actions.
- Fixed other tests that failed because of this extra check.

refactor Go concurrency (a condensed before/after sketch follows the changed-files summary below)

test(dot/parachain/backing): ensure new leaf view doesn't clobber the old view (#4148)
edwardmack committed Oct 4, 2024
1 parent c0ef147 commit a17be6b
Showing 11 changed files with 235 additions and 268 deletions.
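The heart of the change is the pvf validation API: ValidationHost becomes Host, validationWorkerPool becomes workerPool, and Validate no longer writes to a caller-supplied ResultCh but spawns its own goroutine and returns a receive-only result channel. Condensed from the diffs below (not a compilable unit on its own), the caller-side shape changes like this:

    // old: the caller owns both the result channel and the goroutine
    taskResult := make(chan *pvf.ValidationTaskResult)
    validationTask.ResultCh = taskResult
    go pvfHost.Validate(validationTask)
    result := <-taskResult

    // new: Validate starts its own goroutine and returns a receive-only
    // channel that is closed once the single result has been sent
    taskResultChan := pvfHost.Validate(validationTask)
    result := <-taskResultChan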
41 changes: 19 additions & 22 deletions dot/parachain/candidate-validation/candidate_validation.go
@@ -33,7 +33,7 @@ type CandidateValidation struct {
SubsystemToOverseer chan<- any
OverseerToSubsystem <-chan any
BlockState BlockState
-    pvfHost             *pvf.ValidationHost
+    pvfHost             *pvf.Host
}

type BlockState interface {
@@ -51,9 +51,8 @@ func NewCandidateValidation(overseerChan chan<- any, blockState BlockState) *Can
}

// Run starts the CandidateValidation subsystem
-func (cv *CandidateValidation) Run(context.Context, chan any, chan any) {
+func (cv *CandidateValidation) Run(context.Context, <-chan any) {
cv.wg.Add(1)
-    go cv.pvfHost.Start()
go cv.processMessages(&cv.wg)
}

@@ -76,7 +75,6 @@ func (*CandidateValidation) ProcessBlockFinalizedSignal(parachaintypes.BlockFina

// Stop stops the CandidateValidation subsystem
func (cv *CandidateValidation) Stop() {
-    cv.pvfHost.Stop()
close(cv.stopChan)
cv.wg.Wait()
}
@@ -93,30 +91,31 @@ func (cv *CandidateValidation) processMessages(wg *sync.WaitGroup) {
            cv.validateFromChainState(msg)

        case ValidateFromExhaustive:
-            taskResult := make(chan *pvf.ValidationTaskResult)
            validationTask := &pvf.ValidationTask{
                PersistedValidationData: msg.PersistedValidationData,
                ValidationCode:          &msg.ValidationCode,
                CandidateReceipt:        &msg.CandidateReceipt,
                PoV:                     msg.PoV,
                ExecutorParams:          msg.ExecutorParams,
                PvfExecTimeoutKind:      msg.PvfExecTimeoutKind,
-                ResultCh:                taskResult,
            }
-            go cv.pvfHost.Validate(validationTask)
-
-            result := <-taskResult
-            if result.InternalError != nil {
-                logger.Errorf("failed to validate from exhaustive: %w", result.InternalError)
-                msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
-                    Err: result.InternalError,
-                }
-            } else {
-                msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
-                    Data: *result.Result,
-                }
-            }
+            taskResultChan := cv.pvfHost.Validate(validationTask)
+
+            go func() {
+                result := <-taskResultChan
+                if result.InternalError != nil {
+                    logger.Errorf("failed to validate from exhaustive: %w", result.InternalError)
+                    msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
+                        Err: result.InternalError,
+                    }
+                } else {
+                    msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
+                        Data: *result.Result,
+                    }
+                }
+            }()
case PreCheck:
// TODO: implement functionality to handle PreCheck, see issue #3921

@@ -195,19 +194,17 @@ func (cv *CandidateValidation) validateFromChainState(msg ValidateFromChainState
return
}

-    taskResult := make(chan *pvf.ValidationTaskResult)
    validationTask := &pvf.ValidationTask{
        PersistedValidationData: *persistedValidationData,
        ValidationCode:          validationCode,
        CandidateReceipt:        &msg.CandidateReceipt,
        PoV:                     msg.Pov,
        ExecutorParams:          msg.ExecutorParams,
        PvfExecTimeoutKind:      parachaintypes.PvfExecTimeoutKind{},
-        ResultCh:                taskResult,
    }
-    go cv.pvfHost.Validate(validationTask)
-
-    result := <-taskResult
+    taskResultChan := cv.pvfHost.Validate(validationTask)
+    result := <-taskResultChan
if result.InternalError != nil {
logger.Errorf("failed to validate from chain state: %w", result.InternalError)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
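Taken together, the two hunks above split the consumers into a non-blocking path (processMessages, which forwards the result from a goroutine so the subsystem's message loop keeps draining) and a blocking path (validateFromChainState, whose caller expects a synchronous answer). A self-contained sketch of that shape with generic names — none of these identifiers are gossamer APIs:

package main

import "fmt"

type result struct{ data string }

// validate mimics the new Host.Validate contract: it starts the work
// itself and returns a receive-only channel, closed after one send.
func validate(input string) <-chan result {
    ch := make(chan result)
    go func() {
        defer close(ch)
        ch <- result{data: "validated: " + input}
    }()
    return ch
}

func main() {
    // blocking consumption, as in validateFromChainState
    fmt.Println((<-validate("candidate A")).data)

    // non-blocking consumption, as in processMessages: a goroutine
    // relays the result while the main loop stays free
    relayed := make(chan result)
    go func() { relayed <- <-validate("candidate B") }()
    fmt.Println((<-relayed).data)
}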
@@ -113,8 +113,6 @@ func TestCandidateValidation_validateFromExhaustive(t *testing.T) {
executionError := pvf.ExecutionError

pvfHost := pvf.NewValidationHost()
-    pvfHost.Start()
-    defer pvfHost.Stop()

ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
@@ -293,7 +291,7 @@ func TestCandidateValidation_validateFromExhaustive(t *testing.T) {

taskResult := make(chan *pvf.ValidationTaskResult)
defer close(taskResult)
-    tt.validationTask.ResultCh = taskResult
+    //tt.validationTask.ResultCh = taskResult

go pvfHost.Validate(tt.validationTask)

46 changes: 25 additions & 21 deletions dot/parachain/collator-protocol/message_test.go
@@ -19,7 +19,6 @@ import (
"github.com/ChainSafe/gossamer/dot/network"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
    networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages"
-    "github.com/ChainSafe/gossamer/dot/parachain/overseer"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/dot/peerset"
)
@@ -375,17 +374,22 @@ func TestHandleCollationMessageDeclare(t *testing.T) {
        c := c
        t.Run(c.description, func(t *testing.T) {
            t.Parallel()

+            subsystemToOverseer := make(chan any)
            cpvs := CollatorProtocolValidatorSide{
-                peerData:           c.peerData,
-                currentAssignments: c.currentAssignments,
+                peerData:            c.peerData,
+                currentAssignments:  c.currentAssignments,
+                SubSystemToOverseer: subsystemToOverseer,
            }

-            mockOverseer := overseer.NewMockableOverseer(t)
-            mockOverseer.RegisterSubsystem(&cpvs)
-            cpvs.SubSystemToOverseer = mockOverseer.GetSubsystemToOverseerChannel()
-
-            mockOverseer.Start()
-            defer mockOverseer.Stop()
+            // ensure that the expected messages are sent to the overseer
+            if len(c.expectedMessages) > 0 {
+                go func() {
+                    for _, expectedMessage := range c.expectedMessages {
+                        require.Equal(t, expectedMessage, <-subsystemToOverseer)
+                    }
+                }()
+            }

msg := collatorprotocolmessages.NewCollationProtocol()
vdtChild := collatorprotocolmessages.NewCollatorProtocolMessage()
@@ -444,7 +448,6 @@ func TestHandleCollationMessageAdvertiseCollation(t *testing.T) {
},
errString: ErrRelayParentUnknown.Error(),
},

{
description: "fail with unknown peer if peer is not tracked in our list of active collators",
advertiseCollation: collatorprotocolmessages.AdvertiseCollation(testRelayParent),
@@ -574,19 +577,21 @@
t.Run(c.description, func(t *testing.T) {
t.Parallel()

+            subsystemToOverseer := make(chan any)
            cpvs := CollatorProtocolValidatorSide{
-                net:            c.net,
-                perRelayParent: c.perRelayParent,
-                peerData:       c.peerData,
-                activeLeaves:   c.activeLeaves,
+                net:                 c.net,
+                perRelayParent:      c.perRelayParent,
+                peerData:            c.peerData,
+                activeLeaves:        c.activeLeaves,
+                SubSystemToOverseer: subsystemToOverseer,
            }

-            mockOverseer := overseer.NewMockableOverseer(t)
-            mockOverseer.RegisterSubsystem(&cpvs)
-            cpvs.SubSystemToOverseer = mockOverseer.GetSubsystemToOverseerChannel()
-
-            mockOverseer.Start()
-            defer mockOverseer.Stop()
+            // ensure that the expected messages are sent to the overseer
+            if c.expectedMessage != nil {
+                go func() {
+                    require.Equal(t, c.expectedMessage, <-subsystemToOverseer)
+                }()
+            }

msg := collatorprotocolmessages.NewCollationProtocol()
vdtChild := collatorprotocolmessages.NewCollatorProtocolMessage()
@@ -604,7 +609,6 @@
} else {
require.ErrorContains(t, err, c.errString)
}

})
}
}
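The rewired tests above replace the mockable overseer with a bare channel plus a concurrent reader. A self-contained sketch of that wiring (a hypothetical test with generic message values; it uses testify's assert rather than require because the check runs off the test goroutine, where FailNow is not safe):

package example

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestExpectedMessagesOnChannel(t *testing.T) {
    subsystemToOverseer := make(chan any)
    expected := []any{"msg-1", "msg-2"}

    // the channel is unbuffered and the code under test sends from the
    // test goroutine, so expectations must be consumed concurrently
    done := make(chan struct{})
    go func() {
        defer close(done)
        for _, want := range expected {
            assert.Equal(t, want, <-subsystemToOverseer)
        }
    }()

    // stand-in for the handler under test reporting to the overseer
    subsystemToOverseer <- "msg-1"
    subsystemToOverseer <- "msg-2"
    <-done
}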
7 changes: 7 additions & 0 deletions dot/parachain/overseer/mockable_overseer.go
@@ -96,8 +96,15 @@ func (m *MockableOverseer) processMessages() {
}

actionIndex = actionIndex + 1
+            } else {
+                m.t.Errorf("unexpected message: %T", msg)
+                return
}
case <-m.ctx.Done():
+            if actionIndex < len(m.actionsForExpectedMessages) {
+                m.t.Errorf("expected %d overseer actions, but got only %d", len(m.actionsForExpectedMessages), actionIndex)
+            }
+
if err := m.ctx.Err(); err != nil {
m.t.Logf("ctx error: %v\n", err)
}
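The two additions above make the mock loop fail fast: any message without a matching registered action is an immediate test failure, and a shutdown with unconsumed actions is reported too. Stripped to a runnable sketch (generic types, not the gossamer MockableOverseer API):

package main

import "fmt"

// processMessages mirrors the strengthened loop above: every message must
// match the next expected action, and leftover expectations are reported
// when the loop is stopped.
func processMessages(msgs <-chan string, expected []string, stop <-chan struct{}) {
    actionIndex := 0
    for {
        select {
        case msg := <-msgs:
            if actionIndex < len(expected) && msg == expected[actionIndex] {
                actionIndex++
                continue
            }
            fmt.Printf("unexpected message: %q\n", msg)
            return
        case <-stop:
            if actionIndex < len(expected) {
                fmt.Printf("expected %d overseer actions, but got only %d\n", len(expected), actionIndex)
            }
            return
        }
    }
}

func main() {
    msgs, stop := make(chan string), make(chan struct{})
    go func() {
        msgs <- "first action"
        close(stop) // stop before the second expected action arrives
    }()
    processMessages(msgs, []string{"first action", "second action"}, stop)
}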
137 changes: 66 additions & 71 deletions dot/parachain/pvf/host.go
@@ -2,7 +2,6 @@ package pvf

import (
    "fmt"
-    "sync"

parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
@@ -12,96 +11,92 @@

var logger = log.NewFromGlobal(log.AddContext("pkg", "pvf"), log.SetLevel(log.Debug))

-type ValidationHost struct {
-    wg     sync.WaitGroup
+type Host struct {
    stopCh chan struct{}

-    workerPool *validationWorkerPool
+    workerPool *workerPool
}

-func (v *ValidationHost) Start() {
-    v.wg.Add(1)
-    logger.Debug("Starting validation host")
-    go func() {
-        defer v.wg.Done()
-    }()
-}
-
-func (v *ValidationHost) Stop() {
-    close(v.stopCh)
-    v.wg.Wait()
-}
-
-func NewValidationHost() *ValidationHost {
-    return &ValidationHost{
+//func (v *Host) Start() {
+//    v.wg.Add(1)
+//    logger.Debug("Starting validation host")
+//    go func() {
+//        defer v.wg.Done()
+//    }()
+//}
+
+//func (v *Host) Stop() {
+//    close(v.stopCh)
+//    v.wg.Wait()
+//}
+
+func NewValidationHost() *Host {
+    return &Host{
        stopCh:     make(chan struct{}),
        workerPool: newValidationWorkerPool(),
    }
}

-func (v *ValidationHost) Validate(msg *ValidationTask) {
-    logger.Debugf("Validating worker %x", msg.WorkerID)
-
-    validationCodeHash := msg.ValidationCode.Hash()
-    // basic checks
-    validationErr, internalErr := performBasicChecks(&msg.CandidateReceipt.Descriptor,
-        msg.PersistedValidationData.MaxPovSize,
-        msg.PoV,
-        validationCodeHash)
-
-    if internalErr != nil {
-        logger.Errorf("performing basic checks: %w", internalErr)
-        intErr := &ValidationTaskResult{
-            who:           validationCodeHash,
-            InternalError: internalErr,
-        }
-        msg.ResultCh <- intErr
-        return
-    }
-
-    if validationErr != nil {
-        valErr := &ValidationTaskResult{
-            who: validationCodeHash,
-            Result: &ValidationResult{
-                InvalidResult: validationErr,
-            },
-        }
-        msg.ResultCh <- valErr
-        return
-    }
-
-    workerID, err := v.poolContainsWorker(msg)
-    if err != nil {
-        logger.Errorf("pool contains worker: %w", err)
-        intErr := &ValidationTaskResult{
-            who:           validationCodeHash,
-            InternalError: err,
-        }
-        msg.ResultCh <- intErr
-        return
-    }
-    validationParams := parachainruntime.ValidationParameters{
-        ParentHeadData:         msg.PersistedValidationData.ParentHead,
-        BlockData:              msg.PoV.BlockData,
-        RelayParentNumber:      msg.PersistedValidationData.RelayParentNumber,
-        RelayParentStorageRoot: msg.PersistedValidationData.RelayParentStorageRoot,
-    }
-    workTask := &workerTask{
-        work:             validationParams,
-        maxPoVSize:       msg.PersistedValidationData.MaxPovSize,
-        candidateReceipt: msg.CandidateReceipt,
-        ResultCh:         msg.ResultCh,
-    }
-    v.workerPool.submitRequest(*workerID, workTask)
+func (v *Host) Validate(msg *ValidationTask) <-chan *ValidationTaskResult {
+    resultCh := make(chan *ValidationTaskResult)
+    go func() {
+        defer close(resultCh)
+        logger.Debugf("Start Validating worker %x", msg.WorkerID)
+        validationCodeHash := msg.ValidationCode.Hash()
+        // performBasicChecks
+        validationErr, internalErr := performBasicChecks(&msg.CandidateReceipt.Descriptor,
+            msg.PersistedValidationData.MaxPovSize,
+            msg.PoV,
+            validationCodeHash)
+
+        if internalErr != nil {
+            resultCh <- &ValidationTaskResult{
+                who:           validationCodeHash,
+                InternalError: internalErr,
+            }
+        }
+
+        if validationErr != nil {
+            resultCh <- &ValidationTaskResult{
+                who:    validationCodeHash,
+                Result: &ValidationResult{InvalidResult: validationErr},
+            }
+        }
+        // check if worker is in pool
+        workerID, err := v.poolContainsWorker(msg)
+        if err != nil {
+            resultCh <- &ValidationTaskResult{
+                who:           validationCodeHash,
+                InternalError: err,
+            }
+        }
+
+        // submit request
+        validationParams := parachainruntime.ValidationParameters{
+            ParentHeadData:         msg.PersistedValidationData.ParentHead,
+            BlockData:              msg.PoV.BlockData,
+            RelayParentNumber:      msg.PersistedValidationData.RelayParentNumber,
+            RelayParentStorageRoot: msg.PersistedValidationData.RelayParentStorageRoot,
+        }
+        workTask := &workerTask{
+            work:             validationParams,
+            maxPoVSize:       msg.PersistedValidationData.MaxPovSize,
+            candidateReceipt: msg.CandidateReceipt,
+        }
+        logger.Debugf("Working Validating worker %x", workerID)
+        resultWorkCh := v.workerPool.submitRequest(*workerID, workTask)

+        result := <-resultWorkCh
+        resultCh <- result
+    }()
+    return resultCh
}

-func (v *ValidationHost) poolContainsWorker(msg *ValidationTask) (*parachaintypes.ValidationCodeHash, error) {
+func (v *Host) poolContainsWorker(msg *ValidationTask) (*parachaintypes.ValidationCodeHash, error) {
if msg.WorkerID != nil {
return msg.WorkerID, nil
}
validationCodeHash := msg.ValidationCode.Hash()
if v.workerPool.containsWorker(validationCodeHash) {

return &validationCodeHash, nil
} else {
return v.workerPool.newValidationWorker(*msg.ValidationCode)
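One property of the new Validate worth keeping in mind when reading the diff above: the returned channel is unbuffered, is closed by defer when the goroutine exits, and is meant to carry exactly one result, so callers should receive exactly once. A minimal demonstration of that one-shot contract (generic names, a sketch rather than gossamer code):

package main

import "fmt"

// oneShot follows the same contract as the new Host.Validate: one send,
// then the deferred close marks the channel as done.
func oneShot() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        ch <- 42
    }()
    return ch
}

func main() {
    ch := oneShot()
    fmt.Println(<-ch) // 42: the single result

    v, ok := <-ch      // a second receive sees only the closed channel
    fmt.Println(v, ok) // 0 false
}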