Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,39 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}${requesterPays}"
final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $aws ${workDir}/${TaskRun.CMD_RUN} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"

/*
* Enhanced signal handling for AWS Batch tasks to fix nested Nextflow execution issues.
* This implementation addresses the problem of proper signal forwarding when Nextflow
* processes are executed within AWS Batch containers.
*
* References: https://github.com/nextflow-io/nextflow/pull/6414
*
* Trap command breakdown:
*
* 1. TERM signal trap: `trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM`
* - Captures SIGTERM signals sent to the parent shell process
* - Conditionally forwards the TERM signal to the background bash process (stored in $pid)
* - The `[[ -n \\\$pid ]]` test ensures we only attempt to kill if $pid is set and non-empty
* - This avoids invoking `kill` with no target when TERM arrives before the background process has been started (i.e. before $pid is assigned)
* - Essential for proper cleanup when AWS Batch terminates jobs or when users cancel workflows
*
* 2. EXIT signal trap: `trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT`
* - Executes cleanup actions when the shell exits, either normally or after a handled signal such as TERM (it cannot run if the shell itself is killed with SIGKILL)
* - Captures the exit status ($?) of the last executed command before cleanup
* - Uploads the command log file to S3 for debugging and monitoring purposes
* - Uses `||true` to prevent the trap from failing if S3 upload fails (ensures exit code preservation)
* - Preserves and returns the original exit status to maintain proper error propagation
*
* 3. Background execution pattern: `bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid`
* - Runs the actual task command in background (&) to allow signal handling
* - Redirects both stdout and stderr (2>&1) to process substitution for real-time logging
* - Uses `tee` to simultaneously write logs to file and display to console
* - Stores the background process ID in $pid for signal forwarding
* - `wait $pid` blocks until the task completes and yields its exit status; if a trapped TERM arrives first, `wait` returns early (status > 128) so the TERM handler can forward the signal
* - This pattern allows the parent shell to remain responsive to signals while task executes
*/
final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $aws ${workDir}/${TaskRun.CMD_RUN} - | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid"
return cmd
}

Expand All @@ -375,7 +407,30 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''
final cmd = "trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG}"

/*
* Enhanced signal handling for AWS Batch tasks using s5cmd (high-performance S3 client).
* This implementation mirrors the s3Cmd method but uses s5cmd instead of aws-cli for
* improved S3 transfer performance.
*
* References: https://github.com/nextflow-io/nextflow/pull/6414
*
* The trap commands follow the same pattern as in the s3Cmd method:
*
* 1. TERM signal trap: `trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM`
* - Ensures proper signal forwarding to background processes when SIGTERM is received
* - Critical for handling AWS Batch job termination and user-initiated cancellations
*
* 2. EXIT signal trap: `trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT`
* - Performs cleanup by uploading task logs using s5cmd instead of aws-cli
* - Maintains exit status preservation for proper error reporting
*
* 3. Background execution with s5cmd: `$cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid`
* - Uses s5cmd to stream the task script directly into bash execution
* - Maintains the same signal-responsive background execution pattern
* - Provides real-time logging while allowing proper signal handling
*/
final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid"
return cmd
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ class AwsBatchTaskHandlerTest extends Specification {
then:
executor.getAwsOptions()>> Mock(AwsOptions) { getAwsCli() >> 'aws' }
then:
result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://work/.command.run - | bash 2>&1 | tee .command.log'
result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://work/.command.run - | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid'

when:
result = handler.getSubmitCommand()
Expand All @@ -942,7 +942,7 @@ class AwsBatchTaskHandlerTest extends Specification {
getStorageKmsKeyId() >> 'kms-key-123'
}
then:
result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug s3://work/.command.run - | bash 2>&1 | tee .command.log'
result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug s3://work/.command.run - | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid'
}

def 'should render submit command with s5cmd' () {
Expand All @@ -959,7 +959,7 @@ class AwsBatchTaskHandlerTest extends Specification {
then:
executor.getAwsOptions() >> Mock(AwsOptions) { getS5cmdPath() >> 's5cmd' }
then:
result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd cp .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://work/.command.run | bash 2>&1 | tee .command.log'
result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; s5cmd cp .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://work/.command.run | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid'

when:
result = handler.getSubmitCommand()
Expand All @@ -970,7 +970,7 @@ class AwsBatchTaskHandlerTest extends Specification {
getStorageKmsKeyId() >> 'kms-key-123'
}
then:
result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd --debug cp --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/.command.run | bash 2>&1 | tee .command.log'
result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; s5cmd --debug cp --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/.command.run | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid'

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ class AwsBatchExecutorTest extends Specification {

where:
FUSION | DEFAULT_FS | S5CMD | TASK_DIR | EXPECTED
false | false | false | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://foo/work/dir/.command.run - | bash 2>&1 | tee .command.log\''
false | false | true | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "{ ret=$?; s5cmd cp .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://foo/work/dir/.command.run | bash 2>&1 | tee .command.log\''
false | false | false | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://foo/work/dir/.command.run - | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid\''
false | false | true | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; s5cmd cp .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://foo/work/dir/.command.run | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid\''
and:
true | false | false | '/fusion/work/dir' | 'bash /fusion/work/dir/.command.run'
false | true | false | '/nfs/work/dir' | 'bash /nfs/work/dir/.command.run 2>&1 > /nfs/work/dir/.command.log'
Expand Down