Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/parachain): create worker pool for PVF host #4101

Merged
merged 30 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
50cc3ab
add pvf worker pool skeleton
edwardmack Jul 10, 2024
3427da0
add test to validation host
edwardmack Jul 19, 2024
6537104
create worker pool skeleton for PVF host
edwardmack Jul 23, 2024
7e314f3
add validation logic to workers
edwardmack Aug 6, 2024
0b2b5fc
address lint issues.
edwardmack Aug 6, 2024
20fd4c4
add functionality for validate from chainstate to pvf host and tests
edwardmack Aug 10, 2024
6c05d82
add updated mock files
edwardmack Aug 10, 2024
a6b2652
clean-up unused comments
edwardmack Aug 12, 2024
e978c38
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 16, 2024
0655dd3
address merge conflicts
edwardmack Aug 16, 2024
674fefa
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 21, 2024
01905af
resolve merge conflicts
edwardmack Aug 21, 2024
afde6fd
refactor ValidationHost to Host, and validationWorkerPool to workerPool
edwardmack Aug 21, 2024
0f25072
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 23, 2024
dbf3cf8
refactor go concurrancy
edwardmack Aug 28, 2024
c95c3e7
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 29, 2024
79ba043
revert run to match interface
edwardmack Aug 29, 2024
ce721e7
remove redundant structs for simplicty, removed go routines to simpli…
edwardmack Aug 29, 2024
48faacd
removed pvf package, simplified code removed concurrency
edwardmack Aug 29, 2024
391c6b4
added missing mock file
edwardmack Aug 29, 2024
7988888
address deep source comments
edwardmack Aug 29, 2024
5820e3f
address PR comments
edwardmack Sep 3, 2024
3e5ac28
make private functions/structs private
edwardmack Sep 4, 2024
020fb6f
refactor submitRequest to executeRequest
edwardmack Sep 6, 2024
b8bbeb2
address PR comments
edwardmack Sep 11, 2024
25d95bd
address PR comments, rename variables and functions
edwardmack Sep 13, 2024
6677479
fix test
edwardmack Sep 13, 2024
9429632
remove printf from debugging
edwardmack Sep 13, 2024
f869cba
address PR comments
edwardmack Sep 17, 2024
d71e539
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions dot/parachain/backing/candidate_backing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store"
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/dot/parachain/pvf"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
Expand Down Expand Up @@ -692,7 +693,7 @@ func TestValidateAndMakeAvailable(t *testing.T) {
for data := range ch {
switch data := data.(type) {
case candidatevalidation.ValidateFromExhaustive:
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
data.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: errors.New("mock error getting validation result"),
}
default:
Expand Down Expand Up @@ -728,9 +729,9 @@ func TestValidateAndMakeAvailable(t *testing.T) {
for data := range ch {
switch data := data.(type) {
case candidatevalidation.ValidateFromExhaustive:
ci := candidatevalidation.ExecutionError
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ci := pvf.ExecutionError
data.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
InvalidResult: &ci,
},
}
Expand Down Expand Up @@ -766,9 +767,9 @@ func TestValidateAndMakeAvailable(t *testing.T) {
for data := range ch {
switch data := data.(type) {
case candidatevalidation.ValidateFromExhaustive:
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{},
data.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
ValidResult: &pvf.ValidValidationResult{},
},
}
case availabilitystore.StoreAvailableData:
Expand Down
21 changes: 11 additions & 10 deletions dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/dot/parachain/overseer"
"github.com/ChainSafe/gossamer/dot/parachain/pvf"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto"
Expand Down Expand Up @@ -286,9 +287,9 @@ func TestSecondsValidCandidate(t *testing.T) {
return false
}

badReturn := candidatevalidation.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
badReturn := pvf.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
InvalidResult: &badReturn,
},
}
Expand Down Expand Up @@ -350,9 +351,9 @@ func TestSecondsValidCandidate(t *testing.T) {
return false
}

validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
ValidResult: &pvf.ValidValidationResult{
CandidateCommitments: parachaintypes.CandidateCommitments{
UpwardMessages: []parachaintypes.UpwardMessage{},
HorizontalMessages: []parachaintypes.OutboundHrmpMessage{},
Expand Down Expand Up @@ -525,9 +526,9 @@ func TestCandidateReachesQuorum(t *testing.T) {
return false
}

msgValidate.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{
msgValidate.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
ValidResult: &pvf.ValidValidationResult{
CandidateCommitments: parachaintypes.CandidateCommitments{
HeadData: headData,
UpwardMessages: []parachaintypes.UpwardMessage{},
Expand Down Expand Up @@ -743,7 +744,7 @@ func TestValidationFailDoesNotStopSubsystem(t *testing.T) {
return false
}

msgValidate.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
msgValidate.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: errors.New("some internal error"),
}
return true
Expand Down
3 changes: 2 additions & 1 deletion dot/parachain/backing/per_relay_parent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store"
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/dot/parachain/pvf"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/runtime"
wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero"
Expand Down Expand Up @@ -297,7 +298,7 @@ func (rpState *perRelayParentState) validateAndMakeAvailable(
return fmt.Errorf("setting pvfExecTimeoutKind: %w", err)
}

chValidationResultRes := make(chan parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult])
chValidationResultRes := make(chan parachaintypes.OverseerFuncRes[pvf.ValidationResult])
subSystemToOverseer <- candidatevalidation.ValidateFromExhaustive{
PersistedValidationData: pvd,
ValidationCode: *validationCode,
Expand Down
179 changes: 65 additions & 114 deletions dot/parachain/candidate-validation/candidate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"github.com/ChainSafe/gossamer/dot/parachain/pvf"
parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/internal/log"
Expand All @@ -31,8 +32,8 @@ type CandidateValidation struct {

SubsystemToOverseer chan<- any
OverseerToSubsystem <-chan any
ValidationHost parachainruntime.ValidationHost
BlockState BlockState
pvfHost *pvf.ValidationHost
}

type BlockState interface {
Expand All @@ -43,6 +44,7 @@ type BlockState interface {
func NewCandidateValidation(overseerChan chan<- any, blockState BlockState) *CandidateValidation {
candidateValidation := CandidateValidation{
SubsystemToOverseer: overseerChan,
pvfHost: pvf.NewValidationHost(),
BlockState: blockState,
}
return &candidateValidation
Expand All @@ -51,6 +53,7 @@ func NewCandidateValidation(overseerChan chan<- any, blockState BlockState) *Can
// Run starts the CandidateValidation subsystem
func (cv *CandidateValidation) Run(context.Context, chan any, chan any) {
cv.wg.Add(1)
go cv.pvfHost.Start()
go cv.processMessages(&cv.wg)
}

Expand All @@ -73,6 +76,7 @@ func (*CandidateValidation) ProcessBlockFinalizedSignal(parachaintypes.BlockFina

// Stop stops the CandidateValidation subsystem
func (cv *CandidateValidation) Stop() {
cv.pvfHost.Stop()
close(cv.stopChan)
cv.wg.Wait()
}
Expand All @@ -86,35 +90,30 @@ func (cv *CandidateValidation) processMessages(wg *sync.WaitGroup) {
logger.Debugf("received message %v", msg)
switch msg := msg.(type) {
case ValidateFromChainState:
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
logger.Errorf("failed to get runtime: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
}
result, err := validateFromChainState(runtimeInstance, msg.Pov, msg.CandidateReceipt)
if err != nil {
logger.Errorf("failed to validate from chain state: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
}
}
cv.validateFromChainState(msg)

case ValidateFromExhaustive:
result, err := validateFromExhaustive(cv.ValidationHost, msg.PersistedValidationData,
msg.ValidationCode, msg.CandidateReceipt, msg.PoV)
if err != nil {
logger.Errorf("failed to validate from exhaustive: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
taskResult := make(chan *pvf.ValidationTaskResult)
validationTask := &pvf.ValidationTask{
PersistedValidationData: msg.PersistedValidationData,
ValidationCode: &msg.ValidationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.PoV,
ExecutorParams: msg.ExecutorParams,
PvfExecTimeoutKind: msg.PvfExecTimeoutKind,
ResultCh: taskResult,
}
go cv.pvfHost.Validate(validationTask)

result := <-taskResult
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
if result.InternalError != nil {
logger.Errorf("failed to validate from exhaustive: %w", result.InternalError)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: result.InternalError,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: *result.Result,
}
}

Expand Down Expand Up @@ -176,104 +175,56 @@ func getValidationData(runtimeInstance parachainruntime.RuntimeInstance, paraID
return nil, nil, fmt.Errorf("getting persisted validation data: %w", mergedError)
}

// validateFromChainState validates a candidate parachain block with provided parameters using relay-chain
// state and using the parachain runtime.
func validateFromChainState(runtimeInstance parachainruntime.RuntimeInstance, pov parachaintypes.PoV,
candidateReceipt parachaintypes.CandidateReceipt) (
*ValidationResult, error) {

persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
candidateReceipt.Descriptor.ParaID)
if err != nil {
return nil, fmt.Errorf("getting validation data: %w", err)
}

parachainRuntimeInstance, err := parachainruntime.SetupVM(*validationCode)
if err != nil {
return nil, fmt.Errorf("setting up VM: %w", err)
}

validationResults, err := validateFromExhaustive(parachainRuntimeInstance, *persistedValidationData,
*validationCode, candidateReceipt, pov)
func (cv *CandidateValidation) validateFromChainState(msg ValidateFromChainState) {
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
return nil, fmt.Errorf("validating from exhaustive: %w", err)
}
return validationResults, nil
}

// validateFromExhaustive validates a candidate parachain block with provided parameters
func validateFromExhaustive(validationHost parachainruntime.ValidationHost,
persistedValidationData parachaintypes.PersistedValidationData,
validationCode parachaintypes.ValidationCode,
candidateReceipt parachaintypes.CandidateReceipt, pov parachaintypes.PoV) (
*ValidationResult, error) {

validationCodeHash := validationCode.Hash()
// basic checks
validationErr, internalErr := performBasicChecks(&candidateReceipt.Descriptor, persistedValidationData.MaxPovSize,
pov,
validationCodeHash)
if internalErr != nil {
return nil, fmt.Errorf("performing basic checks: %w", internalErr)
}

if validationErr != nil {
validationResult := &ValidationResult{
InvalidResult: validationErr,
logger.Errorf("getting runtime instance: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: err,
}
return validationResult, nil //nolint: nilerr
}

validationParams := parachainruntime.ValidationParameters{
ParentHeadData: persistedValidationData.ParentHead,
BlockData: pov.BlockData,
RelayParentNumber: persistedValidationData.RelayParentNumber,
RelayParentStorageRoot: persistedValidationData.RelayParentStorageRoot,
}

validationResult, err := validationHost.ValidateBlock(validationParams)
// TODO: implement functionality to parse errors generated by the runtime when PVF host is implemented, issue #3934
if err != nil {
return nil, fmt.Errorf("executing validate_block: %w", err)
return
}

headDataHash, err := validationResult.HeadData.Hash()
persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
msg.CandidateReceipt.Descriptor.ParaID)
if err != nil {
return nil, fmt.Errorf("hashing head data: %w", err)
logger.Errorf("getting validation data: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: err,
}
return
}

if headDataHash != candidateReceipt.Descriptor.ParaHead {
ci := ParaHeadHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
}
candidateCommitments := parachaintypes.CandidateCommitments{
UpwardMessages: validationResult.UpwardMessages,
HorizontalMessages: validationResult.HorizontalMessages,
NewValidationCode: validationResult.NewValidationCode,
HeadData: validationResult.HeadData,
ProcessedDownwardMessages: validationResult.ProcessedDownwardMessages,
HrmpWatermark: validationResult.HrmpWatermark,
taskResult := make(chan *pvf.ValidationTaskResult)
validationTask := &pvf.ValidationTask{
PersistedValidationData: *persistedValidationData,
ValidationCode: validationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.Pov,
ExecutorParams: msg.ExecutorParams,
PvfExecTimeoutKind: parachaintypes.PvfExecTimeoutKind{},
ResultCh: taskResult,
}
go cv.pvfHost.Validate(validationTask)

// if validation produced a new set of commitments, we treat the candidate as invalid
if candidateReceipt.CommitmentsHash != candidateCommitments.Hash() {
ci := CommitmentsHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
result := <-taskResult
if result.InternalError != nil {
logger.Errorf("failed to validate from chain state: %w", result.InternalError)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: result.InternalError,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: *result.Result,
}
}
return &ValidationResult{
ValidResult: &ValidValidationResult{
CandidateCommitments: candidateCommitments,
PersistedValidationData: persistedValidationData,
},
}, nil

}

// performBasicChecks Does basic checks of a candidate. Provide the encoded PoV-block.
// Returns ReasonForInvalidity and internal error if any.
func performBasicChecks(candidate *parachaintypes.CandidateDescriptor, maxPoVSize uint32,
pov parachaintypes.PoV, validationCodeHash parachaintypes.ValidationCodeHash) (validationError *ReasonForInvalidity,
internalError error) {
pov parachaintypes.PoV, validationCodeHash parachaintypes.ValidationCodeHash) (validationError *pvf.
ReasonForInvalidity, internalError error) {
povHash, err := pov.Hash()
if err != nil {
return nil, fmt.Errorf("hashing PoV: %w", err)
Expand All @@ -286,23 +237,23 @@ func performBasicChecks(candidate *parachaintypes.CandidateDescriptor, maxPoVSiz
encodedPoVSize := uint32(len(encodedPoV))

if encodedPoVSize > maxPoVSize {
ci := ParamsTooLarge
ci := pvf.ParamsTooLarge
return &ci, nil
}

if povHash != candidate.PovHash {
ci := PoVHashMismatch
ci := pvf.PoVHashMismatch
return &ci, nil
}

if validationCodeHash != candidate.ValidationCodeHash {
ci := CodeHashMismatch
ci := pvf.CodeHashMismatch
return &ci, nil
}

err = candidate.CheckCollatorSignature()
if err != nil {
ci := BadSignature
ci := pvf.BadSignature
return &ci, nil
}
return nil, nil
Expand Down
Loading
Loading