Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/parachain): create worker pool for PVF host #4101

Merged
merged 30 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
50cc3ab
add pvf worker pool skeleton
edwardmack Jul 10, 2024
3427da0
add test to validation host
edwardmack Jul 19, 2024
6537104
create worker pool skeleton for PVF host
edwardmack Jul 23, 2024
7e314f3
add validation logic to workers
edwardmack Aug 6, 2024
0b2b5fc
address lint issues.
edwardmack Aug 6, 2024
20fd4c4
add functionality for validate from chainstate to pvf host and tests
edwardmack Aug 10, 2024
6c05d82
add updated mock files
edwardmack Aug 10, 2024
a6b2652
clean-up unused comments
edwardmack Aug 12, 2024
e978c38
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 16, 2024
0655dd3
address merge conflicts
edwardmack Aug 16, 2024
674fefa
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 21, 2024
01905af
resolve merge conflicts
edwardmack Aug 21, 2024
afde6fd
refactor ValidationHost to Host, and validationWorkerPool to workerPool
edwardmack Aug 21, 2024
0f25072
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 23, 2024
dbf3cf8
refactor go concurrancy
edwardmack Aug 28, 2024
c95c3e7
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Aug 29, 2024
79ba043
revert run to match interface
edwardmack Aug 29, 2024
ce721e7
remove redundant structs for simplicty, removed go routines to simpli…
edwardmack Aug 29, 2024
48faacd
removed pvf package, simplified code removed concurrency
edwardmack Aug 29, 2024
391c6b4
added missing mock file
edwardmack Aug 29, 2024
7988888
address deep source comments
edwardmack Aug 29, 2024
5820e3f
address PR comments
edwardmack Sep 3, 2024
3e5ac28
make private functions/structs private
edwardmack Sep 4, 2024
020fb6f
refactor submitRequest to executeRequest
edwardmack Sep 6, 2024
b8bbeb2
address PR comments
edwardmack Sep 11, 2024
25d95bd
address PR comments, rename variables and functions
edwardmack Sep 13, 2024
6677479
fix test
edwardmack Sep 13, 2024
9429632
remove printf from debugging
edwardmack Sep 13, 2024
f869cba
address PR comments
edwardmack Sep 17, 2024
d71e539
Merge branch 'feat/parachain' into ed/feat/candidate_validation_pvf_h…
edwardmack Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dot/parachain/backing/candidate_backing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func TestValidateAndMakeAvailable(t *testing.T) {
ci := candidatevalidation.ExecutionError
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
InvalidResult: &ci,
Invalid: &ci,
},
}
default:
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestValidateAndMakeAvailable(t *testing.T) {
case candidatevalidation.ValidateFromExhaustive:
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{},
Valid: &candidatevalidation.Valid{},
},
}
case availabilitystore.StoreAvailableData:
Expand Down
4 changes: 2 additions & 2 deletions dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func validResponseForValidateFromExhaustive(

msgValidate.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{
Valid: &candidatevalidation.Valid{
CandidateCommitments: parachaintypes.CandidateCommitments{
HeadData: headData,
UpwardMessages: []parachaintypes.UpwardMessage{},
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestSecondsValidCandidate(t *testing.T) {
badReturn := candidatevalidation.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
InvalidResult: &badReturn,
Invalid: &badReturn,
},
}
return true
Expand Down
8 changes: 4 additions & 4 deletions dot/parachain/backing/per_relay_parent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ func (rpState *perRelayParentState) validateAndMakeAvailable(
bgValidationResult = backgroundValidationResult{
outputs: &backgroundValidationOutputs{
candidateReceipt: candidateReceipt,
candidateCommitments: validationResultRes.Data.ValidResult.CandidateCommitments,
persistedValidationData: validationResultRes.Data.ValidResult.PersistedValidationData,
candidateCommitments: validationResultRes.Data.Valid.CandidateCommitments,
persistedValidationData: validationResultRes.Data.Valid.PersistedValidationData,
},
candidate: nil,
err: nil,
Expand All @@ -358,11 +358,11 @@ func (rpState *perRelayParentState) validateAndMakeAvailable(
}

} else { // Invalid
logger.Error(validationResultRes.Data.InvalidResult.Error())
logger.Error(validationResultRes.Data.Invalid.Error())
bgValidationResult = backgroundValidationResult{
outputs: nil,
candidate: &candidateReceipt,
err: fmt.Errorf(validationResultRes.Data.InvalidResult.Error()),
err: fmt.Errorf(validationResultRes.Data.Invalid.Error()),
}
}

Expand Down
195 changes: 43 additions & 152 deletions dot/parachain/candidate-validation/candidate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,15 @@ import (

parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/pkg/scale"
)

var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-candidate-validation"))

var (
ErrValidationCodeMismatch = errors.New("validation code hash does not match")
ErrValidationInputOverLimit = errors.New("validation input is over the limit")
)

// CandidateValidation is a parachain subsystem that validates candidate parachain blocks
type CandidateValidation struct {
SubsystemToOverseer chan<- any
ValidationHost parachainruntime.ValidationHost
BlockState BlockState
pvfHost *host // pvfHost is the host for the parachain validation function
}

type BlockState interface {
Expand All @@ -38,6 +29,7 @@ type BlockState interface {
func NewCandidateValidation(overseerChan chan<- any, blockState BlockState) *CandidateValidation {
candidateValidation := CandidateValidation{
SubsystemToOverseer: overseerChan,
pvfHost: newValidationHost(),
BlockState: blockState,
}
return &candidateValidation
Expand All @@ -48,7 +40,6 @@ func (cv *CandidateValidation) Run(ctx context.Context, overseerToSubsystem <-ch
for {
select {
case msg := <-overseerToSubsystem:
logger.Debugf("received message %v", msg)
cv.processMessage(msg)
case <-ctx.Done():
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -77,36 +68,25 @@ func (*CandidateValidation) ProcessBlockFinalizedSignal(parachaintypes.BlockFina
}

// Stop stops the CandidateValidation subsystem
func (cv *CandidateValidation) Stop() {
func (*CandidateValidation) Stop() {
}

// processMessage processes messages sent to the CandidateValidation subsystem
func (cv *CandidateValidation) processMessage(msg any) {
switch msg := msg.(type) {
case ValidateFromChainState:
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
logger.Errorf("failed to get runtime: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
return
}
result, err := validateFromChainState(runtimeInstance, msg.Pov, msg.CandidateReceipt)
if err != nil {
logger.Errorf("failed to validate from chain state: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
}
cv.validateFromChainState(msg)
case ValidateFromExhaustive:
validationTask := &ValidationTask{
PersistedValidationData: msg.PersistedValidationData,
ValidationCode: &msg.ValidationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.PoV,
ExecutorParams: msg.ExecutorParams,
PvfExecTimeoutKind: msg.PvfExecTimeoutKind,
}

case ValidateFromExhaustive:
result, err := validateFromExhaustive(cv.ValidationHost, msg.PersistedValidationData,
msg.ValidationCode, msg.CandidateReceipt, msg.PoV)
result, err := cv.pvfHost.validate(validationTask)
if err != nil {
logger.Errorf("failed to validate from exhaustive: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Expand Down Expand Up @@ -171,134 +151,45 @@ func getValidationData(runtimeInstance parachainruntime.RuntimeInstance, paraID
return nil, nil, fmt.Errorf("getting persisted validation data: %w", mergedError)
}

// validateFromChainState validates a candidate parachain block with provided parameters using relay-chain
// state and using the parachain runtime.
func validateFromChainState(runtimeInstance parachainruntime.RuntimeInstance, pov parachaintypes.PoV,
candidateReceipt parachaintypes.CandidateReceipt) (
*ValidationResult, error) {

persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
candidateReceipt.Descriptor.ParaID)
// validateFromChainState validates a parachain block from chain state message
func (cv *CandidateValidation) validateFromChainState(msg ValidateFromChainState) {
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
return nil, fmt.Errorf("getting validation data: %w", err)
}

parachainRuntimeInstance, err := parachainruntime.SetupVM(*validationCode)
if err != nil {
return nil, fmt.Errorf("setting up VM: %w", err)
}

validationResults, err := validateFromExhaustive(parachainRuntimeInstance, *persistedValidationData,
*validationCode, candidateReceipt, pov)
if err != nil {
return nil, fmt.Errorf("validating from exhaustive: %w", err)
}
return validationResults, nil
}

// validateFromExhaustive validates a candidate parachain block with provided parameters
func validateFromExhaustive(validationHost parachainruntime.ValidationHost,
persistedValidationData parachaintypes.PersistedValidationData,
validationCode parachaintypes.ValidationCode,
candidateReceipt parachaintypes.CandidateReceipt, pov parachaintypes.PoV) (
*ValidationResult, error) {

validationCodeHash := validationCode.Hash()
// basic checks
validationErr, internalErr := performBasicChecks(&candidateReceipt.Descriptor, persistedValidationData.MaxPovSize,
pov,
validationCodeHash)
if internalErr != nil {
return nil, fmt.Errorf("performing basic checks: %w", internalErr)
}

if validationErr != nil {
validationResult := &ValidationResult{
InvalidResult: validationErr,
logger.Errorf("getting runtime instance: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: fmt.Errorf("getting runtime instance: %w", err),
}
return validationResult, nil //nolint: nilerr
}

validationParams := parachainruntime.ValidationParameters{
ParentHeadData: persistedValidationData.ParentHead,
BlockData: pov.BlockData,
RelayParentNumber: persistedValidationData.RelayParentNumber,
RelayParentStorageRoot: persistedValidationData.RelayParentStorageRoot,
}

validationResult, err := validationHost.ValidateBlock(validationParams)
// TODO: implement functionality to parse errors generated by the runtime when PVF host is implemented, issue #3934
if err != nil {
return nil, fmt.Errorf("executing validate_block: %w", err)
return
}

headDataHash, err := validationResult.HeadData.Hash()
if err != nil {
return nil, fmt.Errorf("hashing head data: %w", err)
}

if headDataHash != candidateReceipt.Descriptor.ParaHead {
ci := ParaHeadHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
}
candidateCommitments := parachaintypes.CandidateCommitments{
UpwardMessages: validationResult.UpwardMessages,
HorizontalMessages: validationResult.HorizontalMessages,
NewValidationCode: validationResult.NewValidationCode,
HeadData: validationResult.HeadData,
ProcessedDownwardMessages: validationResult.ProcessedDownwardMessages,
HrmpWatermark: validationResult.HrmpWatermark,
}

// if validation produced a new set of commitments, we treat the candidate as invalid
if candidateReceipt.CommitmentsHash != candidateCommitments.Hash() {
ci := CommitmentsHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
}
return &ValidationResult{
ValidResult: &ValidValidationResult{
CandidateCommitments: candidateCommitments,
PersistedValidationData: persistedValidationData,
},
}, nil

}

// performBasicChecks Does basic checks of a candidate. Provide the encoded PoV-block.
// Returns ReasonForInvalidity and internal error if any.
func performBasicChecks(candidate *parachaintypes.CandidateDescriptor, maxPoVSize uint32,
pov parachaintypes.PoV, validationCodeHash parachaintypes.ValidationCodeHash) (validationError *ReasonForInvalidity,
internalError error) {
povHash, err := pov.Hash()
if err != nil {
return nil, fmt.Errorf("hashing PoV: %w", err)
}

encodedPoV, err := scale.Marshal(pov)
persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
msg.CandidateReceipt.Descriptor.ParaID)
if err != nil {
return nil, fmt.Errorf("encoding PoV: %w", err)
}
encodedPoVSize := uint32(len(encodedPoV))

if encodedPoVSize > maxPoVSize {
ci := ParamsTooLarge
return &ci, nil
}

if povHash != candidate.PovHash {
ci := PoVHashMismatch
return &ci, nil
logger.Errorf("getting validation data: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: fmt.Errorf("getting validation data: %w", err),
}
return
}

if validationCodeHash != candidate.ValidationCodeHash {
ci := CodeHashMismatch
return &ci, nil
validationTask := &ValidationTask{
PersistedValidationData: *persistedValidationData,
ValidationCode: validationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.Pov,
ExecutorParams: msg.ExecutorParams,
// todo: implement PvfExecTimeoutKind, so that validate can be called with a timeout see issue: #3429
PvfExecTimeoutKind: parachaintypes.PvfExecTimeoutKind{},
}

err = candidate.CheckCollatorSignature()
result, err := cv.pvfHost.validate(validationTask)
if err != nil {
ci := BadSignature
return &ci, nil
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
}
}
return nil, nil
}
Loading
Loading