Skip to content

Commit

Permalink
sweep: #7861 getTasksToSubmit consider tasks inserted by 30 seconds o…
Browse files Browse the repository at this point in the history
…r more
  • Loading branch information
fstagni committed Oct 31, 2024
1 parent b5bc25c commit c90bc53
Showing 1 changed file with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
""" Service for interacting with TransformationDB
"""
from DIRAC import S_OK, S_ERROR
import datetime

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Security.Properties import SecurityProperty
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation


TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted(
set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES)
Expand Down Expand Up @@ -377,25 +378,45 @@ def export_extendTransformation(self, transName, nTasks):
types_getTasksToSubmit = [[int, str], int]

def export_getTasksToSubmit(self, transName, numTasks, site=""):
"""Get information necessary for submission for a given number of tasks for a given transformation"""
"""
Retrieve the necessary information for the submission of a specified number of tasks
for a given transformation. This includes reserving tasks to avoid race conditions.
:param int | str transName: Name of the transformation
:param int numTasks: Number of tasks to retrieve for submission
:param str site: Optional site specification
:return: S_OK Dictionary containing transformation and task submission details
"""
# Get the transformation details
res = self.transformationDB.getTransformation(transName)
if not res["OK"]:
return res
transDict = res["Value"]

submitDict = {}

# Apply a delay to avoid race conditions
older = datetime.datetime.now() - datetime.timedelta(seconds=30)

# Retrieve tasks that are ready for submission
res = self.transformationDB.getTasksForSubmission(
transName, numTasks=numTasks, site=site, statusList=["Created"]
transName, numTasks=numTasks, site=site, statusList=["Created"], older=older
)
if not res["OK"]:
return res
tasksDict = res["Value"]

# Reserve each task for submission
for taskID, taskDict in tasksDict.items():
res = self.transformationDB.reserveTask(transName, int(taskID))
if not res["OK"]:
return res
else:
submitDict[taskID] = taskDict
# Add reserved task to the submission dictionary
submitDict[taskID] = taskDict

# Add the job dictionary to the transformation details
transDict["JobDictionary"] = submitDict

return S_OK(transDict)

####################################################################
Expand Down

0 comments on commit c90bc53

Please sign in to comment.