Skip to content

Commit 69da31c

Browse files
committed
refactor(nf-azure): remove AzBatchProcessObserver; reuse terminateJobsOnCompletion; refactor task submission; reduce logging
- Remove Azure Batch process observer: - Delete AzBatchProcessObserver, its factory, and tests - Remove registration from META-INF/extensions.idx - Reuse `azure.batch.terminateJobsOnCompletion` to set Azure Batch job `OnAllTasksComplete` (eager auto-termination) - Refactor task submission flow in AzBatchService: - Extract helpers: `submitTaskToJob`, `recreateJobForTask`, `setAutoTerminateIfEnabled` - Handle 409 conflicts by recreating the job and retrying submission Signed-off-by: adamrtalbot <[email protected]>
1 parent dd4fa45 commit 69da31c

File tree

6 files changed

+59
-212
lines changed

6 files changed

+59
-212
lines changed

docs/reference/config.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ The following settings are available:
381381
`azure.batch.poolIdentityClientId`
382382
: :::{versionadded} 25.05.0-edge
383383
:::
384-
: The client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview) that is available on all Azure Batch node pools. This identity is used by Fusion to authenticate to Azure storage. If set to `'auto'`, Fusion will use the first available managed identity.
384+
: When the workflow completes, set all jobs to terminate on task completion. (default: `true`).
385385

386386
`azure.batch.pools.<name>.autoScale`
387387
: Enable autoscaling feature for the pool identified with `<name>`.
@@ -450,7 +450,7 @@ The following settings are available:
450450
`azure.batch.terminateJobsOnCompletion`
451451
: :::{versionadded} 23.05.0-edge
452452
:::
453-
: When the workflow completes, set all jobs to terminate on task completion (default: `true`).
453+
: Set all jobs to terminate on task completion (default: `true`).
454454

455455
`azure.managedIdentity.clientId`
456456
: The client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). Defaults to environment variable `AZURE_MANAGED_IDENTITY_USER`.

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy

Lines changed: 0 additions & 74 deletions
This file was deleted.

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy

Lines changed: 0 additions & 37 deletions
This file was deleted.

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ class AzBatchService implements Closeable {
336336
}
337337

338338
protected AzureNamedKeyCredential createBatchCredentialsWithKey() {
339-
log.debug "[AZURE BATCH] Creating Azure Batch client using shared key creddentials"
339+
log.debug "[AZURE BATCH] Creating Azure Batch client using shared key credentials"
340340

341341
if( !config.batch().endpoint )
342342
throw new IllegalArgumentException("Missing Azure Batch endpoint -- Specify it in the nextflow.config file using the setting 'azure.batch.endpoint'")
@@ -568,9 +568,57 @@ class AzBatchService implements Closeable {
568568
}
569569

570570
AzTaskKey runTask(String poolId, String jobId, TaskRun task) {
571-
final taskToAdd = createTask(poolId, jobId, task)
572-
apply(() -> client.createTask(jobId, taskToAdd))
573-
return new AzTaskKey(jobId, taskToAdd.getId())
571+
final BatchTaskCreateContent taskToAdd = createTask(poolId, jobId, task)
572+
final AzTaskKey submitted = submitTaskToJob(poolId, jobId, task, taskToAdd)
573+
setAutoTerminateIfEnabled(submitted.jobId, taskToAdd.getId())
574+
return submitted
575+
}
576+
577+
/**
578+
* Submit a task and transparently recover from a 409 job state conflict by
579+
* creating a fresh job and retrying the submission.
580+
*/
581+
protected AzTaskKey submitTaskToJob(String poolId, String jobId, TaskRun task, BatchTaskCreateContent taskToAdd) {
582+
try {
583+
apply(() -> client.createTask(jobId, taskToAdd))
584+
return new AzTaskKey(jobId, taskToAdd.getId())
585+
}
586+
catch( HttpResponseException e ) {
587+
if( e.response.statusCode == 409 && config.batch().terminateJobsOnCompletion ) {
588+
final String newJobId = recreateJobForTask(poolId, task, jobId, taskToAdd.getId())
589+
apply(() -> client.createTask(newJobId, taskToAdd))
590+
return new AzTaskKey(newJobId, taskToAdd.getId())
591+
}
592+
throw e
593+
}
594+
}
595+
596+
/**
597+
* Create a new Azure Batch job for the given process and update internal bookkeeping
598+
* so subsequent tasks for the same (process,pool) use the fresh job id.
599+
*/
600+
protected String recreateJobForTask(String poolId, TaskRun task, String oldJobId, String taskId) {
601+
log.debug "Job ${oldJobId} is in completed/terminating state, creating a new job for task ${taskId}"
602+
allJobIds.values().removeAll { it == oldJobId }
603+
final String newJobId = createJob0(poolId, task)
604+
final AzJobKey mapKey = new AzJobKey(task.processor, poolId)
605+
allJobIds[mapKey] = newJobId
606+
return newJobId
607+
}
608+
609+
/**
610+
* If configured, set the job to auto-terminate when all tasks complete.
611+
*/
612+
protected void setAutoTerminateIfEnabled(String jobId, String taskId) {
613+
if( !config.batch().terminateJobsOnCompletion )
614+
return
615+
log.trace "Eagerly setting job ${jobId} to auto-terminate after submitting task ${taskId}"
616+
try {
617+
setJobAutoTermination(jobId)
618+
}
619+
catch( Exception e ) {
620+
log.trace "Failed to eagerly set auto-termination for job ${jobId} after task submission - ${e.message ?: e}"
621+
}
574622
}
575623

576624
protected List<String> getShareVolumeMounts(AzVmPoolSpec spec) {
@@ -984,6 +1032,7 @@ class AzBatchService implements Closeable {
9841032
* @param jobId The Azure Batch job ID to set for auto-termination
9851033
*/
9861034
void setJobAutoTermination(String jobId) {
1035+
log.trace "Eagerly setting Azure Batch job ${jobId} to auto-terminate when all tasks complete"
9871036
setJobTermination(jobId)
9881037
}
9891038

@@ -994,22 +1043,20 @@ class AzBatchService implements Closeable {
9941043
*/
9951044
protected void setJobTermination(String jobId) {
9961045
try {
997-
log.trace "Setting Azure job ${jobId} to terminate on completion"
998-
9991046
final job = apply(() -> client.getJob(jobId))
1047+
10001048
final poolInfo = job.poolInfo
10011049

10021050
final jobParameter = new BatchJobUpdateContent()
10031051
.setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)
10041052
.setPoolInfo(poolInfo)
1005-
10061053
apply(() -> client.updateJob(jobId, jobParameter))
10071054
}
10081055
catch (HttpResponseException e) {
10091056
if (e.response.statusCode == 409) {
1010-
log.debug "Azure Batch job ${jobId} already terminated, skipping auto-termination setup"
1057+
log.debug "Azure Batch job ${jobId} already terminated or in terminal state, skipping auto-termination setup"
10111058
} else {
1012-
log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - Status: ${e.response.statusCode}, Reason: ${e.message ?: e}"
1059+
log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - HTTP Status: ${e.response.statusCode}, Reason: ${e.message ?: e}"
10131060
}
10141061
}
10151062
catch (Exception e) {

plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy

Lines changed: 0 additions & 89 deletions
This file was deleted.

plugins/nf-azure/src/test/nextflow/cloud/azure/config/AzureConfigTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class AzureConfigTest extends Specification {
119119
location: LOCATION,
120120
autoPoolMode: true,
121121
allowPoolCreation: true,
122-
terminateJobsOnCompletion: false,
122+
terminateJobsOnCompletion: false,
123123
deleteJobsOnCompletion: true,
124124
deletePoolsOnCompletion: true,
125125
deleteTasksOnCompletion: false,

0 commit comments

Comments
 (0)