Description
Currently, TaskArrayCollector submits a job array only when the array reaches its configured size. In pipelines with slower task generation or variable workloads, small batches may remain in memory indefinitely if the array never fills, delaying execution. This is especially a problem in pipelines with hanging channels, like channels made by watchPath. We have a few pipelines that run forever, watching a samplesheet for new entries, and processing them as they come in. We would like to use array jobs to reduce strain on the slurm scheduler as much as possible as this has been a problem on our cluster.
Below is a demonstration of the issue:
```nextflow
#!/usr/bin/env nextflow

params.array_size = 10
params.sleep_time = 30
params.outdir = "results"
params.input_dir = "input_files"
params.partition = 'testpartition'

process DUMMY_TASK {
    executor 'slurm'
    queue 'testpartition'
    array params.array_size
    cpus 1
    memory '1 GB'
    time '5m'
    debug true
    tag "task_${task.index}"

    input:
    path input_file

    output:
    path "${input_file.baseName}_output.txt"

    script:
    """
    echo "SLURM_ARRAY_JOB_ID: \${SLURM_ARRAY_JOB_ID:-'not_set'}" >> ${input_file.baseName}_output.txt
    # Simulate processing
    sleep ${params.sleep_time}
    """
}

workflow {
    input_files = Channel.watchPath("${params.input_dir}/*.txt")
    DUMMY_TASK(input_files)
    DUMMY_TASK.out.view { file ->
        "[${new Date().format('HH:mm:ss')}] Processed: ${file.name}"
    }
}
```
Then, in the same directory, run:

```bash
mkdir -p input_files   # the watched directory must exist
for i in {1..16}; do
    echo "Input data for task $i" > input_files/task_${i}.txt
    echo "Created input_files/task_${i}.txt"
    sleep .5
done
```
Currently, Nextflow hangs at:

```
[be/e4bc7e] process > DUMMY_TASK (task_15) [100%] 10 of 10
```
There should be 16 jobs, since we created 16 files. The collector is waiting for 4 more tasks to reach the array size of 10 before it submits another array job. If those 4 never arrive, the 6 pending tasks are never submitted.
My proposal is to add a maximum amount of time to wait for more tasks before submitting a partial array. The pipeline above would then behave like this: the first 10 tasks are submitted as an array job, as expected, and the next 6 tasks accumulate in the collector. Once the time specified by executorSubmitTimeout elapses, the remaining 6 tasks are bundled into a job array and submitted to the cluster.
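To make the intended behavior concrete, here is a minimal, self-contained sketch of a size-or-timeout batch collector. This is illustrative only, not Nextflow's actual `TaskArrayCollector`: the class and method names are hypothetical, and the "submission" is just recording the batch in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: submit a batch when it is full, OR when a timeout
// elapses after tasks started accumulating, so partial batches never hang.
class TimedBatchCollector<T> {
    private final int arraySize;
    private final long timeoutMillis;
    private final List<T> pending = new ArrayList<>();
    private final List<List<T>> submitted = new ArrayList<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });
    private ScheduledFuture<?> flushTask;

    TimedBatchCollector(int arraySize, long timeoutMillis) {
        this.arraySize = arraySize;
        this.timeoutMillis = timeoutMillis;
    }

    // Called once per incoming task.
    synchronized void collect(T task) {
        pending.add(task);
        if (pending.size() >= arraySize) {
            flush();
        } else if (flushTask == null) {
            // Arm a timer so a partial batch is still submitted eventually.
            flushTask = timer.schedule(this::timedFlush, timeoutMillis,
                                       TimeUnit.MILLISECONDS);
        }
    }

    private synchronized void timedFlush() {
        flushTask = null;
        flush();
    }

    // Stand-in for submitting an array job to the scheduler.
    private void flush() {
        if (flushTask != null) {
            flushTask.cancel(false);
            flushTask = null;
        }
        if (pending.isEmpty()) return;
        submitted.add(new ArrayList<>(pending));
        pending.clear();
    }

    synchronized List<List<T>> batches() {
        return submitted;
    }

    public static void main(String[] args) throws InterruptedException {
        // 16 tasks, array size 10, 200 ms timeout: one full batch of 10
        // immediately, then the remaining 6 once the timeout fires.
        TimedBatchCollector<Integer> c = new TimedBatchCollector<>(10, 200);
        for (int i = 1; i <= 16; i++) c.collect(i);
        Thread.sleep(500);
        System.out.println(c.batches().size() + " batches: "
                + c.batches().get(0).size() + " + " + c.batches().get(1).size());
        // prints "2 batches: 10 + 6"
    }
}
```

With the demo scenario above, the partial batch of 6 no longer waits forever; it is flushed as soon as the timeout elapses, which mirrors what the proposal would do for the hanging watchPath pipeline.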
My WIP fork currently has this implementation:
https://github.com/johnoooh/nextflow/blob/feature/slurm_array_timer/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy
It still needs a few changes; in particular, the parameter needs to be added to the config.
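For the config side, something like the fragment below is what I have in mind. This is a hypothetical sketch: `executorSubmitTimeout` is the name used in this proposal and does not exist in current Nextflow releases, and where the option should live (executor scope vs. process scope) is still open.

```groovy
// nextflow.config — hypothetical; executorSubmitTimeout is not an existing option
executor {
    name = 'slurm'
    // submit a partial task array after waiting this long for more tasks
    executorSubmitTimeout = '30s'
}
```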
I know this is a pretty specific issue, but I don't think it's an unreasonable addition. Do you foresee any issues with this approach?
This discussion was started here #5924