diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 73cb6c85a9a..6175a27ce3c 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -1,18 +1,19 @@ """ Service for interacting with TransformationDB """ -from DIRAC import S_OK, S_ERROR +import datetime + +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.RequestManagementSystem.Client.Operation import Operation +from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.TransformationSystem.Client import TransformationFilesStatus from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.RequestManagementSystem.Client.Request import Request -from DIRAC.RequestManagementSystem.Client.Operation import Operation - TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted( set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES) @@ -394,25 +395,45 @@ def export_extendTransformation(self, transName, nTasks): types_getTasksToSubmit = [[int, str], int] def export_getTasksToSubmit(self, transName, numTasks, site=""): - """Get information necessary for submission for a given number of tasks for a given transformation""" + """ + Retrieve the necessary information for the submission of a specified number of tasks + for a given transformation. This includes reserving tasks to avoid race conditions. + + :param int | str transName: Name of the transformation + :param int numTasks: Number of tasks to retrieve for submission + :param str site: Optional site specification + :return: S_OK Dictionary containing transformation and task submission details + """ + # Get the transformation details res = self.transformationDB.getTransformation(transName) if not res["OK"]: return res transDict = res["Value"] + submitDict = {} + + # Apply a delay to avoid race conditions + older = datetime.datetime.now() - datetime.timedelta(seconds=30) + + # Retrieve tasks that are ready for submission res = self.transformationDB.getTasksForSubmission( - transName, numTasks=numTasks, site=site, statusList=["Created"] + transName, numTasks=numTasks, site=site, statusList=["Created"], older=older ) if not res["OK"]: return res tasksDict = res["Value"] + + # Reserve each task for submission for taskID, taskDict in tasksDict.items(): res = self.transformationDB.reserveTask(transName, int(taskID)) if not res["OK"]: return res - else: - submitDict[taskID] = taskDict + # Add reserved task to the submission dictionary + submitDict[taskID] = taskDict + + # Add the job dictionary to the transformation details transDict["JobDictionary"] = submitDict + return S_OK(transDict) ####################################################################