Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,8 @@ class BashWrapperBuilder {
if( fixOwnership() )
builder.addEnv( 'NXF_OWNER=$(id -u):$(id -g)' )

for( String var : containerConfig.getEnvWhitelist() ) {
final envWhitelist = new ArrayList<>(containerConfig.getEnvWhitelist())
for( String var : envWhitelist ) {
builder.addEnv(var)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class ExecutorConfig implements ConfigScope {
""")
final String account

@ConfigOption
@Description("""
Determines how long to wait for a job array to fill to the specified array size before submitting a partial array (default: `60 min`).
""")
final Duration arrayTimeout

@ConfigOption
@Description("""
*Used only by the local executor.*
Expand Down Expand Up @@ -141,7 +147,7 @@ class ExecutorConfig implements ConfigScope {
Determines how often to fetch the queue status from the scheduler (default: `1 min`).
""")
final Duration queueStatInterval

@Description("""
The `executor.retry` scope controls the behavior of retrying failed job submissions.

Expand All @@ -162,6 +168,7 @@ class ExecutorConfig implements ConfigScope {

ExecutorConfig(Map opts) {
account = opts.account
arrayTimeout = opts.arrayTimeout as Duration ?: Duration.of('60min')
cpus = opts.cpus as Integer
dumpInterval = opts.dumpInterval as Duration ?: Duration.of('5min')
exitReadTimeout = opts.exitReadTimeout as Duration ?: Duration.of('270sec')
Expand All @@ -183,6 +190,10 @@ class ExecutorConfig implements ConfigScope {
this.opts = opts
}

Duration getArrayTimeout(String execName) {
getExecConfigProp(execName, 'arrayTimeout', null) as Duration
}

Duration getExitReadTimeout(String execName) {
getExecConfigProp(execName, 'exitReadTimeout', null) as Duration
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package nextflow.processor

import java.nio.file.Files
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

Expand Down Expand Up @@ -65,25 +69,32 @@ class TaskArrayCollector {

private int arraySize

private long arrayTimeoutMillis

private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

private ScheduledFuture<?> timeoutFuture

private Lock sync = new ReentrantLock()

private List<TaskRun> array

private boolean closed = false

TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize) {
TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize, long arrayTimeoutMillis) {
if( executor !instanceof TaskArrayExecutor )
throw new IllegalArgumentException("Executor '${executor.name}' does not support job arrays")

this.processor = processor
this.executor = (TaskArrayExecutor)executor
this.arraySize = arraySize
this.arrayTimeoutMillis = arrayTimeoutMillis
this.array = new ArrayList<>(arraySize)
}

/**
* Add a task to the current array, and submit the array when it
* reaches the desired size.
* reaches the desired size or the max interval has passed.
*
* @param task
*/
Expand All @@ -97,37 +108,79 @@ class TaskArrayCollector {
return
}

// add task to the array
// reset timeout on first task of job array
if( array.isEmpty() ) {
final action = { submitAfterTimeout() }
this.timeoutFuture = timeoutScheduler.schedule(action, arrayTimeoutMillis, TimeUnit.MILLISECONDS)
}

// add task
array.add(task)

// submit job array when it is ready
if( array.size() == arraySize ) {
executor.submit(createTaskArray(array))
array = new ArrayList<>(arraySize)
if( timeoutFuture ) {
timeoutFuture.cancel(false)
this.timeoutFuture = null
}
submit()
}
}
finally {
sync.unlock()
}
}

/**
* Submit partial job array after timeout has elapsed.
*/
private void submitAfterTimeout() {
sync.lock()
try {
if( !array.isEmpty() && !closed ) {
log.debug "Submitting partial job array after timeout (${array.size()} tasks)"
submit()
}
}
finally {
sync.unlock()
}
}

/**
* Submit job array and clear timeout.
*/
private void submit() {
final arrayCopy = new ArrayList<TaskRun>(array) // Create defensive copy to avoid ConcurrentModificationException
executor.submit(createTaskArray(arrayCopy))
this.array = new ArrayList<>(arraySize)
this.timeoutFuture = null
}

/**
* Close the collector, submitting any remaining tasks as a partial job array.
*/
void close() {
sync.lock()
try {
if( timeoutFuture ) {
timeoutFuture.cancel(false)
timeoutFuture = null
}

if( array.size() == 1 ) {
executor.submit(array.first())
}
else if( array.size() > 0 ) {
executor.submit(createTaskArray(array))
array = null
}

closed = true
}
finally {
sync.unlock()
timeoutScheduler.shutdownNow()
}
}

Expand All @@ -151,7 +204,7 @@ class TaskArrayCollector {
// create wrapper script
final script = createArrayTaskScript(handlers)
log.debug "Creating task array run >> $workDir\n$script"

// create config for job array
final rawConfig = new HashMap<String,Object>(ARRAY_DIRECTIVES.size())
for( final key : ARRAY_DIRECTIVES ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ class TaskProcessor {
this.forksCount = maxForks ? new LongAdder() : null
this.isFair0 = config.getFair()
final arraySize = config.getArray()
this.arrayCollector = arraySize > 0 ? new TaskArrayCollector(this, executor, arraySize) : null
final arrayTimeoutMillis = executor.config.getArrayTimeout().toMillis()
this.arrayCollector = arraySize > 0 ? new TaskArrayCollector(this, executor, arraySize, arrayTimeoutMillis) : null
log.debug "Creating process '$name': maxForks=${maxForks}; fair=${isFair0}; array=${arraySize}"
}

Expand Down