diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index c2fa04cb63..eb1c8f71c5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -735,7 +735,8 @@ class BashWrapperBuilder { if( fixOwnership() ) builder.addEnv( 'NXF_OWNER=$(id -u):$(id -g)' ) - for( String var : containerConfig.getEnvWhitelist() ) { + final envWhitelist = new ArrayList<>(containerConfig.getEnvWhitelist()) + for( String var : envWhitelist ) { builder.addEnv(var) } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorConfig.groovy index 073be5e905..5aceecd97d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorConfig.groovy @@ -39,6 +39,12 @@ class ExecutorConfig implements ConfigScope { """) final String account + @ConfigOption + @Description(""" + Determines how long to wait for a job array to fill to the specified array size before submitting a partial array (default: `60 min`). + """) + final Duration arrayTimeout + @ConfigOption @Description(""" *Used only by the local executor.* @@ -141,7 +147,7 @@ class ExecutorConfig implements ConfigScope { Determines how often to fetch the queue status from the scheduler (default: `1 min`). """) final Duration queueStatInterval - + @Description(""" The `executor.retry` scope controls the behavior of retrying failed job submissions. @@ -162,6 +168,7 @@ class ExecutorConfig implements ConfigScope { ExecutorConfig(Map opts) { account = opts.account + arrayTimeout = opts.arrayTimeout as Duration ?: Duration.of('60min') cpus = opts.cpus as Integer dumpInterval = opts.dumpInterval as Duration ?: Duration.of('5min') exitReadTimeout = opts.exitReadTimeout as Duration ?: Duration.of('270sec') @@ -183,6 +190,10 @@ class ExecutorConfig implements ConfigScope { this.opts = opts } + Duration getArrayTimeout(String execName) { + getExecConfigProp(execName, 'arrayTimeout', null) as Duration + } + Duration getExitReadTimeout(String execName) { getExecConfigProp(execName, 'exitReadTimeout', null) as Duration } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy index 18c15e2897..660945bf33 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy @@ -17,6 +17,10 @@ package nextflow.processor import java.nio.file.Files +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock @@ -65,25 +69,32 @@ class TaskArrayCollector { private int arraySize + private long arrayTimeoutMillis + + private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor() + + private ScheduledFuture timeoutFuture + private Lock sync = new ReentrantLock() private List array private boolean closed = false - TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize) { + TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize, long arrayTimeoutMillis) { if( executor !instanceof TaskArrayExecutor ) throw new IllegalArgumentException("Executor '${executor.name}' does not support job arrays") this.processor = processor this.executor = (TaskArrayExecutor)executor this.arraySize = arraySize + this.arrayTimeoutMillis = arrayTimeoutMillis this.array = new ArrayList<>(arraySize) } /** * Add a task to the current array, and submit the array when it - * reaches the desired size. + * reaches the desired size or the max interval has passed. * * @param task */ @@ -97,13 +108,38 @@ class TaskArrayCollector { return } - // add task to the array + // reset timeout on first task of job array + if( array.isEmpty() ) { + final action = { submitAfterTimeout() } + this.timeoutFuture = timeoutScheduler.schedule(action, arrayTimeoutMillis, TimeUnit.MILLISECONDS) + } + + // add task array.add(task) // submit job array when it is ready if( array.size() == arraySize ) { - executor.submit(createTaskArray(array)) - array = new ArrayList<>(arraySize) + if( timeoutFuture ) { + timeoutFuture.cancel(false) + this.timeoutFuture = null + } + submit() + } + } + finally { + sync.unlock() + } + } + + /** + * Submit partial job array after timeout has elapsed. + */ + private void submitAfterTimeout() { + sync.lock() + try { + if( !array.isEmpty() && !closed ) { + log.debug "Submitting partial job array after timeout (${array.size()} tasks)" + submit() } } finally { @@ -111,12 +147,27 @@ class TaskArrayCollector { } } + /** + * Submit job array and clear timeout. + */ + private void submit() { + final arrayCopy = new ArrayList(array) // Create defensive copy to avoid ConcurrentModificationException + executor.submit(createTaskArray(arrayCopy)) + this.array = new ArrayList<>(arraySize) + this.timeoutFuture = null + } + /** * Close the collector, submitting any remaining tasks as a partial job array. */ void close() { sync.lock() try { + if( timeoutFuture ) { + timeoutFuture.cancel(false) + timeoutFuture = null + } + if( array.size() == 1 ) { executor.submit(array.first()) } @@ -124,10 +175,12 @@ class TaskArrayCollector { executor.submit(createTaskArray(array)) array = null } + closed = true } finally { sync.unlock() + timeoutScheduler.shutdownNow() } } @@ -151,7 +204,7 @@ class TaskArrayCollector { // create wrapper script final script = createArrayTaskScript(handlers) log.debug "Creating task array run >> $workDir\n$script" - + // create config for job array final rawConfig = new HashMap(ARRAY_DIRECTIVES.size()) for( final key : ARRAY_DIRECTIVES ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index f0434e69c7..d2e96ed209 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -298,7 +298,8 @@ class TaskProcessor { this.forksCount = maxForks ? new LongAdder() : null this.isFair0 = config.getFair() final arraySize = config.getArray() - this.arrayCollector = arraySize > 0 ? new TaskArrayCollector(this, executor, arraySize) : null + final arrayTimeoutMillis = executor.config.getArrayTimeout().toMillis() + this.arrayCollector = arraySize > 0 ? new TaskArrayCollector(this, executor, arraySize, arrayTimeoutMillis) : null log.debug "Creating process '$name': maxForks=${maxForks}; fair=${isFair0}; array=${arraySize}" }