Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] getTasksToSubmit consider tasks inserted by 30 seconds or more #7861

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
""" Service for interacting with TransformationDB
"""

from DIRAC import S_OK, S_ERROR
import datetime

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Security.Properties import SecurityProperty
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.JEncode import encode as jencode
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation


TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted(
set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES)
Expand Down Expand Up @@ -394,25 +395,45 @@ def export_extendTransformation(self, transName, nTasks):
types_getTasksToSubmit = [[int, str], int]

def export_getTasksToSubmit(self, transName, numTasks, site=""):
"""Get information necessary for submission for a given number of tasks for a given transformation"""
"""
Retrieve the necessary information for the submission of a specified number of tasks
for a given transformation. This includes reserving tasks to avoid race conditions.
:param int | str transName: Name of the transformation
:param int numTasks: Number of tasks to retrieve for submission
:param str site: Optional site specification
:return: S_OK Dictionary containing transformation and task submission details
"""
# Get the transformation details
res = self.transformationDB.getTransformation(transName)
if not res["OK"]:
return res
transDict = res["Value"]

submitDict = {}

# Apply a delay to avoid race conditions
older = datetime.datetime.now() - datetime.timedelta(seconds=30)

# Retrieve tasks that are ready for submission
res = self.transformationDB.getTasksForSubmission(
transName, numTasks=numTasks, site=site, statusList=["Created"]
transName, numTasks=numTasks, site=site, statusList=["Created"], older=older
)
if not res["OK"]:
return res
tasksDict = res["Value"]

# Reserve each task for submission
for taskID, taskDict in tasksDict.items():
res = self.transformationDB.reserveTask(transName, int(taskID))
if not res["OK"]:
return res
else:
submitDict[taskID] = taskDict
# Add reserved task to the submission dictionary
submitDict[taskID] = taskDict

# Add the job dictionary to the transformation details
transDict["JobDictionary"] = submitDict

return S_OK(transDict)

####################################################################
Expand Down
Loading