
feat: Azure Batch eagerly terminates jobs after all tasks have been submitted #6159


Open

wants to merge 12 commits into base: master from 5839_azure_batch_jobs_terminate_upon_completion

Conversation


@adamrtalbot adamrtalbot commented Jun 4, 2025

Summary

Fixes Azure Batch "job leak" issue where jobs remain in Active state even after task completion, causing quota exhaustion and preventing multiple pipelines from running simultaneously.

Problem: Jobs consume quota slots unnecessarily, blocking other workflows
Solution: Leverage Azure Batch's native auto-termination to release quota immediately when tasks complete

How Azure Batch Eager Job Termination Works

Problem Addressed

Azure Batch has a limitation where jobs remain in an "Active" state even after all their tasks complete. This causes:

  • Quota exhaustion: Active jobs count against Azure Batch service quotas
  • Pipeline blocking: Multiple Nextflow pipelines can't run simultaneously due to quota limits
  • Resource waste: Jobs consume quota slots unnecessarily

Solution Implementation

Job Auto-Termination Configuration

// In AzBatchOpts.groovy
@ConfigOption
@Description("When the workflow completes, set all jobs to terminate on task completion (default: true)")
final Boolean terminateJobsOnCompletion

Default behavior: terminateJobsOnCompletion = true (enabled by default)
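
For example, the behaviour can be switched off from the pipeline configuration by flipping the documented option (same azure.batch scope as the full example further down):

azure {
    batch {
        terminateJobsOnCompletion = false  // opt out of eager job termination
    }
}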

Job Termination Mechanism

The service implements a two-phase termination approach:

Phase 1: Set Jobs to Auto-Terminate
protected void terminateJobs() {
    for( String jobId : allJobIds.values() ) {
        final job = apply(() -> client.getJob(jobId))
        final poolInfo = job.poolInfo
        
        final jobParameter = new BatchJobUpdateContent()
            .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)  // Key setting
            .setPoolInfo(poolInfo)
        
        apply(() -> client.updateJob(jobId, jobParameter))
    }
}
Phase 2: Cleanup on Workflow Completion
@Override
void close() {
    // Terminate all jobs to prevent them from occupying quota
    if( config.batch().terminateJobsOnCompletion ) {
        terminateJobs()
    }
    
    // Delete all jobs (if configured)
    if( config.batch().deleteJobsOnCompletion ) {
        cleanupJobs()
    }
}

Azure Batch Native Feature Integration

How Auto-Termination Works

  • Uses Azure Batch's native OnAllBatchTasksComplete.TERMINATE_JOB setting
  • This tells Azure Batch: "When all tasks in this job finish, automatically terminate the job"
  • Jobs transition from "Active" → "Terminating" → "Completed"

Eager Termination Flow

  1. Job Creation: Jobs created normally with pool assignments
  2. Task Submission: Tasks submitted to jobs as usual
  3. Workflow Completion: When Nextflow workflow completes
  4. Batch Update: Nextflow updates all existing jobs with OnAllTasksComplete = TERMINATE_JOB
  5. Auto-Termination: Azure Batch automatically terminates jobs as their tasks complete
  6. Quota Release: Terminated jobs no longer consume quota
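
To sanity-check steps 4–6, each job can be read back after the update. This is a minimal sketch reusing the `client`, `apply(...)` and `allJobIds` helpers from the snippets above; the `onAllTasksComplete` and `state` getters are assumed to be exposed by the SDK's job model:

// Illustrative only: verify that every tracked job picked up the auto-termination flag
for( String jobId : allJobIds.values() ) {
    final job = apply(() -> client.getJob(jobId))
    // expected: onAllTasksComplete == TERMINATE_JOB, with state moving from active to terminating/completed
    log.debug "Azure Batch job ${jobId}: onAllTasksComplete=${job.onAllTasksComplete}, state=${job.state}"
}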

Key Benefits

Resource Management

  • Quota preservation: Jobs don't consume quota after completion
  • Multiple pipelines: Allows running multiple Nextflow workflows simultaneously
  • Clean resource usage: Prevents resource leaks in Azure Batch

Operational Improvements

  • Zero user impact: Completely transparent to workflow users
  • Backward compatible: Can be disabled if needed
  • Automatic cleanup: No manual intervention required

Configuration Options

Users can control the behavior:

azure {
    batch {
        terminateJobsOnCompletion = true   // Enable eager termination (default)
        deleteJobsOnCompletion = false    // Optionally delete jobs entirely
        deleteTasksOnCompletion = true    // Clean up individual tasks
    }
}

Technical Implementation Details

Job Lifecycle Management

  • Job Reuse: Same job ID reused for same Process+PoolId combination (sketched below)
  • Pool Independence: Each pool can have its own jobs
  • Batch Updates: Uses Azure Batch updateJob API for termination setting
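
As a rough illustration of the Job Reuse point above (the key format and the `getOrCreateJobId`/`createJob` helper names are made up for this example; the real AzBatchService uses its own key type):

import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of fields/methods inside the service class:
// one Azure Batch job per Process+PoolId pair, mirroring the allJobIds map iterated by terminateJobs() above
private final Map<String,String> allJobIds = new ConcurrentHashMap<>()

String getOrCreateJobId(String processName, String poolId) {
    final key = "${processName}:${poolId}".toString()
    // reuse the existing job id for this Process+PoolId pair, or create a new job on first use
    return allJobIds.computeIfAbsent(key) { k -> createJob(processName, poolId) }  // createJob() is a placeholder
}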

Error Handling

try {
    apply(() -> client.updateJob(jobId, jobParameter))
}
catch (HttpResponseException e) {
    if (e.response.statusCode == 409) {
        // 409 Conflict: the job has already completed or is terminating
        log.debug "Azure Batch job ${jobId} already terminated, skipping termination"
    } else {
        log.warn "Unable to terminate Azure Batch job ${jobId} - Status: ${e.response.statusCode}"
    }
}

Impact

This implementation provides an elegant solution to Azure Batch's job quota problem by leveraging Azure's native auto-termination feature. It ensures that jobs automatically terminate when their tasks complete, preventing quota exhaustion while maintaining full compatibility with existing workflows.

Related

  • #5839



@adamrtalbot
Collaborator Author

Integration tests failing, looks unrelated.



@pditommaso pditommaso force-pushed the master branch 2 times, most recently from b4b321e to 069653d Compare June 4, 2025 18:54
@bentsherman bentsherman self-requested a review June 16, 2025 12:34
@adamrtalbot
Collaborator Author

Related issue on Slack: https://nfcore.slack.com/archives/C02T98A23U7/p1753954588096009

Hi !
Unsure if I should post here or in #nextflow-plugins, as it concerns help about a plugin.
The plugin concerned is nf-azure, where, in the current state, jobs are only deleted after completion of the workflow with AzBatchService.cleanupJobs().
The problem is that a job with all tasks completed still counts towards the job quota, thus allowing only 3-4 pipelines to run instead of 50+ (with 2-3 tasks each).
This is why I want to periodically run a mid-workflow cleanup in the AzBatchService such as:

protected void cleanupCompletedJobsMidRun() {
        for (String jobId : allJobIds.values()) {
            try {
                def tasks = client.listTasks(jobId)
                if (tasks.every { it.state.toString() in ['COMPLETED'] }) {
                    log.trace "Deleting Azure job ${jobId} mid-run"
                    apply(() -> client.deleteJob(jobId))
                }
            }
            catch (Exception e) {
                log.debug "Skipping mid-run cleanup for ${jobId} - ${e.message ?: e}"
            }
        }
    }

My problem is that it implies a modification of the nf-azure plugin, which, being a core plugin, must be handled differently than a custom plugin derived from nf-hello.
Apart from the plugin packaging itself, the main thing I need help with is overriding the proper nf-azure plugin in PluginsFacade, so I have my own executor in:

protected List<PluginSpec> defaultPluginsConf(Map config) {
        // retrieve the list from the env var
        final commaSepList = env.get('NXF_PLUGINS_DEFAULT')
        if( commaSepList && commaSepList !in ['true','false'] ) {
            // if the plugin id in the list does *not* contain the @version suffix, it picks the version
            // specified in the defaults list. Otherwise parse the provider id@version string to the corresponding spec
            return commaSepList
                    .tokenize(',')
                    .collect( it-> defaultPlugins.hasPlugin(it) ? defaultPlugins.getPlugin(it) : PluginSpec.parse(it) )
        }

        final plugins = new ArrayList<PluginSpec>()
        final workDir = config.workDir as String
        final bucketDir = config.bucketDir as String
        final executor = Bolts.navigate(config, 'process.executor')

        if( executor == 'awsbatch' || workDir?.startsWith('s3://') || bucketDir?.startsWith('s3://') || env.containsKey('NXF_ENABLE_AWS_SES') )
            plugins << defaultPlugins.getPlugin('nf-amazon')

        if( executor == 'google-lifesciences' || executor == 'google-batch' || workDir?.startsWith('gs://') || bucketDir?.startsWith('gs://')  )
            plugins << defaultPlugins.getPlugin('nf-google')

        if( executor == 'azurebatch' || workDir?.startsWith('az://') || bucketDir?.startsWith('az://') )
            plugins << defaultPlugins.getPlugin('nf-azure')

...
            
        return plugins
    }

Is this problem easily solvable, or should we focus on balancing the load over multiple batch accounts instead of relying on only one?

@ghislaindemael

ghislaindemael commented Aug 6, 2025

Hi !
Posting my comment here as indicated in Slack.
Our 'use case' is relatively simple: we run big pipelines (i.e. with many jobs), and since completed jobs are only deleted once all tasks are completed (or cancelled), we reach our quota relatively quickly, leaving us able to run only a few pipelines with 2-3 active jobs each instead of dozens.

If we did not care about resuming a workflow, then, since task outputs are stored outside of the workdir, a simple flag to delete the job once its last task has finished its lifecycle would be all we need.

The downside is that to "resume" a workflow, we must find a way to tell Nextflow that task outputs already exist and are stored at X location, as well as supply them. This means the easiest solution would be to relaunch the whole workflow should an error arise somewhere during processing.


@adamrtalbot
Collaborator Author

since task outputs are stored outside of the workdir

@ghislaindemael I'm not sure I understand this; task outputs have to be in the working directory. Even if they're published to a new location, they are copied out by Nextflow itself after the task completes.

@ghislaindemael

@adamrtalbot

My error; indeed I meant that as we publish the results outside of the workdir (e.g. in Blob Storage), we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.

@adamrtalbot
Collaborator Author

we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.

To clarify the flow here:

  1. A job is created on Azure Batch by Nextflow and assigned to a Node Pool
  2. One or more tasks are added to the Job
  3. Each task is assigned to a node
  4. The task starts
    • It downloads the input files from Azure Blob storage to the local node
    • It creates the output files on the local node
    • It uploads the output files back to Azure Blob storage at the working directory
  5. The task completes
    • The output files on the local node are deleted to clear space for future tasks
    • (Optional): The output files are copied from the working directory to the publishing directory by Nextflow
  6. When the pipeline completes, Nextflow will terminate the job created in 1, preventing any new tasks being added to the job and clearing quota

This PR strictly refers to steps 1 and 6 and does not interact with any files. If you are having issues with file storage, running out of space, etc., that would be a different issue.

@luanjot

luanjot commented Aug 12, 2025

@adamrtalbot we are also looking for a solution for this. We have executions that have hundreds of jobs sometimes, so, even in proper executions without errors, we are limited to 2 or 3 parallel executions per batch account. In our case, at any given moment, we would have something like:

Run1:
200 jobs marked as finished, e.g. [100%] X of X ✔
2 jobs ongoing with pending tasks
20 jobs not started

Run2:
57 jobs marked as finished, e.g. [100%] X of X ✔
4 jobs ongoing with pending tasks
200 jobs not started

So, from Azure's perspective, we have 200+2+57+4=263 jobs ongoing. As the runs progress, we have more and more jobs open and we reach the limit very quickly.

We are seeing if we can modify / extend the nf-azure plugin to handle this by adding some sort of cron job that deletes the jobs marked as finished (with a ✔), but you seem to say that this might cause issues when resuming the tasks? Why is that? It seems to me that the job names are different when it resumes the execution, no?

@adamrtalbot
Collaborator Author

but you seem to say that this might cause issues when resuming the tasks?

Not resume, but retry with an errorStrategy: https://www.nextflow.io/docs/latest/reference/process.html#errorstrategy

Here is the flow that may cause issues:

  1. A job is created for a process
  2. 5 tasks are submitted to the job
  3. Nextflow decides no more tasks will be submitted and closes the Job (sets to terminateOnCompletion)
  4. 4 tasks successfully complete, 1 task fails
  5. All tasks have completed, Azure Batch terminates the job
  6. The failed task tries to retry
  7. It gets submitted to a terminated job!

@luanjot

luanjot commented Aug 12, 2025

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

@adamrtalbot
Collaborator Author

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

You might have the same issue, in that you terminate a job before you can resubmit a task.

@adamrtalbot
Collaborator Author

Note that none of this will help you if you simply have too many active jobs. A job needs to be active to run a task, so if you just have a lot of work to do, this won't help.

Really, the issue is with the terrible design by Azure, but given they just fired most of their genomics staff, I doubt they will bother to help 🤷.

@luanjot

luanjot commented Aug 12, 2025

I have too many active jobs because Nextflow does not close them, not because they are actually active.

Any job that has been marked by Nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything ever again; however, Nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days, the first task that finished 47 hours ago is still marked as "active" in Batch because Nextflow does not close it, even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

@adamrtalbot
Collaborator Author

Any job that has been marked by Nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything ever again; however, Nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days, the first task that finished 47 hours ago is still marked as "active" in Batch because Nextflow does not close it, even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

Right - so with especially long-running pipelines you have many jobs in an active state which do not do anything.

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Here's an alternative implementation (which has the added benefit of making @pditommaso happy because it won't use the trace observer!):

  1. Create Job
  2. Submit task
  3. IMMEDIATELY set Job to terminate onAllBatchTasksComplete
  4. Add a try/catch so if you try to submit a Task to a Job in completed state, a new Job will be created, allowing more jobs to start.

This should eagerly terminate jobs while still allowing users to submit all tasks as normal.
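
A rough sketch of step 4 above, assuming the Azure SDK's createTask/BatchTaskCreateContent API and a hypothetical recreateJobForTask helper (the real helper names land in a later commit on this branch):

// Illustrative only -- submit a task, recreating the job if it has already auto-terminated
protected void submitTaskToJob(String jobId, BatchTaskCreateContent task) {
    try {
        apply(() -> client.createTask(jobId, task))
    }
    catch (HttpResponseException e) {
        if( e.response.statusCode == 409 ) {
            // 409 Conflict: the job completed already, so create a replacement job (also set to auto-terminate)
            final newJobId = recreateJobForTask(task)   // hypothetical helper
            apply(() -> client.createTask(newJobId, task))
        }
        else
            throw e
    }
}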

@luanjot

luanjot commented Aug 12, 2025

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Then how does it "decide" to add the ✔?

@pditommaso
Member

The check is shown when no more tasks for that process need to be executed, i.e. the process execution is complete.

@adamrtalbot
Collaborator Author

The check is shown when no more tasks for that process need to be executed, i.e. the process execution is complete.

Excellent, can we use that logic to terminate the Azure Job?

@pditommaso
Member

It could be done with a TraceObserver(V2). If I'm not wrong you already made a PR for that.

@adamrtalbot adamrtalbot requested a review from a team as a code owner August 12, 2025 12:59
@adamrtalbot
Collaborator Author

adamrtalbot commented Aug 12, 2025

It could be done with a TraceObserver(V2). If I'm not wrong you already made a PR for that.

I have, it's this one, but I've modified the behaviour now.

@luanjot I've updated the behaviour so now:

  • All Jobs are immediately set to terminate after all Tasks complete
  • Any subsequent Tasks are still submitted to the same Job
  • If a Job is found to be in terminated state when a Task is added, Nextflow will just create a new Job (also set to automatically terminate).

The advantage here is:

  • The Job will eagerly terminate as soon as all tasks have finished
  • This behaviour is controlled by Azure Batch, so we can defer any of that logic to their side. If Nextflow dies, the job still terminates.
  • From a user perspective, very little changes, other than azure.batch.terminateJobsOnCompletion taking effect earlier.

…ubmitted

Azure Batch "job leak" is still an issue. This commit fixes #5839 which allows Nextflow to set jobs to auto terminate when all tasks have been submitted. This means that eventually jobs will move into terminated state even if something prevents nextflow reaching a graceful shutdown. Very early implementation and needs some refinement.

Signed-off-by: adamrtalbot <[email protected]>
…sOnCompletion; refactor task submission; reduce logging

- Remove Azure Batch process observer:
  - Delete AzBatchProcessObserver, its factory, and tests
  - Remove registration from META-INF/extensions.idx
- Reuse `azure.batch.terminateJobsOnCompletion` to set Azure Batch job `OnAllTasksComplete` (eager auto-termination)
- Refactor task submission flow in AzBatchService:
  - Extract helpers: `submitTaskToJob`, `recreateJobForTask`, `setAutoTerminateIfEnabled`
  - Handle 409 conflicts by recreating the job and retrying submission

Signed-off-by: adamrtalbot <[email protected]>
@adamrtalbot adamrtalbot force-pushed the 5839_azure_batch_jobs_terminate_upon_completion branch from d201e31 to 69da31c Compare August 12, 2025 16:46
@adamrtalbot
Collaborator Author

@luanjot and @ghislaindemael, I would appreciate it if you could give this development branch some extra testing, provided you can do a local build of Nextflow. I've run it through a bunch of pipelines but more UAT is always helpful!

@pditommaso
Member

It looks neat but I'm not getting the general logic. Can you provide some hints?

…ogic

- Add class-level documentation explaining three-layer auto-termination strategy
- Document eager auto-termination approach in setAutoTerminateIfEnabled() method
- Explain 409 conflict resolution in submitTaskToJob() method
- Document job recreation logic in recreateJobForTask() method
- Add comments explaining relationship between eager and graceful shutdown approaches
- Add test coverage for auto-termination scenarios while avoiding unmockable Azure SDK classes
- Keep comments concise but comprehensive to improve code maintainability

Addresses Paolo Di Tommaso's feedback in PR #6159 requesting explanation of the
Azure Batch job auto-termination logic to prevent job leak and improve quota cleanup.

Signed-off-by: adamrtalbot <[email protected]>
@adamrtalbot
Collaborator Author

Added some comments in df102da referring to #6159 (comment)

…rvice.groovy [ci skip]

Signed-off-by: Paolo Di Tommaso <[email protected]>
@pditommaso
Member

@adamrtalbot I've updated the PR description for you 😎

…rvice.groovy [ci skip]

Co-authored-by: Adam Talbot <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
@pditommaso
Member

One thing that I'm not getting: if the job is created with the flag TERMINATE_JOB on creation

OnAllBatchTasksComplete.TERMINATE_JOB

why is it still needed to update the job with the same flag on completion?

https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071

@adamrtalbot
Collaborator Author

One thing that I'm not getting: if the job is created with the flag TERMINATE_JOB on creation

OnAllBatchTasksComplete.TERMINATE_JOB

why is it still needed to update the job with the same flag on completion?

https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071

Good point - it's old code from a previous version hanging around.

On one hand, it's a good idea to go around afterwards and definitely mark them as complete, because it carries zero risk, and skipping it can cause many issues.

On the other hand, it's pointlessly doing more API calls.

This is the only situation I can think of where a job wouldn't be set to auto-terminate:

  1. Nextflow creates a Job
  2. Nextflow tries to add a Task to a Job, but it fails (e.g. API error)
  3. Nextflow terminates
  4. Nextflow leaves job in active state
  5. Quota is consumed 😨

In which case, leaving it in will help.

@pditommaso
Member

From my point of view it would be better to keep only the cleanup on job creation (why should it fail?).

The update-on-termination approach does not scale well with a large number of tasks.

Remove redundant job termination logic that was executed during pipeline
shutdown. Jobs are now configured for auto-termination at creation time,
making the shutdown termination unnecessary.

Changes:
- Remove terminateJobs() method and its call from close()
- Remove setAutoTerminateIfEnabled() method
- Remove redundant auto-termination calls after task submission
- Jobs now terminate automatically when all tasks complete

This simplifies the codebase and prevents unnecessary API calls while
ensuring jobs don't consume quota after completion.

Signed-off-by: adamrtalbot <[email protected]>
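
For reference, with this change the flag is applied when the job is created rather than at shutdown. A minimal sketch, assuming the BatchJobCreateContent/BatchPoolInfo classes from the same SDK (the actual job-creation code also handles pool resolution, metadata and retries):

// Sketch only: create the job already marked to terminate once all of its tasks complete
final content = new BatchJobCreateContent(jobId, new BatchPoolInfo().setPoolId(poolId))
if( config.batch().terminateJobsOnCompletion )
    content.setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)
apply(() -> client.createJob(content))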

Successfully merging this pull request may close these issues.

Nextflow eagerly terminates Azure Batch jobs during execution