From 3d357dc60c63610d08cb613844dfdfd6ab9c7bc2 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 31 Mar 2026 12:10:15 +0200 Subject: [PATCH] refactor: moved getJobParameters in JobParametersDB --- .../Agent/JobCleaningAgent.py | 5 +- .../Agent/PushJobAgent.py | 4 +- .../Agent/StalledJobAgent.py | 3 +- .../DB/JobParametersDB.py | 62 +++++++++++++++++++ .../Service/JobMonitoringHandler.py | 8 +-- .../Utilities/JobParameters.py | 62 ------------------- 6 files changed, 73 insertions(+), 71 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index d8287c07dd4..de342224c05 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -1,4 +1,4 @@ -""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. +"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. This agent will take care of: - removing all jobs that are in status JobStatus.DELETED @@ -22,6 +22,7 @@ than 0. 
""" + import datetime import os @@ -37,10 +38,10 @@ from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE -from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters class JobCleaningAgent(AgentModule): diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 68720c67ff5..e06928d48ae 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -1,4 +1,4 @@ -""" The Push Job Agent class inherits from Job Agent and aims to support job submission in +"""The Push Job Agent class inherits from Job Agent and aims to support job submission in sites with no external connectivity (e.g. some supercomputers). .. 
literalinclude:: ../ConfigTemplate.cfg @@ -32,6 +32,7 @@ from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( getJobWrapper, @@ -41,7 +42,6 @@ transferInputSandbox, ) from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials -from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 5ddf97e5f45..23092ef200c 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -8,6 +8,7 @@ :dedent: 2 :caption: StalledJobAgent options """ + import concurrent.futures import datetime @@ -20,9 +21,9 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL -from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs diff --git 
a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index 3b074c845d7..2d67419d479 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -33,6 +33,68 @@ } +def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict: + """Utility to get a job parameter for a list of jobIDs pertaining to a VO. + Parameters are read from both JobParametersDB and JobDB and merged; JobParametersDB values win on conflict. + + Requires direct access to the JobParametersDB and JobDB. + + :param jobIDs: list of jobIDs + :param parName: name of the parameter to be retrieved + :param vo: VO of the jobIDs + :return: S_OK(dict) keyed by jobID, each value being that job's merged parameters dict; else the failing result + :rtype: dict + """ + + from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB + + elasticJobParametersDB = JobParametersDB() + jobDB = JobDB() + + if vo: # a user is connecting, with a proxy + res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) + if not res["OK"]: + return res + parameters = res["Value"] + else: # a service is connecting, no proxy, e.g. 
StalledJobAgent + q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})" + res = jobDB._query(q) + if not res["OK"]: + return res + if not res["Value"]: + return S_OK({}) + # get the VO for each jobID + voDict = {} + for jobID, jobVO in res["Value"]: + if jobVO not in voDict: + voDict[jobVO] = [] + voDict[jobVO].append(jobID) + # get the parameters for each VO (distinct loop names: jobIDs must stay intact for the JobDB lookup below) + parameters = {} + for jobVO, voJobIDs in voDict.items(): + res = elasticJobParametersDB.getJobParameters(voJobIDs, jobVO, parName) + if not res["OK"]: + return res + parameters.update(res["Value"]) + + # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends + res = jobDB.getJobParameters(jobIDs, parName) + if not res["OK"]: + return res + parametersM = res["Value"] + + # and now combine + final = dict(parametersM) + # if job in JobDB, update with parameters from ES if any + for jobID in final: + final[jobID].update(parameters.get(jobID, {})) + # if job in ES and not in JobDB, take ES + for jobID in parameters: + if jobID not in final: + final[jobID] = parameters[jobID] + return S_OK(final) + + class JobParametersDB(ElasticDB): def __init__(self, parentLogger=None): """Standard Constructor""" diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 9743c91db94..d8e9bc4167c 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -1,7 +1,7 @@ -""" JobMonitoringHandler is the implementation of the JobMonitoring service - in the DISET framework +"""JobMonitoringHandler is the implementation of the JobMonitoring service +in the DISET framework - The following methods are available in the Service interface +The following methods are available in the Service interface """ from DIRAC import S_ERROR, S_OK @@ -10,7 +10,7 @@ from 
DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import strToIntDict from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters class JobMonitoringHandlerMixin: diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py index 92b95f3a51d..e1d048741e6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py @@ -141,68 +141,6 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None): return 0 -def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict: - """Utility to get a job parameter for a list of jobIDs pertaining to a VO. - If the jobID is not in the JobParametersDB, it will be looked up in the JobDB. - - Requires direct access to the JobParametersDB and JobDB. - - :param jobIDs: list of jobIDs - :param parName: name of the parameter to be retrieved - :param vo: VO of the jobIDs - :return: dictionary with jobID as key and the parameter as value - :rtype: dict - """ - from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB - from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB - - elasticJobParametersDB = JobParametersDB() - jobDB = JobDB() - - if vo: # a user is connecting, with a proxy - res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) - if not res["OK"]: - return res - parameters = res["Value"] - else: # a service is connecting, no proxy, e.g. 
StalledJobAgent - q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})" - res = jobDB._query(q) - if not res["OK"]: - return res - if not res["Value"]: - return S_OK({}) - # get the VO for each jobID - voDict = {} - for jobID, vo in res["Value"]: - if vo not in voDict: - voDict[vo] = [] - voDict[vo].append(jobID) - # get the parameters for each VO - parameters = {} - for vo, jobIDs in voDict.items(): - res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) - if not res["OK"]: - return res - parameters.update(res["Value"]) - - # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends - res = jobDB.getJobParameters(jobIDs, parName) - if not res["OK"]: - return res - parametersM = res["Value"] - - # and now combine - final = dict(parametersM) - # if job in JobDB, update with parameters from ES if any - for jobID in final: - final[jobID].update(parameters.get(jobID, {})) - # if job in ES and not in JobDB, take ES - for jobID in parameters: - if jobID not in final: - final[jobID] = parameters[jobID] - return S_OK(final) - - def getAvailableRAM(siteName=None, gridCE=None, queue=None): """Gets the available RAM on a certain CE/queue/node (what the pilot administers)