Skip to content

Commit

Permalink
Merge pull request #7792 from fstagni/cherry-pick-2-93ad9405b-integra…
Browse files Browse the repository at this point in the history
…tion

[sweep:integration] feat: SD will always bundle proxy
  • Loading branch information
fstagni authored Sep 13, 2024
2 parents a9c0f75 + 8f88381 commit 41c93fa
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ Let's take an example::
maxCPUTime = 200
MaxTotalJobs = 5
MaxWaitingJobs = 10
BundleProxy = True
RemoveOutput = True
}
# This queue has Tag = GPU. So it will accept:
Expand Down
1 change: 0 additions & 1 deletion docs/source/AdministratorGuide/Tutorials/installWMS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ Then, as ``diracuser`` with the ``dirac_admin`` proxy, we need to define a CE in
CPUTime = 40000
MaxTotalJobs = 5
MaxWaitingJobs = 10
BundleProxy = True
BatchError = /home/diracpilot/localsite/error
ExecutableArea = /home/diracpilot/localsite/submission
RemoveOutput = True
Expand Down
22 changes: 7 additions & 15 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ def beginExecution(self):
self.log.always("MaxPilotsToSubmit:", self.maxPilotsToSubmit)

# Build the dictionary of queues that are going to be used: self.queueDict
result = self._buildQueueDict(siteNames, ceTypes, ces, tags)
if not result:
if not (result := self._buildQueueDict(siteNames, ceTypes, ces, tags))["OK"]:
return result

# Stop the execution if there is no usable queue
Expand Down Expand Up @@ -449,16 +448,11 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
"""
self.log.info("Going to submit pilots", f"(a maximum of {pilotsToSubmit} pilots to {queue} queue)")

# Get parameters to generate the pilot executable
bundleProxy = self.queueDict[queue].get("BundleProxy", False)
proxy = None
if bundleProxy:
proxy = ce.proxy
jobExecDir = self.queueDict[queue]["ParametersDict"].get("JobExecDir", "")
envVariables = self.queueDict[queue]["ParametersDict"].get("EnvironmentVariables", None)

# Generate the executable
executable = self._getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables)
executable = self._getExecutable(queue, proxy=ce.proxy, jobExecDir=jobExecDir, envVariables=envVariables)

# Submit the job
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
Expand Down Expand Up @@ -564,13 +558,11 @@ def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[
return result
return S_OK()

def _getExecutable(
self, queue: str, proxy: X509Chain = None, jobExecDir: str = "", envVariables: dict[str, str] = None
):
def _getExecutable(self, queue: str, proxy: X509Chain, jobExecDir: str = "", envVariables: dict[str, str] = None):
"""Prepare the full executable for queue
:param queue: queue name
:param proxy: flag that say if to bundle or not the proxy
:param proxy: proxy to bundle
:param jobExecDir: pilot execution dir (normally an empty string)
:returns: a string the options for the pilot
Expand All @@ -580,6 +572,7 @@ def _getExecutable(
if not pilotOptions:
self.log.warn("Pilots will be submitted without additional options")
pilotOptions = []

pilotOptions = " ".join(pilotOptions)
self.log.verbose(f"pilotOptions: {pilotOptions}")

Expand Down Expand Up @@ -614,7 +607,7 @@ def _getPilotOptions(self, queue: str) -> list[str]:
setup = gConfig.getValue("/DIRAC/Setup", "unknown")
if setup == "unknown":
self.log.error("Setup is not defined in the configuration")
return [None, None]
return []
pilotOptions.append(f"-S {setup}")
opsHelper = Operations(vo=self.vo, setup=setup)

Expand Down Expand Up @@ -687,7 +680,7 @@ def _writePilotScript(
self,
workingDirectory: str,
pilotOptions: str,
proxy: X509Chain = None,
proxy: X509Chain,
pilotExecDir: str = "",
envVariables: dict[str, str] = None,
):
Expand Down Expand Up @@ -717,7 +710,6 @@ def _writePilotScript(
location=location,
CVMFS_locations=CVMFS_locations,
)

return _writePilotWrapperFile(workingDirectory=workingDirectory, localPilot=localPilot)

#####################################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

import datetime
import os
from unittest.mock import MagicMock

import pytest
from diraccfg import CFG

from DIRAC import gLogger, gConfig
from DIRAC import S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client import ConfigurationData
from DIRAC.Core.Utilities.ProcessPool import S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus

from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector
from DIRAC.WorkloadManagementSystem.Client import PilotStatus


CONFIG = """
Registry
{
Expand Down Expand Up @@ -261,13 +260,35 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
"-e 1,2,3",
} == set(pilotOptions)

proxyObject_mock = MagicMock()
proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy")

# Write pilot script
res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions)
res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions, proxyObject_mock)

# Make sure the file exists
assert os.path.exists(res) and os.path.isfile(res)


def test__submitPilotsToQueue(sd):
"""Testing SiteDirector()._submitPilotsToQueue()"""
# Create a MagicMock that does not have the workingDirectory
# attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes)
# This is to use the SiteDirector's working directory, not the CE one
ceMock = MagicMock()
del ceMock.workingDirectory
proxyObject_mock = MagicMock()
proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy")
ceMock.proxy = proxyObject_mock

sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}}
sd.queueSlots = {"ce1.site1.com_condor": {"AvailableSlots": 10}}
sd._buildQueueDict()
sd.sendSubmissionAccounting = False
sd.sendSubmissionMonitoring = False
assert sd._submitPilotsToQueue(1, ceMock, "ce1.site1.com_condor")["OK"]


def test_updatePilotStatus(sd):
"""Updating the status of some fake pilot references"""
# 1. We have not submitted any pilots, there is nothing to update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ def getQueuesResolved(siteDict, queueCECache, vo=None, checkPlatform=False, inst
if checkPlatform:
setPlatform(ceDict, queueDict[queueName]["ParametersDict"])

bundleProxy = queueDict[queueName]["ParametersDict"].get("BundleProxy", ceDict.get("BundleProxy"))
if bundleProxy and bundleProxy.lower() in ["true", "yes", "1"]:
queueDict[queueName]["BundleProxy"] = True

return S_OK(queueDict)


Expand Down

0 comments on commit 41c93fa

Please sign in to comment.