From 111f2cabfb2edbe6c5168a51bb8ff8d09f0b6cf5 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Wed, 30 Oct 2024 16:16:05 +0100 Subject: [PATCH 1/2] fix: getTasksToSubmit consider tasks inserted by 30 seconds or more --- .../Service/TransformationManagerHandler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 73cb6c85a9a..049286b30be 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -1,6 +1,8 @@ """ Service for interacting with TransformationDB """ +import datetime + from DIRAC import S_OK, S_ERROR from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty @@ -400,8 +402,10 @@ def export_getTasksToSubmit(self, transName, numTasks, site=""): return res transDict = res["Value"] submitDict = {} + # applying few seconds delay to avoid race conditions + older = datetime.datetime.now() - datetime.timedelta(seconds=30) res = self.transformationDB.getTasksForSubmission( - transName, numTasks=numTasks, site=site, statusList=["Created"] + transName, numTasks=numTasks, site=site, statusList=["Created"], older=older ) if not res["OK"]: return res From d8928c9a154b4c6bf305b6f4201376e208b28694 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Wed, 30 Oct 2024 16:30:29 +0100 Subject: [PATCH 2/2] style: added few comments, re-ordered imports --- .../Service/TransformationManagerHandler.py | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 049286b30be..6175a27ce3c 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -3,18 +3,17 @@ import datetime -from DIRAC import S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.RequestManagementSystem.Client.Operation import Operation +from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.TransformationSystem.Client import TransformationFilesStatus from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.RequestManagementSystem.Client.Request import Request -from DIRAC.RequestManagementSystem.Client.Operation import Operation - TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted( set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES) @@ -396,27 +395,45 @@ def export_extendTransformation(self, transName, nTasks): types_getTasksToSubmit = [[int, str], int] def export_getTasksToSubmit(self, transName, numTasks, site=""): - """Get information necessary for submission for a given number of tasks for a given transformation""" + """ + Retrieve the necessary information for the submission of a specified number of tasks + for a given transformation. This includes reserving tasks to avoid race conditions. + + :param int | str transName: Name of the transformation + :param int numTasks: Number of tasks to retrieve for submission + :param str site: Optional site specification + :return: S_OK Dictionary containing transformation and task submission details + """ + # Get the transformation details res = self.transformationDB.getTransformation(transName) if not res["OK"]: return res transDict = res["Value"] + submitDict = {} - # applying few seconds delay to avoid race conditions + + # Apply a delay to avoid race conditions older = datetime.datetime.now() - datetime.timedelta(seconds=30) + + # Retrieve tasks that are ready for submission res = self.transformationDB.getTasksForSubmission( transName, numTasks=numTasks, site=site, statusList=["Created"], older=older ) if not res["OK"]: return res tasksDict = res["Value"] + + # Reserve each task for submission for taskID, taskDict in tasksDict.items(): res = self.transformationDB.reserveTask(transName, int(taskID)) if not res["OK"]: return res - else: - submitDict[taskID] = taskDict + # Add reserved task to the submission dictionary + submitDict[taskID] = taskDict + + # Add the job dictionary to the transformation details transDict["JobDictionary"] = submitDict + return S_OK(transDict) ####################################################################