Skip to content

Commit 66abf24

Browse files
committed
Fix Job arrays exceeding queue size limit (#5920) (#6345) [ci fast]
1 parent ae66102 commit 66abf24

File tree

3 files changed

+137
-1
lines changed

3 files changed

+137
-1
lines changed

modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,47 @@ class TaskPollingMonitor implements TaskMonitor {
197197
int getCapacity() { capacity }
198198

199199

200+
/**
201+
* Get the number of queue slots consumed by a task handler.
202+
* Regular tasks consume 1 slot, job arrays consume slots equal to their array size.
203+
*
204+
* @param handler A {@link TaskHandler} for the task
205+
* @return The number of queue slots consumed by this task
206+
*/
207+
private int getTaskSlots(TaskHandler handler) {
208+
return handler.task instanceof TaskArrayRun ?
209+
((TaskArrayRun) handler.task).getArraySize() : 1
210+
}
211+
212+
/**
213+
* Validates that a task handler can be submitted based on queue capacity constraints.
214+
* Performs validation for job arrays that exceed queue capacity.
215+
*
216+
* @param handler A {@link TaskHandler} for the task to validate
217+
* @return {@code true} if the task meets capacity constraints
218+
* @throws IllegalArgumentException if job array size exceeds queue capacity
219+
*/
220+
private boolean checkQueueCapacity(TaskHandler handler) {
221+
// Calculate slots consumed by this task:
222+
// - Regular tasks consume 1 slot
223+
// - Job arrays consume slots equal to their array size
224+
int slots = getTaskSlots(handler)
225+
226+
// Prevent job arrays larger than the entire queue capacity
227+
// This catches configuration errors early with a clear error message
228+
if (slots > capacity) {
229+
throw new IllegalArgumentException(
230+
"Process '${handler.task.name}' declares array size ($slots) " +
231+
"which exceeds the executor queue size ($capacity)")
232+
}
233+
234+
// Check if there's enough remaining capacity for this task:
235+
// - runningQueue.size() = currently running tasks
236+
// - slots = slots needed for this new task
237+
// - capacity = maximum allowed concurrent tasks
238+
return runningQueue.size() + slots <= capacity
239+
}
240+
200241
/**
201242
* Defines the strategy determining if a task can be submitted for execution.
202243
*
@@ -207,7 +248,7 @@ class TaskPollingMonitor implements TaskMonitor {
207248
* by the polling monitor
208249
*/
209250
protected boolean canSubmit(TaskHandler handler) {
210-
(capacity>0 ? runningQueue.size() < capacity : true) && handler.canForkProcess() && handler.isReady()
251+
(capacity > 0 ? checkQueueCapacity(handler) : true) && handler.canForkProcess() && handler.isReady()
211252
}
212253

213254
/**

modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,26 @@ class ParallelPollingMonitorTest extends Specification {
118118
10 | 1 | false | false
119119
}
120120

121+
def 'should inherit array size validation from parent TaskPollingMonitor' () {
122+
given:
123+
def session = Mock(Session)
124+
def throttlingExecutor = Mock(ThrottlingExecutor)
125+
def monitor = new ParallelPollingMonitor(throttlingExecutor, [name: 'foo', session: session, capacity: 5, pollInterval: Duration.of('1min')])
126+
and:
127+
def processor = Mock(TaskProcessor)
128+
def arrayHandler = Mock(TaskHandler) {
129+
getTask() >> Mock(TaskArrayRun) {
130+
getName() >> 'oversized_array'
131+
getArraySize() >> 10
132+
getProcessor() >> processor
133+
}
134+
}
135+
136+
when:
137+
monitor.canSubmit(arrayHandler)
138+
then:
139+
def e = thrown(IllegalArgumentException)
140+
e.message.contains("Process 'oversized_array' declares array size (10) which exceeds the executor queue size (5)")
141+
}
142+
121143
}

modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import nextflow.executor.ExecutorConfig
2222
import nextflow.util.Duration
2323
import nextflow.util.RateUnit
2424
import spock.lang.Specification
25+
import spock.lang.Unroll
2526
/**
2627
*
2728
* @author Paolo Di Tommaso <[email protected]>
@@ -150,4 +151,76 @@ class TaskPollingMonitorTest extends Specification {
150151
3 * session.notifyTaskSubmit(handler)
151152
}
152153

154+
@Unroll
155+
def 'should throw error if job array size exceeds queue size [capacity: #CAPACITY, array: #ARRAY_SIZE]' () {
156+
given:
157+
def session = Mock(Session)
158+
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min')))
159+
and:
160+
def processor = Mock(TaskProcessor)
161+
def arrayHandler = Mock(TaskHandler) {
162+
getTask() >> Mock(TaskArrayRun) {
163+
getName() >> TASK_NAME
164+
getArraySize() >> ARRAY_SIZE
165+
getProcessor() >> processor
166+
}
167+
}
168+
169+
when:
170+
monitor.canSubmit(arrayHandler)
171+
then:
172+
def e = thrown(IllegalArgumentException)
173+
e.message.contains("Process '$TASK_NAME' declares array size ($ARRAY_SIZE) which exceeds the executor queue size ($CAPACITY)")
174+
175+
where:
176+
CAPACITY | ARRAY_SIZE | TASK_NAME
177+
10 | 15 | 'test_array'
178+
5 | 10 | 'large_array'
179+
1 | 2 | 'small_array'
180+
}
181+
182+
@Unroll
183+
def 'should validate array size accounting in queue capacity [capacity: #CAPACITY, running: #RUNNING_COUNT, array: #ARRAY_SIZE]' () {
184+
given:
185+
def session = Mock(Session)
186+
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min')))
187+
and:
188+
def processor = Mock(TaskProcessor)
189+
def regularHandler = Mock(TaskHandler) {
190+
getTask() >> Mock(TaskRun) {
191+
getProcessor() >> processor
192+
}
193+
canForkProcess() >> CAN_FORK
194+
isReady() >> IS_READY
195+
}
196+
def arrayHandler = Mock(TaskHandler) {
197+
getTask() >> Mock(TaskArrayRun) {
198+
getArraySize() >> ARRAY_SIZE
199+
getProcessor() >> processor
200+
}
201+
canForkProcess() >> CAN_FORK
202+
isReady() >> IS_READY
203+
}
204+
205+
and:
206+
RUNNING_COUNT.times { monitor.runningQueue.add(regularHandler) }
207+
208+
expect:
209+
monitor.runningQueue.size() == RUNNING_COUNT
210+
monitor.canSubmit(regularHandler) == REGULAR_EXPECTED
211+
monitor.canSubmit(arrayHandler) == ARRAY_EXPECTED
212+
213+
where:
214+
CAPACITY | RUNNING_COUNT | ARRAY_SIZE | CAN_FORK | IS_READY | REGULAR_EXPECTED | ARRAY_EXPECTED
215+
10 | 6 | 5 | true | true | true | false // Array too big (6+5>10)
216+
10 | 5 | 5 | true | true | true | true // Array fits exactly (5+5=10)
217+
10 | 9 | 1 | true | true | true | true // Both fit (9+1=10)
218+
10 | 10 | 1 | true | true | false | false // Queue full (10+1>10)
219+
5 | 4 | 1 | true | true | true | true // Both fit (4+1=5)
220+
5 | 4 | 2 | true | true | true | false // Array too big (4+2>5)
221+
0 | 5 | 10 | true | true | true | true // Unlimited capacity
222+
10 | 5 | 3 | false | true | false | false // Cannot fork
223+
10 | 5 | 3 | true | false | false | false // Not ready
224+
}
225+
153226
}

0 commit comments

Comments
 (0)