Skip to content

Commit 3bcb411

Browse files
committed
Revert "Fix Job arrays exceeding queueSize (#6314)"
This reverts commit fcf5b87.
1 parent ff00e2d commit 3bcb411

File tree

5 files changed

+7
-81
lines changed

5 files changed

+7
-81
lines changed

modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {
5656

5757
@Override
5858
protected boolean canSubmit(TaskHandler handler) {
59-
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire(handler.getForksCount()))
59+
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire())
6060
}
6161

6262
protected RateLimiter createSubmitRateLimit() {
@@ -96,7 +96,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {
9696

9797
@Override
9898
boolean evict(TaskHandler handler) {
99-
semaphore?.release(handler.getForksCount())
99+
semaphore?.release()
100100
return super.evict(handler)
101101
}
102102
}

modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -254,13 +254,6 @@ abstract class TaskHandler {
254254
return record
255255
}
256256

257-
/**
258-
* Determine the number of forks consumed by the task handler.
259-
*/
260-
int getForksCount() {
261-
return task instanceof TaskArrayRun ? task.getArraySize() : 1
262-
}
263-
264257
/**
265258
* Determine if a process can be forked i.e. can launch
266259
* a parallel task execution. This is only enforced when
@@ -273,7 +266,7 @@ abstract class TaskHandler {
273266
*/
274267
boolean canForkProcess() {
275268
final max = task.processor.maxForks
276-
return !max ? true : task.processor.forksCount + getForksCount() <= max
269+
return !max ? true : task.processor.forksCount < max
277270
}
278271

279272
/**
@@ -290,18 +283,14 @@ abstract class TaskHandler {
290283
* Increment the number of current forked processes
291284
*/
292285
final void incProcessForks() {
293-
def count = task.processor.forksCount
294-
if( count != null )
295-
count.add(getForksCount())
286+
task.processor.forksCount?.increment()
296287
}
297288

298289
/**
299290
* Decrement the number of current forked processes
300291
*/
301292
final void decProcessForks() {
302-
def count = task.processor.forksCount
303-
if( count != null )
304-
count.add(-getForksCount())
293+
task.processor.forksCount?.decrement()
305294
}
306295

307296
/**

modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,7 @@ class TaskPollingMonitor implements TaskMonitor {
207207
* by the polling monitor
208208
*/
209209
protected boolean canSubmit(TaskHandler handler) {
210-
int slots = handler.getForksCount()
211-
if( capacity > 0 && slots > capacity )
212-
throw new IllegalArgumentException("Job array ${handler.task.name} exceeds the queue size (array size: $slots, queue size: $capacity)")
213-
if( capacity > 0 && runningQueue.size() + slots > capacity )
214-
return false
215-
return handler.canForkProcess() && handler.isReady()
210+
(capacity>0 ? runningQueue.size() < capacity : true) && handler.canForkProcess() && handler.isReady()
216211
}
217212

218213
/**

modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class ParallelPollingMonitorTest extends Specification {
8383
def retry = new AtomicInteger()
8484

8585
def session = Mock(Session)
86-
def handler = Spy(TaskHandler)
86+
def handler = Mock(TaskHandler)
8787

8888
def opts = new ThrottlingExecutor.Options()
8989
.retryOn(IllegalArgumentException)

modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616

1717
package nextflow.processor
1818

19-
import java.util.concurrent.atomic.LongAdder
2019

2120
import nextflow.Session
2221
import nextflow.executor.ExecutorConfig
2322
import nextflow.util.Duration
2423
import nextflow.util.RateUnit
25-
import nextflow.util.ThrottlingExecutor
2624
import spock.lang.Specification
27-
import spock.lang.Unroll
2825
/**
2926
*
3027
* @author Paolo Di Tommaso <[email protected]>
@@ -153,59 +150,4 @@ class TaskPollingMonitorTest extends Specification {
153150
3 * session.notifyTaskSubmit(handler)
154151
}
155152

156-
@Unroll
157-
def 'should check whether job can be submitted' () {
158-
given:
159-
def session = Mock(Session)
160-
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min')))
161-
and:
162-
def processor = Mock(TaskProcessor) {
163-
getForksCount() >> new LongAdder()
164-
getMaxForks() >> FORKS
165-
}
166-
def handler = Spy(TaskHandler)
167-
handler.task = Mock(TaskRun) {
168-
getProcessor() >> processor
169-
}
170-
def arrayHandler = Spy(TaskHandler) {
171-
isReady() >> true
172-
}
173-
arrayHandler.task = Mock(TaskArrayRun) {
174-
getArraySize() >> ARRAY
175-
getProcessor() >> processor
176-
}
177-
178-
and:
179-
SUBMIT.times { monitor.runningQueue.add(handler) }
180-
181-
expect:
182-
monitor.runningQueue.size() == SUBMIT
183-
monitor.canSubmit(arrayHandler) == EXPECTED
184-
where:
185-
CAPACITY | SUBMIT | FORKS | ARRAY | EXPECTED
186-
0 | 0 | 0 | 2 | true
187-
0 | 0 | 1 | 2 | false
188-
10 | 5 | 0 | 5 | true
189-
10 | 8 | 0 | 5 | false
190-
10 | 0 | 1 | 5 | false
191-
}
192-
193-
def 'should throw error if job array size exceeds queue size' () {
194-
given:
195-
def session = Mock(Session)
196-
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: 2, pollInterval: Duration.of('1min')))
197-
and:
198-
def arrayHandler = Spy(TaskHandler)
199-
arrayHandler.task = Mock(TaskArrayRun) {
200-
getName() >> 'bar'
201-
getArraySize() >> 4
202-
}
203-
204-
when:
205-
monitor.canSubmit(arrayHandler)
206-
then:
207-
def e = thrown(IllegalArgumentException)
208-
e.message == 'Job array bar exceeds the queue size (array size: 4, queue size: 2)'
209-
}
210-
211153
}

0 commit comments

Comments
 (0)