Skip to content

Commit

Permalink
sweep: DIRACGrid#7861 getTasksToSubmit consider tasks inserted by 30 …
Browse files Browse the repository at this point in the history
…seconds or more
  • Loading branch information
fstagni committed Nov 6, 2024
1 parent a461a40 commit be49b15
Showing 1 changed file with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" Service for interacting with TransformationDB
"""
import datetime

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
Expand Down Expand Up @@ -393,25 +394,45 @@ def export_extendTransformation(self, transName, nTasks):
types_getTasksToSubmit = [[int, str], int]

def export_getTasksToSubmit(self, transName, numTasks, site=""):
"""Get information necessary for submission for a given number of tasks for a given transformation"""
"""
Retrieve the necessary information for the submission of a specified number of tasks
for a given transformation. This includes reserving tasks to avoid race conditions.
:param int | str transName: Name of the transformation
:param int numTasks: Number of tasks to retrieve for submission
:param str site: Optional site specification
:return: S_OK Dictionary containing transformation and task submission details
"""
# Get the transformation details
res = self.transformationDB.getTransformation(transName)
if not res["OK"]:
return res
transDict = res["Value"]

submitDict = {}

# Apply a delay to avoid race conditions
older = datetime.datetime.now() - datetime.timedelta(seconds=30)

# Retrieve tasks that are ready for submission
res = self.transformationDB.getTasksForSubmission(
transName, numTasks=numTasks, site=site, statusList=["Created"]
transName, numTasks=numTasks, site=site, statusList=["Created"], older=older
)
if not res["OK"]:
return res
tasksDict = res["Value"]

# Reserve each task for submission
for taskID, taskDict in tasksDict.items():
res = self.transformationDB.reserveTask(transName, int(taskID))
if not res["OK"]:
return res
else:
submitDict[taskID] = taskDict
# Add reserved task to the submission dictionary
submitDict[taskID] = taskDict

# Add the job dictionary to the transformation details
transDict["JobDictionary"] = submitDict

return S_OK(transDict)

####################################################################
Expand Down

0 comments on commit be49b15

Please sign in to comment.