Skip to content

Commit

Permalink
implement function timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed Sep 5, 2024
1 parent 3e5ac28 commit 3e6f025
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
55 changes: 50 additions & 5 deletions dot/parachain/candidate-validation/worker.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package candidatevalidation

import (
"time"

parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
)

// TODO(ed): figure out a better name for this that describes what it does
type worker struct {
workerID parachaintypes.ValidationCodeHash
instance *parachainruntime.Instance
Expand All @@ -16,6 +17,7 @@ type workerTask struct {
work parachainruntime.ValidationParameters
maxPoVSize uint32
candidateReceipt *parachaintypes.CandidateReceipt
timeoutKind parachaintypes.PvfExecTimeoutKind
}

func newWorker(validationCode parachaintypes.ValidationCode) (*worker, error) {
Expand All @@ -31,6 +33,26 @@ func newWorker(validationCode parachaintypes.ValidationCode) (*worker, error) {
}, nil
}

type resultWithError struct {
result *parachainruntime.ValidationResult
err error
}

func determineTimeout(timeoutKind parachaintypes.PvfExecTimeoutKind) time.Duration {
value, err := timeoutKind.Value()
if err != nil {
return 2 * time.Second
}
switch value.(type) {
case parachaintypes.Backing:
return 2 * time.Second
case parachaintypes.Approval:
return 12 * time.Second
default:
return 2 * time.Second
}
}

func (w *worker) executeRequest(task *workerTask) (*ValidationResult, error) {
logger.Debugf("[EXECUTING] worker %x task %v", w.workerID, task.work)
candidateHash, err := parachaintypes.GetCandidateHash(task.candidateReceipt)
Expand All @@ -43,10 +65,33 @@ func (w *worker) executeRequest(task *workerTask) (*ValidationResult, error) {
logger.Debugf("candidate %x already processed", candidateHash)
return processed, nil
}
validationResult, err := w.instance.ValidateBlock(task.work)
if err != nil {
logger.Errorf("executing validate_block: %w", err)
reasonForInvalidity := ExecutionError

var validationResult *parachainruntime.ValidationResult
validationResultCh := make(chan (*resultWithError))
timeoutDuration := determineTimeout(task.timeoutKind)

go func() {
result, err := w.instance.ValidateBlock(task.work)
if err != nil {
validationResultCh <- &resultWithError{result: nil, err: err}
}
validationResultCh <- &resultWithError{
result: result,
}
}()

select {
case validationResultWErr := <-validationResultCh:
if validationResultWErr.err != nil {
logger.Errorf("executing validate_block: %w", err)
reasonForInvalidity := ExecutionError
return &ValidationResult{InvalidResult: &reasonForInvalidity}, nil //nolint
}
validationResult = validationResultWErr.result

case <-time.After(timeoutDuration):
logger.Errorf("validation timed out")
reasonForInvalidity := Timeout
return &ValidationResult{InvalidResult: &reasonForInvalidity}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions dot/parachain/candidate-validation/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
)

type workerPool struct {

// todo, make sure other functions work with paraID
workers map[parachaintypes.ValidationCodeHash]*worker
}

Expand Down Expand Up @@ -150,10 +148,12 @@ func (v *workerPool) submitRequest(msg *ValidationTask) (*ValidationResult, erro
RelayParentNumber: msg.PersistedValidationData.RelayParentNumber,
RelayParentStorageRoot: msg.PersistedValidationData.RelayParentStorageRoot,
}

workTask := &workerTask{
work: validationParams,
maxPoVSize: msg.PersistedValidationData.MaxPovSize,
candidateReceipt: msg.CandidateReceipt,
timeoutKind: msg.PvfExecTimeoutKind,
}
return syncWorker.executeRequest(workTask)

Expand Down

0 comments on commit 3e6f025

Please sign in to comment.