diff --git a/README.md b/README.md index 14c29b2..d0e4608 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ # Dirac CWL Prototype ![Workflow tests](https://github.com/DIRACGrid/dirac-cwl/actions/workflows/main.yml/badge.svg?branch=main) -![Schema Generation](https://github.com/DIRACGrid/dirac-cwl/actions/workflows/generate-schemas.yml/badge.svg?branch=main) [![Conda Version](https://img.shields.io/conda/vn/conda-forge/dirac-cwl.svg)](https://anaconda.org/conda-forge/dirac-cwl) [![Conda Recipe](https://img.shields.io/badge/recipe-dirac--cwl-green.svg)](https://anaconda.org/conda-forge/dirac-cwl) +![Schema Generation](https://github.com/DIRACGrid/dirac-cwl/actions/workflows/generate-schemas.yml/badge.svg?branch=main) [![Conda Version](https://img.shields.io/conda/vn/conda-forge/dirac-cwl.svg)](https://anaconda.org/conda-forge/dirac-cwl) [![Conda Recipe](https://img.shields.io/badge/recipe-dirac--cwl-green.svg)](https://anaconda.org/conda-forge/dirac-cwl) This Python prototype introduces a command-line interface (CLI) designed for the end-to-end execution of Common Workflow Language (CWL) workflows at different scales. It enables users to locally test CWL workflows, and then run them as jobs, transformations and/or productions. diff --git a/src/dirac_cwl/commands/__init__.py b/src/dirac_cwl/commands/__init__.py index 01e8b17..aa213e4 100644 --- a/src/dirac_cwl/commands/__init__.py +++ b/src/dirac_cwl/commands/__init__.py @@ -1,5 +1,6 @@ """Command classes for workflow pre/post-processing operations.""" from .core import PostProcessCommand, PreProcessCommand +from .upload_log_file import UploadLogFile -__all__ = ["PreProcessCommand", "PostProcessCommand"] +__all__ = ["PreProcessCommand", "PostProcessCommand", "UploadLogFile"] diff --git a/src/dirac_cwl/commands/upload_log_file.py b/src/dirac_cwl/commands/upload_log_file.py new file mode 100644 index 0000000..3e8f0a9 --- /dev/null +++ b/src/dirac_cwl/commands/upload_log_file.py @@ -0,0 +1,162 @@ +"""Post-processing command for uploading logging information to a Storage Element.""" + +import glob +import os +import random +import stat +import time +import zipfile +from urllib.parse import urljoin + +from DIRAC import S_ERROR, S_OK, siteName +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.Core.Utilities.Adler import fileAdler +from DIRAC.Core.Utilities.ReturnValues import returnSingleResult +from DIRAC.DataManagementSystem.Client.FailoverTransfer import FailoverTransfer +from DIRAC.DataManagementSystem.Utilities.ResolveSE import getDestinationSEList +from DIRAC.Resources.Catalog.PoolXMLFile import getGUID +from DIRAC.Resources.Storage.StorageElement import StorageElement +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport + +from dirac_cwl.commands import PostProcessCommand + + +class UploadLogFile(PostProcessCommand): + """Post-processing command for log file uploading.""" + + def execute(self, job_path, **kwargs): + """Execute the log uploading process. + + :param job_path: Path to the job working directory. + :param kwargs: Additional keyword arguments. + """ + # Obtain workflow information + job_id = kwargs.get("job_id", None) + production_id = kwargs.get("production_id", None) + namespace = kwargs.get("namespace", None) + config_version = kwargs.get("config_version", None) + + if not job_path or not production_id or not namespace or not config_version: + return S_ERROR("Not enough information to perform the log upload") + + ops = Operations() + log_extensions = ops.getValue("LogFiles/Extensions", []) + log_se = ops.getValue("LogStorage/LogSE", "LogSE") + + job_report = JobReport(job_id) + + output_files = self.obtain_output_files(job_path, log_extensions) + + if not output_files: + return S_OK("No files to upload") + + # Zip files + zip_name = job_id.zfill(8) + ".zip" + zip_path = os.path.join(job_path, zip_name) + + try: + self.zip_files(zip_path, output_files) + except (AttributeError, OSError, ValueError) as e: + job_report.setApplicationStatus("Failed to create zip of log files") + return S_OK(f"Failed to zip files: {repr(e)}") + + # Obtain the log destination + zip_lfn = self.get_zip_lfn(production_id, job_id, namespace, config_version) + + # Upload to the SE + result = returnSingleResult(StorageElement(log_se).putFile({zip_lfn: zip_path})) + + if not result["OK"]: # Failed to uplaod to the LogSE + result = self.generate_failover_transfer(zip_path, zip_name, zip_lfn) + + if not result["OK"]: + job_report.setApplicationStatus("Failed To Upload Logs") + return S_ERROR("Failed to upload to FailoverSE") + + # Set the Log URL parameter + result = returnSingleResult(StorageElement(log_se).getURL(zip_path, protocol="https")) + if not result["OK"]: + # The rule for interpreting what is to be deflated can be found in /eos/lhcb/grid/prod/lhcb/logSE/.htaccess + logHttpsURL = urljoin("https://lhcb-dirac-logse.web.cern.ch/lhcb-dirac-logse/", zip_lfn) + else: + logHttpsURL = result["Value"] + + logHttpsURL = logHttpsURL.replace(".zip", "/") + job_report.setJobParameter("Log URL", f'Log file directory') + + return S_OK("Log Files uploaded") + + def zip_files(self, outputFile, files=None, directory=None): + """Zip list of files.""" + with zipfile.ZipFile(outputFile, "w") as zipped: + for fileIn in files: + # ZIP does not support timestamps before 1980, so for those we simply "touch" + st = os.stat(fileIn) + mtime = time.localtime(st.st_mtime) + dateTime = mtime[0:6] + if dateTime[0] < 1980: + os.utime(fileIn, None) # same as "touch" + + zipped.write(fileIn) + + def obtain_output_files(self, job_path, extensions=[]): + """Obtain the files to be added to the log zip from the outputs.""" + log_file_extensions = extensions + + if not log_file_extensions: + log_file_extensions = [ + "*.txt", + "*.log", + "*.out", + "*.output", + "*.xml", + "*.sh", + "*.info", + "*.err", + "prodConf*.py", + "prodConf*.json", + ] + + files = [] + + for extension in log_file_extensions: + glob_list = glob.glob(extension, root_dir=job_path, recursive=True) + for check in glob_list: + path = os.path.join(job_path, check) + if os.path.isfile(path): + os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH) + files.append(path) + + return files + + def get_zip_lfn(self, production_id, job_id, namespace, config_version): + """Form a logical file name from certain information from the workflow.""" + production_id = str(production_id).zfill(8) + job_id = str(job_id).zfill(8) + jobindex = str(int(int(job_id) / 10000)).zfill(4) + + log_path = os.path.join("/lhcb", namespace, config_version, "LOG", production_id, jobindex, "") + path = os.path.join(log_path, f"{job_id}.zip") + return path + + def generate_failover_transfer(self, zip_path, zip_name, zip_lfn): + """Prepare a failover transfer .""" + failoverSEs = getDestinationSEList("Tier1-Failover", siteName()) + random.shuffle(failoverSEs) + + fileMetaDict = { + "Size": os.path.getsize(zip_path), + "LFN": zip_lfn, + "GUID": getGUID(zip_path), + "Checksum": fileAdler(zip_path), + "ChecksumType": "ADLER32", + } + + return FailoverTransfer().transferAndRegisterFile( + fileName=zip_name, + localPath=zip_path, + lfn=zip_lfn, + destinationSEList=failoverSEs, + fileMetaDict=fileMetaDict, + masterCatalogOnly=True, + ) diff --git a/test/test_commands.py b/test/test_commands.py new file mode 100644 index 0000000..6b9e8dc --- /dev/null +++ b/test/test_commands.py @@ -0,0 +1,275 @@ +""".""" + +import os +import tempfile +from urllib.parse import urljoin + +import pytest +from DIRACCommon.Core.Utilities.ReturnValues import S_ERROR, S_OK +from pytest_mock import MockerFixture + +from dirac_cwl.commands import UploadLogFile + + +class TestUploadLogFile: + """Collection of tests for the UploadLogFile command.""" + + FILENAMES = ["file.txt", "file.log", "file.err", "file.out", "file.extra"] + JOB_ID = "8042" + PRODUCTION_ID = "95376" + NAMESPACE = "MC" + CONFIG_VERSION = "2016" + + @pytest.fixture + def basedir(self): + """Fixture to initialize the working directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + for file in self.FILENAMES: + with open(os.path.join(tmpdir, file), "x") as f: + f.write("EMPTY") + + yield tmpdir + + def test_correct_file_finding(self, basedir): + """Test output file finding.""" + files = UploadLogFile().obtain_output_files(basedir) + files_names = [os.path.basename(file_path) for file_path in files] + + assert set(self.FILENAMES).difference(files_names) == {"file.extra"} + + def test_correct_file_extension_finding(self, basedir): + """Test output file finding.""" + extensions = ["*.extra"] + files = UploadLogFile().obtain_output_files(basedir, extensions) + files_names = [os.path.basename(file_path) for file_path in files] + + assert set(self.FILENAMES).difference(files_names) == {"file.txt", "file.log", "file.err", "file.out"} + + def test_upload_ok(self, basedir, mocker: MockerFixture): + """Test a correct upload.""" + base_lfn = f"/lhcb/{self.NAMESPACE}/{self.CONFIG_VERSION}/LOG/{self.PRODUCTION_ID.zfill(8)}/0000/" + zip_name = self.JOB_ID.zfill(8) + ".zip" + + expected_lfn = os.path.join(base_lfn, zip_name) + expected_path = os.path.join(basedir, zip_name) + + # Mock Operations + mock_ops = mocker.patch("dirac_cwl.commands.upload_log_file.Operations") + mock_ops.return_value.getValue = lambda value, default=None: default + + # Mock JobReport + mock_job_report = mocker.patch("dirac_cwl.commands.upload_log_file.JobReport") + mock_set_app_status = mocker.MagicMock() + mock_set_job_parameter = mocker.MagicMock() + mock_job_report.return_value.setApplicationStatus = mock_set_app_status + mock_job_report.return_value.setJobParameter = mock_set_job_parameter + + # Mock StorageElement + mock_se = mocker.patch("dirac_cwl.commands.upload_log_file.StorageElement") + mock_put_file = mocker.MagicMock() + mock_get_url = mocker.MagicMock() + mock_put_file.return_value = S_OK({"Successful": {expected_lfn: "Borked"}, "Failed": {}}) + mock_get_url.return_value = S_OK(urljoin("https://lhcb-dirac-logse.web.cern.ch/", expected_lfn)) + mock_se.return_value.putFile = mock_put_file + mock_se.return_value.getURL = mock_get_url + + command = UploadLogFile() + + # Mock failover + mock_failover = mocker.patch.object(command, "generate_failover_transfer") + mock_failover.return_value = S_OK() + + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + assert result["OK"] + mock_get_url.assert_called_once_with(expected_path, protocol="https") + mock_put_file.assert_called_once_with({expected_lfn: expected_path}) + mock_failover.assert_not_called() + mock_set_app_status.assert_not_called() + mock_set_job_parameter.assert_called_once() + + def test_upload_ok_to_failover(self, basedir, mocker: MockerFixture): + """Test a failure to upload to the LogSE but a correct one to the Failover.""" + base_lfn = f"/lhcb/{self.NAMESPACE}/{self.CONFIG_VERSION}/LOG/{self.PRODUCTION_ID.zfill(8)}/0000/" + zip_name = self.JOB_ID.zfill(8) + ".zip" + + expected_lfn = os.path.join(base_lfn, zip_name) + expected_path = os.path.join(basedir, zip_name) + + # Mock Operations + mock_ops = mocker.patch("dirac_cwl.commands.upload_log_file.Operations") + mock_ops.return_value.getValue = lambda value, default=None: default + + # Mock JobReport + mock_job_report = mocker.patch("dirac_cwl.commands.upload_log_file.JobReport") + mock_set_app_status = mocker.MagicMock() + mock_set_job_parameter = mocker.MagicMock() + mock_job_report.return_value.setApplicationStatus = mock_set_app_status + mock_job_report.return_value.setJobParameter = mock_set_job_parameter + + # Mock StorageElement + mock_se = mocker.patch("dirac_cwl.commands.upload_log_file.StorageElement") + mock_put_file = mocker.MagicMock() + mock_get_url = mocker.MagicMock() + mock_put_file.return_value = S_OK({"Successful": {}, "Failed": {expected_lfn: "Borked"}}) + mock_get_url.return_value = S_OK(urljoin("https://lhcb-dirac-logse.web.cern.ch/", expected_lfn)) + mock_se.return_value.putFile = mock_put_file + mock_se.return_value.getURL = mock_get_url + + command = UploadLogFile() + + # Mock failover + mock_failover = mocker.patch.object(command, "generate_failover_transfer") + mock_failover.return_value = S_OK() + + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + assert result["OK"] + mock_get_url.assert_called_once_with(expected_path, protocol="https") + mock_put_file.assert_called_once_with({expected_lfn: expected_path}) + mock_failover.assert_called_once_with(expected_path, zip_name, expected_lfn) + mock_set_app_status.assert_not_called() + mock_set_job_parameter.assert_called_once() + + def test_upload_fail(self, basedir, mocker: MockerFixture): + """Test both a failure to upload to the LogSE and the FailoverSE.""" + base_lfn = f"/lhcb/{self.NAMESPACE}/{self.CONFIG_VERSION}/LOG/{self.PRODUCTION_ID.zfill(8)}/0000/" + zip_name = self.JOB_ID.zfill(8) + ".zip" + + expected_lfn = os.path.join(base_lfn, zip_name) + expected_path = os.path.join(basedir, zip_name) + + # Mock JobReport + mock_job_report = mocker.patch("dirac_cwl.commands.upload_log_file.JobReport") + mock_set_app_status = mocker.MagicMock() + mock_set_job_parameter = mocker.MagicMock() + mock_job_report.return_value.setApplicationStatus = mock_set_app_status + mock_job_report.return_value.setJobParameter = mock_set_job_parameter + + # Mock StorageElement + mock_se = mocker.patch("dirac_cwl.commands.upload_log_file.StorageElement") + mock_put_file = mocker.MagicMock() + mock_get_url = mocker.MagicMock() + mock_put_file.return_value = S_OK({"Successful": {}, "Failed": {expected_lfn: "Borked"}}) + mock_get_url.return_value = S_OK(urljoin("https://lhcb-dirac-logse.web.cern.ch/", expected_lfn)) + mock_se.return_value.putFile = mock_put_file + mock_se.return_value.getURL = mock_get_url + + command = UploadLogFile() + + # Mock failover + mock_failover = mocker.patch.object(command, "generate_failover_transfer") + mock_failover.return_value = S_ERROR() + + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + assert not result["OK"] + mock_get_url.assert_not_called() + mock_put_file.assert_called_once_with({expected_lfn: expected_path}) + mock_failover.assert_called_once_with(expected_path, zip_name, expected_lfn) + mock_set_app_status.assert_called_once() + mock_set_job_parameter.assert_not_called() + + def test_no_files_to_zip(self, basedir, mocker): + """Test execution when the job did not return any files.""" + import shutil + + shutil.rmtree(basedir) + + # Mock JobReport + mock_job_report = mocker.patch("dirac_cwl.commands.upload_log_file.JobReport") + mock_set_app_status = mocker.MagicMock() + mock_set_job_parameter = mocker.MagicMock() + mock_job_report.return_value.setApplicationStatus = mock_set_app_status + mock_job_report.return_value.setJobParameter = mock_set_job_parameter + + result = UploadLogFile().execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + assert result["OK"] + assert result["Value"] == "No files to upload" + mock_set_app_status.assert_not_called() + + def test_failed_to_zip(self, basedir, mocker: MockerFixture): + """Test failure while zipping.""" + command = UploadLogFile() + + # Mocker zip + mock_zip = mocker.patch.object(command, "zip_files") + mock_zip.side_effect = [AttributeError(), OSError(), ValueError()] + + # Mock JobReport + mock_job_report = mocker.patch("dirac_cwl.commands.upload_log_file.JobReport") + mock_set_app_status = mocker.MagicMock() + mock_set_job_parameter = mocker.MagicMock() + mock_job_report.return_value.setApplicationStatus = mock_set_app_status + mock_job_report.return_value.setJobParameter = mock_set_job_parameter + + # Test raising AttributeError + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + assert result["OK"] + assert "Failed to zip files" in result["Value"] + assert "AttributeError" in result["Value"] + mock_set_app_status.assert_called_once_with("Failed to create zip of log files") + mock_set_app_status.reset_mock() + + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + # Test raising OSError + assert result["OK"] + assert "Failed to zip files" in result["Value"] + assert "OSError" in result["Value"] + mock_set_app_status.assert_called_once_with("Failed to create zip of log files") + mock_set_app_status.reset_mock() + + result = command.execute( + basedir, + job_id=self.JOB_ID, + production_id=self.PRODUCTION_ID, + namespace=self.NAMESPACE, + config_version=self.CONFIG_VERSION, + ) + + # Test raising ValueError + assert result["OK"] + assert "Failed to zip files" in result["Value"] + assert "ValueError" in result["Value"] + mock_set_app_status.assert_called_once_with("Failed to create zip of log files") + + mock_set_job_parameter.assert_not_called()