diff --git a/.bumpversion.cfg b/.bumpversion.cfg index d69f167def..043ca8964e 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 54.2.0 +current_version = 54.4.4 commit = True tag = True tag_name = v{new_version} diff --git a/cg/__init__.py b/cg/__init__.py index 8ce4f8cf69..d6908ef7ac 100644 --- a/cg/__init__.py +++ b/cg/__init__.py @@ -1,2 +1,2 @@ __title__ = "cg" -__version__ = "54.2.0" +__version__ = "54.4.4" diff --git a/cg/cli/archive.py b/cg/cli/archive.py index fa74f16b37..2e2344600e 100644 --- a/cg/cli/archive.py +++ b/cg/cli/archive.py @@ -2,6 +2,7 @@ from click.core import ParameterSource from cg.constants.archiving import DEFAULT_SPRING_ARCHIVE_COUNT +from cg.constants.constants import DRY_RUN from cg.meta.archive.archive import SpringArchiveAPI from cg.models.cg_config import CGConfig @@ -64,3 +65,20 @@ def update_job_statuses(context: CGConfig): data_flow_config=context.data_flow, ) spring_archive_api.update_statuses_for_ongoing_tasks() + + +@archive.command("delete-file") +@DRY_RUN +@click.pass_obj +@click.argument("file_path", required=True) +def delete_file(context: CGConfig, dry_run: bool, file_path: str): + """Delete an archived file and remove it from Housekeeper. + The file will not be deleted if it is not confirmed archived. + The file will not be deleted if its archive location can not be determined from the file tags. + """ + spring_archive_api = SpringArchiveAPI( + status_db=context.status_db, + housekeeper_api=context.housekeeper_api, + data_flow_config=context.data_flow, + ) + spring_archive_api.delete_file(file_path=file_path, dry_run=dry_run) diff --git a/cg/cli/workflow/rnafusion/base.py b/cg/cli/workflow/rnafusion/base.py index 5472e5b8cc..342418a4e6 100644 --- a/cg/cli/workflow/rnafusion/base.py +++ b/cg/cli/workflow/rnafusion/base.py @@ -119,7 +119,7 @@ def run( case_id=case_id, params_file=params_file ), "name": case_id, - "compute_env": compute_env or analysis_api.compute_env, + "compute_env": compute_env or analysis_api.get_compute_env(case_id=case_id), "revision": revision or analysis_api.revision, "wait": "SUBMITTED", "id": nf_tower_id, diff --git a/cg/cli/workflow/taxprofiler/base.py b/cg/cli/workflow/taxprofiler/base.py index c9da3117cd..24beb6941f 100644 --- a/cg/cli/workflow/taxprofiler/base.py +++ b/cg/cli/workflow/taxprofiler/base.py @@ -116,7 +116,7 @@ def run( case_id=case_id, params_file=params_file ), "name": case_id, - "compute_env": compute_env or analysis_api.compute_env, + "compute_env": compute_env or analysis_api.get_compute_env(case_id=case_id), "revision": revision or analysis_api.revision, "wait": NfTowerStatus.SUBMITTED, "id": nf_tower_id, diff --git a/cg/constants/delivery.py b/cg/constants/delivery.py index 21e898a1cc..c0ca4bd7cd 100644 --- a/cg/constants/delivery.py +++ b/cg/constants/delivery.py @@ -1,6 +1,7 @@ """Constants for delivery.""" from cg.constants.constants import Pipeline +from cg.constants.housekeeper_tags import AlignmentFileTag, AnalysisTag, HK_DELIVERY_REPORT_TAG ONLY_ONE_CASE_PER_TICKET: list[Pipeline] = [ Pipeline.FASTQ, @@ -37,8 +38,8 @@ ] BALSAMIC_ANALYSIS_SAMPLE_TAGS: list[set[str]] = [ - {"cram"}, - {"cram-index"}, + {AlignmentFileTag.CRAM}, + {AlignmentFileTag.CRAM_INDEX}, ] BALSAMIC_QC_ANALYSIS_CASE_TAGS: list[set[str]] = [ @@ -95,10 +96,10 @@ ] MIP_DNA_ANALYSIS_SAMPLE_TAGS: list[set[str]] = [ - {"bam"}, - {"bam-index"}, - {"cram"}, - {"cram-index"}, + {AlignmentFileTag.BAM}, + {AlignmentFileTag.BAM_BAI}, + {AlignmentFileTag.CRAM}, + 
{AlignmentFileTag.CRAM_INDEX}, ] MIP_RNA_ANALYSIS_CASE_TAGS: list[set[str]] = [ @@ -114,12 +115,12 @@ ] MIP_RNA_ANALYSIS_SAMPLE_TAGS: list[set[str]] = [ - {"fusion", "star-fusion"}, - {"fusion", "arriba"}, - {"cram"}, - {"cram-index"}, - {"fusion", "vcf"}, - {"fusion", "vcf-index"}, + {AnalysisTag.FUSION, AnalysisTag.STARFUSION}, + {AnalysisTag.FUSION, AnalysisTag.ARRIBA}, + {AlignmentFileTag.CRAM}, + {AlignmentFileTag.CRAM_INDEX}, + {AnalysisTag.FUSION, "vcf"}, + {AnalysisTag.FUSION, "vcf-index"}, {"salmon-quant"}, ] @@ -150,23 +151,23 @@ ] RNAFUSION_ANALYSIS_CASE_TAGS: list[set[str]] = [ - {"fusion", "arriba"}, - {"fusion", "star-fusion"}, - {"fusion", "fusioncatcher"}, - {"fusioncatcher-summary"}, - {"fusioninspector"}, - {"fusionreport", "research"}, - {"fusioninspector-html", "research"}, - {"arriba-visualisation", "research"}, - {"multiqc-html", "rna"}, - {"delivery-report"}, - {"vcf-fusion"}, - {"gene-counts"}, + {AnalysisTag.FUSION, AnalysisTag.ARRIBA}, + {AnalysisTag.FUSION, AnalysisTag.STARFUSION}, + {AnalysisTag.FUSION, AnalysisTag.FUSIONCATCHER}, + {AnalysisTag.FUSIONCATCHER_SUMMARY}, + {AnalysisTag.FUSIONINSPECTOR}, + {AnalysisTag.FUSIONREPORT, AnalysisTag.RESEARCH}, + {AnalysisTag.FUSIONINSPECTOR_HTML, AnalysisTag.RESEARCH}, + {AnalysisTag.ARRIBA_VISUALIZATION, AnalysisTag.RESEARCH}, + {AnalysisTag.MULTIQC_HTML, AnalysisTag.RNA}, + {HK_DELIVERY_REPORT_TAG}, + {AnalysisTag.VCF_FUSION}, + {AnalysisTag.GENE_COUNTS}, ] RNAFUSION_ANALYSIS_SAMPLE_TAGS: list[set[str]] = [ - {"cram"}, - {"cram-index"}, + {AlignmentFileTag.CRAM}, + {AlignmentFileTag.CRAM_INDEX}, ] diff --git a/cg/constants/gene_panel.py b/cg/constants/gene_panel.py index 61e20015aa..92aea16999 100644 --- a/cg/constants/gene_panel.py +++ b/cg/constants/gene_panel.py @@ -41,6 +41,7 @@ class GenePanelMasterList(StrEnum): SOVM: str = "SOVM" STROKE: str = "STROKE" AID: str = "AID" + INHERITED_CANCER: str = "Inherited cancer" @classmethod def get_panel_names(cls, panels=None) -> list[str]: diff --git a/cg/constants/housekeeper_tags.py b/cg/constants/housekeeper_tags.py index 104420ecf5..aeb1591102 100644 --- a/cg/constants/housekeeper_tags.py +++ b/cg/constants/housekeeper_tags.py @@ -52,6 +52,25 @@ class SequencingFileTag(StrEnum): HK_DELIVERY_REPORT_TAG = "delivery-report" +class AnalysisTag(StrEnum): + """Tags for analysis files.""" + + ARRIBA: str = "arriba" + ARRIBA_VISUALIZATION: str = "arriba-visualisation" + FUSION: str = "fusion" + FUSIONCATCHER: str = "fusioncatcher" + FUSIONCATCHER_SUMMARY: str = "fusioncatcher-summary" + FUSIONINSPECTOR: str = "fusioninspector" + FUSIONINSPECTOR_HTML: str = "fusioninspector-html" + FUSIONREPORT: str = "fusionreport" + GENE_COUNTS: str = "gene-counts" + MULTIQC_HTML: str = "multiqc-html" + RESEARCH: str = "research" + RNA: str = "rna" + STARFUSION: str = "star-fusion" + VCF_FUSION: str = "vcf-fusion" + + class HkMipAnalysisTag: CONFIG: list[str] = ["mip-config"] QC_METRICS: list[str] = ["qc-metrics", "deliverable"] diff --git a/cg/io/gzip.py b/cg/io/gzip.py new file mode 100644 index 0000000000..58f641c78d --- /dev/null +++ b/cg/io/gzip.py @@ -0,0 +1,8 @@ +import gzip +from pathlib import Path + + +def read_gzip_first_line(file_path: Path) -> str: + """Return first line of gzip file.""" + with gzip.open(file_path) as file: + return file.readline().decode() diff --git a/cg/meta/archive/archive.py b/cg/meta/archive/archive.py index d27d1c394e..2f76068b75 100644 --- a/cg/meta/archive/archive.py +++ b/cg/meta/archive/archive.py @@ -1,6 +1,7 @@ import logging from typing import 
Callable, Type +import click from housekeeper.store.models import Archive, File from pydantic import BaseModel, ConfigDict @@ -252,7 +253,7 @@ def sort_archival_ids_on_archive_location( jobs_per_location: dict[ArchiveLocations, list[int]] = {} jobs_and_locations: set[ tuple[int, ArchiveLocations] - ] = self.get_unique_archival_ids_and_their_archive_location(archive_entries) + ] = self.get_unique_archival_ids_and_archive_locations(archive_entries) for archive_location in ArchiveLocations: jobs_per_location[ArchiveLocations(archive_location)] = [ @@ -262,18 +263,14 @@ def sort_archival_ids_on_archive_location( ] return jobs_per_location - def get_unique_archival_ids_and_their_archive_location( + def get_unique_archival_ids_and_archive_locations( self, archive_entries: list[Archive] ) -> set[tuple[int, ArchiveLocations]]: - return set( - [ - ( - archive.archiving_task_id, - ArchiveLocations(self.get_archive_location_from_file(archive.file)), - ) - for archive in archive_entries - ] - ) + ids_and_locations: set[tuple[int, ArchiveLocations]] = set() + for archive in archive_entries: + if location := self.get_archive_location_from_file(archive.file): + ids_and_locations.add((archive.archiving_task_id, location)) + return ids_and_locations def sort_retrieval_ids_on_archive_location( self, archive_entries: list[Archive] @@ -283,7 +280,7 @@ def sort_retrieval_ids_on_archive_location( jobs_per_location: dict[ArchiveLocations, list[int]] = {} jobs_and_locations: set[ tuple[int, ArchiveLocations] - ] = self.get_unique_retrieval_ids_and_their_archive_location(archive_entries) + ] = self.get_unique_retrieval_ids_and_archive_locations(archive_entries) for archive_location in ArchiveLocations: jobs_per_location[ArchiveLocations(archive_location)] = [ job_and_location[0] @@ -292,18 +289,51 @@ def sort_retrieval_ids_on_archive_location( ] return jobs_per_location - def get_unique_retrieval_ids_and_their_archive_location( + def get_unique_retrieval_ids_and_archive_locations( self, archive_entries: list[Archive] ) -> set[tuple[int, ArchiveLocations]]: - return set( - [ - ( - archive.retrieval_task_id, - ArchiveLocations(self.get_archive_location_from_file(archive.file)), - ) - for archive in archive_entries - ] + ids_and_locations: set[tuple[int, ArchiveLocations]] = set() + for archive in archive_entries: + if location := self.get_archive_location_from_file(archive.file): + ids_and_locations.add((archive.retrieval_task_id, location)) + return ids_and_locations + + @staticmethod + def is_file_archived(file: File) -> bool: + return file.archive and file.archive.archived_at + + @staticmethod + def get_archive_location_from_file(file: File) -> ArchiveLocations | None: + for tag_name in [tag.name for tag in file.tags]: + if tag_name in iter(ArchiveLocations): + LOG.info(f"Found archive location {tag_name}") + return tag_name + LOG.warning("No archive location in the file tags") + return None + + def delete_file_from_archive_location( + self, file_and_sample: FileAndSample, archive_location: ArchiveLocations + ) -> None: + archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config) + archive_handler.delete_file(file_and_sample) + + def delete_file(self, file_path: str, dry_run: bool = False) -> None: + """Deletes the specified file where it is archived and deletes the Housekeeper record. 
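A minimal usage sketch of this new deletion flow, assuming a loaded CGConfig (`context`) and a hypothetical SPRING file path; the API object is constructed the same way as in the `delete-file` CLI command above:

```python
from cg.meta.archive.archive import SpringArchiveAPI

# `context` is assumed to be a loaded CGConfig; the file path below is hypothetical.
spring_archive_api = SpringArchiveAPI(
    status_db=context.status_db,
    housekeeper_api=context.housekeeper_api,
    data_flow_config=context.data_flow,
)

# Dry run: only echoes what would be deleted, nothing is removed.
spring_archive_api.delete_file(
    file_path="/home/path/to/spring/file.spring", dry_run=True
)

# Real run: deletes the file at its archive location and removes the Housekeeper record.
spring_archive_api.delete_file(file_path="/home/path/to/spring/file.spring")
```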
+ Raises: + Click.Abort if yes is not specified or the user does not confirm the deletion.""" + file: File = self.housekeeper_api.files(path=file_path).first() + if not self.is_file_archived(file): + LOG.warning(f"No archived file found for file {file_path} - exiting") + return + archive_location: ArchiveLocations | None = self.get_archive_location_from_file(file) + if not archive_location: + LOG.warning("No archive location could be determined - exiting") + return + if dry_run: + click.echo(f"Would have deleted file {file_path} from {archive_location}.") + return + file_and_sample: FileAndSample = self.add_samples_to_files([file])[0] + self.delete_file_from_archive_location( + file_and_sample=file_and_sample, archive_location=archive_location ) - - def get_archive_location_from_file(self, file: File) -> str: - return self.status_db.get_sample_by_internal_id(file.version.bundle.name).archive_location + self.housekeeper_api.delete_file(file.id) diff --git a/cg/meta/archive/ddn_dataflow.py b/cg/meta/archive/ddn_dataflow.py index 2e23f669b4..756a806f5d 100644 --- a/cg/meta/archive/ddn_dataflow.py +++ b/cg/meta/archive/ddn_dataflow.py @@ -30,10 +30,15 @@ SOURCE_ATTRIBUTE: str = "source" +def get_request_log(headers: dict, body: dict): + return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}" + + class DataflowEndpoints(StrEnum): """Enum containing all DDN dataflow endpoints used.""" ARCHIVE_FILES = "files/archive" + DELETE_FILE = "files/delete" GET_AUTH_TOKEN = "auth/token" REFRESH_AUTH_TOKEN = "auth/token/refresh" RETRIEVE_FILES = "files/retrieve" @@ -155,12 +160,7 @@ def post_request(self, url: str, headers: dict) -> "TransferJob": The job ID of the launched transfer task. """ - LOG.info( - "Sending request with headers: \n" - + f"{headers} \n" - + "and body: \n" - + f"{self.model_dump()}" - ) + LOG.info(get_request_log(headers=headers, body=self.model_dump())) response: Response = APIRequest.api_request_from_content( api_method=APIMethods.POST, @@ -222,12 +222,7 @@ def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: HTTPError if the response code is not ok. """ - LOG.info( - "Sending request with headers: \n" - + f"{headers} \n" - + "and body: \n" - + f"{self.model_dump()}" - ) + LOG.info(get_request_log(headers=headers, body=self.model_dump())) response: Response = APIRequest.api_request_from_content( api_method=APIMethods.GET, @@ -240,6 +235,32 @@ def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: return GetJobStatusResponse.model_validate(response.json()) +class DeleteFileResponse(BaseModel): + message: str + + +class DeleteFilePayload(BaseModel): + global_path: str + + def delete_file(self, url: str, headers: dict) -> DeleteFileResponse: + """Posts to the given URL with the given headers to delete the file or directory at the specified global path. + Returns the parsed response. + Raises: + HTTPError if the response code is not ok. 
+ """ + LOG.info(get_request_log(headers=headers, body=self.model_dump())) + + response: Response = APIRequest.api_request_from_content( + api_method=APIMethods.POST, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, + ) + response.raise_for_status() + return DeleteFileResponse.model_validate(response.json()) + + class DDNDataFlowClient(ArchiveHandler): """Class for archiving and retrieving folders via DDN Dataflow.""" @@ -261,6 +282,13 @@ def __init__(self, config: DataFlowConfig): def _set_auth_tokens(self) -> None: """Retrieves and sets auth and refresh token from the REST-API.""" + auth_token: AuthToken = self._get_auth_token() + self.refresh_token: str = auth_token.refresh + self.auth_token: str = auth_token.access + self.token_expiration: datetime = datetime.fromtimestamp(auth_token.expire) + + def _get_auth_token(self) -> AuthToken: + """Retrieves auth and refresh token from the REST-API.""" response: Response = APIRequest.api_request_from_content( api_method=APIMethods.POST, url=urljoin(base=self.url, url=DataflowEndpoints.GET_AUTH_TOKEN), @@ -274,13 +302,15 @@ def _set_auth_tokens(self) -> None: ) if not response.ok: raise DdnDataflowAuthenticationError(message=response.text) - response_content: AuthToken = AuthToken.model_validate(response.json()) - self.refresh_token: str = response_content.refresh - self.auth_token: str = response_content.access - self.token_expiration: datetime = datetime.fromtimestamp(response_content.expire) + return AuthToken.model_validate(response.json()) def _refresh_auth_token(self) -> None: """Updates the auth token by providing the refresh token to the REST-API.""" + auth_token: AuthToken = self._get_refreshed_auth_token() + self.auth_token: str = auth_token.access + self.token_expiration: datetime = datetime.fromtimestamp(auth_token.expire) + + def _get_refreshed_auth_token(self) -> AuthToken: response: Response = APIRequest.api_request_from_content( api_method=APIMethods.POST, url=urljoin(base=self.url, url=DataflowEndpoints.REFRESH_AUTH_TOKEN), @@ -288,9 +318,7 @@ def _refresh_auth_token(self) -> None: json=RefreshPayload(refresh=self.refresh_token).model_dump(), verify=False, ) - response_content: AuthToken = AuthToken.model_validate(response.json()) - self.auth_token: str = response_content.access - self.token_expiration: datetime = datetime.fromtimestamp(response_content.expire) + return AuthToken.model_validate(response.json()) @property def auth_header(self) -> dict[str, str]: @@ -347,7 +375,9 @@ def create_transfer_request( else (self.archive_repository, self.local_storage, DESTINATION_ATTRIBUTE) ) - transfer_request: TransferPayload = TransferPayload(files_to_transfer=miria_file_data) + transfer_request = TransferPayload( + files_to_transfer=miria_file_data, createFolder=is_archiving_request + ) transfer_request.trim_paths(attribute_to_trim=attribute) transfer_request.add_repositories( source_prefix=source_prefix, destination_prefix=destination_prefix @@ -382,3 +412,19 @@ def is_job_done(self, job_id: int) -> bool: if job_status in FAILED_JOB_STATUSES: raise ArchiveJobFailedError(f"Job with id {job_id} failed with status {job_status}") return False + + @staticmethod + def get_file_name(file: File) -> str: + return Path(file.path).name + + def delete_file(self, file_and_sample: FileAndSample) -> None: + """Deletes the given file via Miria.""" + file_name: str = self.get_file_name(file_and_sample.file) + sample_id: str = file_and_sample.sample.internal_id + delete_file_payload = DeleteFilePayload( + 
global_path=f"{self.archive_repository}{sample_id}/{file_name}" + ) + delete_file_payload.delete_file( + url=urljoin(self.url, DataflowEndpoints.DELETE_FILE), + headers=dict(self.headers, **self.auth_header), + ) diff --git a/cg/meta/archive/models.py b/cg/meta/archive/models.py index d9b976ebe2..d252b4e487 100644 --- a/cg/meta/archive/models.py +++ b/cg/meta/archive/models.py @@ -64,3 +64,7 @@ def convert_into_transfer_data( def is_job_done(self, job_id: int) -> bool: """Returns true if job has been completed, false otherwise.""" pass + + @abstractmethod + def delete_file(self, file_and_sample: FileAndSample) -> None: + """Deletes a file at the archive location.""" diff --git a/cg/meta/workflow/analysis.py b/cg/meta/workflow/analysis.py index f067e63944..a4f2a39d98 100644 --- a/cg/meta/workflow/analysis.py +++ b/cg/meta/workflow/analysis.py @@ -9,7 +9,7 @@ from housekeeper.store.models import Bundle, Version from cg.apps.environ import environ_email -from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority +from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority, SequencingFileTag from cg.constants.constants import ( AnalysisType, CaseActions, @@ -24,6 +24,7 @@ from cg.meta.workflow.fastq import FastqHandler from cg.models.analysis import AnalysisModel from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.store.models import Analysis, BedVersion, Case, CaseSample, Sample LOG = logging.getLogger(__name__) @@ -31,11 +32,12 @@ def add_gene_panel_combo(default_panels: set[str]) -> set[str]: """Add gene panels combinations for gene panels being part of gene panel combination and return updated gene panels.""" - all_panels = default_panels + additional_panels = set() for panel in default_panels: if panel in GenePanelCombo.COMBO_1: - all_panels |= GenePanelCombo.COMBO_1.get(panel) - return all_panels + additional_panels |= GenePanelCombo.COMBO_1.get(panel) + default_panels |= additional_panels + return default_panels class AnalysisAPI(MetaAPI): @@ -288,58 +290,59 @@ def get_cases_to_qc(self) -> list[Case]: if self.trailblazer_api.is_latest_analysis_qc(case_id=case.internal_id) ] - def get_sample_fastq_destination_dir(self, case: Case, sample: Sample): + def get_sample_fastq_destination_dir(self, case: Case, sample: Sample) -> Path: """Return the path to the FASTQ destination directory.""" raise NotImplementedError - def gather_file_metadata_for_sample(self, sample_obj: Sample) -> list[dict]: + def gather_file_metadata_for_sample(self, sample: Sample) -> list[FastqFileMeta]: return [ - self.fastq_handler.parse_file_data(file_obj.full_path) - for file_obj in self.housekeeper_api.files( - bundle=sample_obj.internal_id, tags=["fastq"] + self.fastq_handler.parse_file_data(hk_file.full_path) + for hk_file in self.housekeeper_api.files( + bundle=sample.internal_id, tags={SequencingFileTag.FASTQ} ) ] def link_fastq_files_for_sample( - self, case_obj: Case, sample_obj: Sample, concatenate: bool = False + self, case: Case, sample: Sample, concatenate: bool = False ) -> None: """ - Link FASTQ files for a sample to working directory. + Link FASTQ files for a sample to the work directory. 
If pipeline input requires concatenated fastq, files can also be concatenated """ - linked_reads_paths = {1: [], 2: []} - concatenated_paths = {1: "", 2: ""} - files: list[dict] = self.gather_file_metadata_for_sample(sample_obj=sample_obj) - sorted_files = sorted(files, key=lambda k: k["path"]) - fastq_dir = self.get_sample_fastq_destination_dir(case=case_obj, sample=sample_obj) + linked_reads_paths: dict[int, list[Path]] = {1: [], 2: []} + concatenated_paths: dict[int, str] = {1: "", 2: ""} + fastq_files_meta: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample) + sorted_fastq_files_meta: list[FastqFileMeta] = sorted( + fastq_files_meta, key=lambda k: k.path + ) + fastq_dir: Path = self.get_sample_fastq_destination_dir(case=case, sample=sample) fastq_dir.mkdir(parents=True, exist_ok=True) - for fastq_data in sorted_files: - fastq_path = Path(fastq_data["path"]) - fastq_name = self.fastq_handler.create_fastq_name( - lane=fastq_data["lane"], - flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, - read=fastq_data["read"], - undetermined=fastq_data["undetermined"], - meta=self.get_additional_naming_metadata(sample_obj), + for fastq_file in sorted_fastq_files_meta: + fastq_file_name: str = self.fastq_handler.create_fastq_name( + lane=fastq_file.lane, + flow_cell=fastq_file.flow_cell_id, + sample=sample.internal_id, + read_direction=fastq_file.read_direction, + undetermined=fastq_file.undetermined, + meta=self.get_lims_naming_metadata(sample), ) - destination_path: Path = fastq_dir / fastq_name - linked_reads_paths[fastq_data["read"]].append(destination_path) + destination_path = Path(fastq_dir, fastq_file_name) + linked_reads_paths[fastq_file.read_direction].append(destination_path) concatenated_paths[ - fastq_data["read"] - ] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_name)}" + fastq_file.read_direction + ] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_file_name)}" if not destination_path.exists(): - LOG.info(f"Linking: {fastq_path} -> {destination_path}") - destination_path.symlink_to(fastq_path) + LOG.info(f"Linking: {fastq_file.path} -> {destination_path}") + destination_path.symlink_to(fastq_file.path) else: LOG.warning(f"Destination path already exists: {destination_path}") if not concatenate: return - LOG.info("Concatenation in progress for sample %s.", sample_obj.internal_id) + LOG.info(f"Concatenation in progress for sample: {sample.internal_id}") for read, value in linked_reads_paths.items(): self.fastq_handler.concatenate(linked_reads_paths[read], concatenated_paths[read]) self.fastq_handler.remove_files(value) @@ -435,7 +438,7 @@ def get_date_from_file_path(file_path: Path) -> dt.datetime.date: """ return dt.datetime.fromtimestamp(int(os.path.getctime(file_path))) - def get_additional_naming_metadata(self, sample_obj: Sample) -> str | None: + def get_lims_naming_metadata(self, sample: Sample) -> str | None: return None def get_latest_metadata(self, case_id: str) -> AnalysisModel: diff --git a/cg/meta/workflow/balsamic.py b/cg/meta/workflow/balsamic.py index 03030875e6..d1af0f61f5 100644 --- a/cg/meta/workflow/balsamic.py +++ b/cg/meta/workflow/balsamic.py @@ -23,6 +23,7 @@ BalsamicWGSQCMetrics, ) from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.store.models import Case, CaseSample, Sample from cg.utils import Process from cg.utils.utils import build_command_from_dict, get_string_from_list_by_pattern @@ -146,22 +147,22 @@ def get_sample_fastq_destination_dir(self, 
case: Case, sample: Sample = None) -> return Path(self.get_case_path(case.internal_id), FileFormat.FASTQ) def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: - case_obj = self.status_db.get_case_by_internal_id(internal_id=case_id) - for link in case_obj.links: - self.link_fastq_files_for_sample( - case_obj=case_obj, sample_obj=link.sample, concatenate=True - ) + case = self.status_db.get_case_by_internal_id(internal_id=case_id) + for link in case.links: + self.link_fastq_files_for_sample(case=case, sample=link.sample, concatenate=True) def get_concatenated_fastq_path(self, link_object: CaseSample) -> Path: - """Returns path to the concatenated FASTQ file of a sample""" - file_collection: list[dict] = self.gather_file_metadata_for_sample(link_object.sample) + """Returns the path to the concatenated FASTQ file of a sample""" + file_collection: list[FastqFileMeta] = self.gather_file_metadata_for_sample( + link_object.sample + ) fastq_data = file_collection[0] linked_fastq_name = self.fastq_handler.create_fastq_name( - lane=fastq_data["lane"], - flowcell=fastq_data["flowcell"], + lane=fastq_data.lane, + flow_cell=fastq_data.flow_cell_id, sample=link_object.sample.internal_id, - read=fastq_data["read"], - undetermined=fastq_data["undetermined"], + read_direction=fastq_data.read_direction, + undetermined=fastq_data.undetermined, ) concatenated_fastq_name: str = self.fastq_handler.get_concatenated_name(linked_fastq_name) return Path( diff --git a/cg/meta/workflow/fastq.py b/cg/meta/workflow/fastq.py index 5a6055a032..77b4419a49 100644 --- a/cg/meta/workflow/fastq.py +++ b/cg/meta/workflow/fastq.py @@ -13,6 +13,10 @@ import shutil from pathlib import Path +from cg.constants import FileExtensions +from cg.io.gzip import read_gzip_first_line +from cg.models.fastq import FastqFileMeta, GetFastqFileMeta + LOG = logging.getLogger(__name__) DEFAULT_DATE_STR = ( @@ -23,6 +27,10 @@ ) +def _is_undetermined_in_path(file_path: Path) -> bool: + return "Undetermined" in file_path + + class FastqHandler: """Handles fastq file linking""" @@ -90,103 +98,56 @@ def get_concatenated_name(linked_fastq_name: str) -> str: return f"concatenated_{'_'.join(linked_fastq_name.split('_')[-4:])}" @staticmethod - def parse_header(line: str) -> dict: - """Generates a dict with parsed lanes, flowcells and read numbers - Handle illumina's two different header formats + def parse_fastq_header(line: str) -> FastqFileMeta | None: + """Parse and return fastq header metadata. + Handle Illumina's two different header formats @see https://en.wikipedia.org/wiki/FASTQ_format - - @HWUSI-EAS100R:6:73:941:1973#0/1 - - HWUSI-EAS100R the unique instrument name - 6 flowcell lane - 73 tile number within the flowcell lane - 941 'x'-coordinate of the cluster within the tile - 1973 'y'-coordinate of the cluster within the tile - #0 index number for a multiplexed sample (0 for no indexing) - /1 the member of a pair, /1 or /2 (paired-end or mate-pair reads only) - - Versions of the Illumina pipeline since 1.4 appear to use #NNNNNN - instead of #0 for the multiplex ID, where NNNNNN is the sequence of the - multiplex tag. 
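A quick sanity sketch of the new parser against the two header styles documented in this docstring, using the pydantic model introduced in cg/models/fastq.py:

```python
from cg.meta.workflow.fastq import FastqHandler
from cg.models.fastq import FastqFileMeta

# Legacy five-part header (format described above).
old_style: FastqFileMeta = FastqHandler.parse_fastq_header("@HWUSI-EAS100R:6:73:941:1973#0/1")
assert (old_style.lane, old_style.flow_cell_id, old_style.read_direction) == (6, "XXXXXX", 1)

# Casava 1.8 ten-part header (format described further down).
new_style: FastqFileMeta = FastqHandler.parse_fastq_header(
    "@EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG"
)
assert (new_style.lane, new_style.flow_cell_id, new_style.read_direction) == (2, "FC706VJ", 1)
```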
- - With Casava 1.8 the format of the '@' line has changed: - - @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG - - EAS139 the unique instrument name - 136 the run id - FC706VJ the flowcell id - 2 flowcell lane - 2104 tile number within the flowcell lane - 15343 'x'-coordinate of the cluster within the tile - 197393 'y'-coordinate of the cluster within the tile - 1 the member of a pair, 1 or 2 (paired-end or mate-pair reads only) - Y Y if the read is filtered, N otherwise - 18 0 when none of the control bits are on, otherwise it is an even number - ATCACG index sequence + Raise: + TypeError if unable to split line into expected parts. """ - - fastq_meta = {"lane": None, "flowcell": None, "readnumber": None} - parts = line.split(":") - if len(parts) == 5: # @HWUSI-EAS100R:6:73:941:1973#0/1 - fastq_meta["lane"] = parts[1] - fastq_meta["flowcell"] = "XXXXXX" - fastq_meta["readnumber"] = parts[-1].split("/")[-1] - if len(parts) == 10: # @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG - fastq_meta["lane"] = parts[3] - fastq_meta["flowcell"] = parts[2] - fastq_meta["readnumber"] = parts[6].split(" ")[-1] - if len(parts) == 7: # @ST-E00201:173:HCLCGALXX:1:2106:22516:34834/1 - fastq_meta["lane"] = parts[3] - fastq_meta["flowcell"] = parts[2] - fastq_meta["readnumber"] = parts[-1].split("/")[-1] - - return fastq_meta + try: + return GetFastqFileMeta.header_format.get(len(parts))(parts=parts) + except TypeError as exception: + LOG.error(f"Could not parse header format for header: {line}") + raise exception @staticmethod - def parse_file_data(fastq_path: Path) -> dict: - with gzip.open(fastq_path) as handle: - header_line = handle.readline().decode() - header_info = FastqHandler.parse_header(header_line) - - data = { - "path": fastq_path, - "lane": int(header_info["lane"]), - "flowcell": header_info["flowcell"], - "read": int(header_info["readnumber"]), - "undetermined": ("Undetermined" in fastq_path), - } - matches = re.findall(r"-l[1-9]t([1-9]{2})_", str(fastq_path)) - if len(matches) > 0: - data["flowcell"] = f"{data['flowcell']}-{matches[0]}" - return data + def parse_file_data(fastq_path: Path) -> FastqFileMeta: + header_line: str = read_gzip_first_line(file_path=fastq_path) + fastq_file_meta: FastqFileMeta = FastqHandler.parse_fastq_header(header_line) + fastq_file_meta.path = fastq_path + fastq_file_meta.undetermined = _is_undetermined_in_path(fastq_path) + matches = re.findall(r"-l[1-9]t([1-9]{2})_", str(fastq_path)) + if len(matches) > 0: + fastq_file_meta.flow_cell_id = f"{fastq_file_meta.flow_cell_id}-{matches[0]}" + return fastq_file_meta @staticmethod def create_fastq_name( - lane: str, + lane: int, flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, meta: str | None = None, ) -> str: """Name a FASTQ file with standard conventions and - no naming constrains from pipeline.""" + no naming constrains from the pipeline.""" flow_cell: str = f"{flow_cell}-undetermined" if undetermined else flow_cell date: str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read}.fastq.gz" + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class BalsamicFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = 
DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -194,36 +155,36 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following Balsamic conventions. Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - date_str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date_str}_{flowcell}_{sample}_{index}_R_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + date: str = date if isinstance(date, str) else date.strftime("%y%m%d") + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_R_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MipFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, meta: str | None = None, ) -> str: """Name a FASTQ file following MIP conventions.""" - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - date_str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date_str}_{flowcell}_{sample}_{index}_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + date: str = date if isinstance(date, str) else date.strftime("%y%m%d") + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MicrosaltFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -231,19 +192,17 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following usalt conventions. Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - # ACC1234A1_FCAB1ABC2_L1_1.fastq.gz sample_flowcell_lane_read.fastq.gz - - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - return f"{sample}_{flowcell}_L{lane}_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + return f"{sample}_{flow_cell}_L{lane}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MutantFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -251,9 +210,7 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following mutant conventions. 
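To make the renamed parameters concrete, a sketch reproducing the example file name from the removed microSALT comment above (assuming FileExtensions.FASTQ and FileExtensions.GZIP resolve to ".fastq" and ".gz"):

```python
from cg.meta.workflow.fastq import MicrosaltFastqHandler

fastq_name = MicrosaltFastqHandler.create_fastq_name(
    lane=1,
    flow_cell="FCAB1ABC2",
    sample="ACC1234A1",
    read_direction=1,
)
# Expected: sample_flowcell_lane_read, i.e. "ACC1234A1_FCAB1ABC2_L1_1.fastq.gz"
assert fastq_name == "ACC1234A1_FCAB1ABC2_L1_1.fastq.gz"
```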
Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - # ACC1234A1_FCAB1ABC2_L1_1.fastq.gz sample_flowcell_lane_read.fastq.gz - - return f"{flowcell}_L{lane}_{meta}_{read}.fastq.gz" + return f"{flow_cell}_L{lane}_{meta}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" @staticmethod def get_concatenated_name(linked_fastq_name: str) -> str: @@ -275,7 +232,7 @@ def create_nanopore_fastq_name( filenr: str, meta: str | None = None, ) -> str: - return f"{flowcell}_{sample}_{meta}_{filenr}.fastq.gz" + return f"{flowcell}_{sample}_{meta}_{filenr}{FileExtensions.FASTQ}{FileExtensions.GZIP}" @staticmethod def parse_nanopore_file_data(fastq_path: Path) -> dict: diff --git a/cg/meta/workflow/microsalt.py b/cg/meta/workflow/microsalt.py index e51283303f..3bd36e91fe 100644 --- a/cg/meta/workflow/microsalt.py +++ b/cg/meta/workflow/microsalt.py @@ -152,7 +152,7 @@ def link_fastq_files(self, case_id: str, sample_id: str | None, dry_run: bool = case_obj: Case = self.status_db.get_case_by_internal_id(internal_id=case_id) samples: list[Sample] = self.get_samples(case_id=case_id, sample_id=sample_id) for sample_obj in samples: - self.link_fastq_files_for_sample(case_obj=case_obj, sample_obj=sample_obj) + self.link_fastq_files_for_sample(case=case_obj, sample=sample_obj) def get_samples(self, case_id: str, sample_id: str | None = None) -> list[Sample]: """Returns a list of samples to configure diff --git a/cg/meta/workflow/mip.py b/cg/meta/workflow/mip.py index 7bb0589c5a..e6dc4993dc 100644 --- a/cg/meta/workflow/mip.py +++ b/cg/meta/workflow/mip.py @@ -143,12 +143,9 @@ def get_sample_fastq_destination_dir(self, case: Case, sample: Sample) -> Path: ) def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: - case_obj = self.status_db.get_case_by_internal_id(internal_id=case_id) - for link in case_obj.links: - self.link_fastq_files_for_sample( - case_obj=case_obj, - sample_obj=link.sample, - ) + case: Case = self.status_db.get_case_by_internal_id(internal_id=case_id) + for link in case.links: + self.link_fastq_files_for_sample(case=case, sample=link.sample) def write_panel(self, case_id: str, content: list[str]) -> None: """Write the gene panel to case dir.""" @@ -329,3 +326,6 @@ def get_pipeline_version(self, case_id: str) -> str: ) sample_info: MipBaseSampleInfo = MipBaseSampleInfo(**sample_info_raw) return sample_info.mip_version + + def write_managed_variants(self, case_id: str, content: list[str]) -> None: + self._write_managed_variants(out_dir=Path(self.root, case_id), content=content) diff --git a/cg/meta/workflow/mip_dna.py b/cg/meta/workflow/mip_dna.py index 5f89f2765f..acca32dfb8 100644 --- a/cg/meta/workflow/mip_dna.py +++ b/cg/meta/workflow/mip_dna.py @@ -73,6 +73,3 @@ def get_gene_panel(self, case_id: str) -> list[str]: def get_managed_variants(self) -> list[str]: """Create and return the managed variants.""" return self._get_managed_variants(genome_build=GENOME_BUILD_37) - - def write_managed_variants(self, case_id: str, content: list[str]) -> None: - self._write_managed_variants(out_dir=Path(self.root, case_id), content=content) diff --git a/cg/meta/workflow/mip_rna.py b/cg/meta/workflow/mip_rna.py index 22abbf6e76..90f938cb7e 100644 --- a/cg/meta/workflow/mip_rna.py +++ b/cg/meta/workflow/mip_rna.py @@ -56,3 +56,7 @@ def config_sample(self, link_obj, panel_bed: str | None = None) -> dict[str, str def get_gene_panel(self, case_id: str) -> list[str]: """Create and return the aggregated gene panel file.""" return self._get_gene_panel(case_id=case_id, 
genome_build=GENOME_BUILD_38) + + def get_managed_variants(self) -> list[str]: + """Create and return the managed variants.""" + return self._get_managed_variants(genome_build=GENOME_BUILD_38) diff --git a/cg/meta/workflow/mutant.py b/cg/meta/workflow/mutant.py index b60c99607c..7c99c30082 100644 --- a/cg/meta/workflow/mutant.py +++ b/cg/meta/workflow/mutant.py @@ -2,7 +2,7 @@ import shutil from pathlib import Path -from cg.constants import Pipeline +from cg.constants import Pipeline, SequencingFileTag from cg.constants.constants import FileFormat from cg.io.controller import WriteFile from cg.meta.workflow.analysis import AnalysisAPI @@ -81,16 +81,14 @@ def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: application = sample_obj.application_version.application if self._is_nanopore(application): self.link_nanopore_fastq_for_sample( - case_obj=case_obj, sample_obj=sample_obj, concatenate=True + case=case_obj, sample=sample_obj, concatenate=True ) continue if not sample_obj.sequencing_qc: LOG.info("Sample %s read count below threshold, skipping!", sample_obj.internal_id) continue else: - self.link_fastq_files_for_sample( - case_obj=case_obj, sample_obj=sample_obj, concatenate=True - ) + self.link_fastq_files_for_sample(case=case_obj, sample=sample_obj, concatenate=True) def get_sample_parameters(self, sample_obj: Sample) -> MutantSampleConfig: return MutantSampleConfig( @@ -140,16 +138,15 @@ def create_case_config(self, case_id: str, dry_run: bool) -> None: ) LOG.info("Saved config to %s", config_path) - def get_additional_naming_metadata(self, sample_obj: Sample) -> str | None: - sample_name = sample_obj.name + def get_lims_naming_metadata(self, sample: Sample) -> str | None: region_code = self.lims_api.get_sample_attribute( - lims_id=sample_obj.internal_id, key="region_code" + lims_id=sample.internal_id, key="region_code" ).split(" ")[0] lab_code = self.lims_api.get_sample_attribute( - lims_id=sample_obj.internal_id, key="lab_code" + lims_id=sample.internal_id, key="lab_code" ).split(" ")[0] - return f"{region_code}_{lab_code}_{sample_name}" + return f"{region_code}_{lab_code}_{sample.name}" def run_analysis(self, case_id: str, dry_run: bool, config_artic: str = None) -> None: if self.get_case_output_path(case_id=case_id).exists(): @@ -194,34 +191,34 @@ def get_cases_to_store(self) -> list[Case]: if Path(self.get_deliverables_file_path(case_id=case.internal_id)).exists() ] - def get_metadata_for_nanopore_sample(self, sample_obj: Sample) -> list[dict]: + def get_metadata_for_nanopore_sample(self, sample: Sample) -> list[dict]: return [ self.fastq_handler.parse_nanopore_file_data(file_obj.full_path) for file_obj in self.housekeeper_api.files( - bundle=sample_obj.internal_id, tags=["fastq"] + bundle=sample.internal_id, tags={SequencingFileTag.FASTQ} ) ] def link_nanopore_fastq_for_sample( - self, case_obj: Case, sample_obj: Sample, concatenate: bool = False + self, case: Case, sample: Sample, concatenate: bool = False ) -> None: """ Link FASTQ files for a nanopore sample to working directory. 
If pipeline input requires concatenated fastq, files can also be concatenated """ read_paths = [] - files: list[dict] = self.get_metadata_for_nanopore_sample(sample_obj=sample_obj) + files: list[dict] = self.get_metadata_for_nanopore_sample(sample=sample) sorted_files = sorted(files, key=lambda k: k["path"]) - fastq_dir = self.get_sample_fastq_destination_dir(case=case_obj, sample=sample_obj) + fastq_dir = self.get_sample_fastq_destination_dir(case=case, sample=sample) fastq_dir.mkdir(parents=True, exist_ok=True) for counter, fastq_data in enumerate(sorted_files): fastq_path = Path(fastq_data["path"]) fastq_name = self.fastq_handler.create_nanopore_fastq_name( flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, + sample=sample.internal_id, filenr=str(counter), - meta=self.get_additional_naming_metadata(sample_obj), + meta=self.get_lims_naming_metadata(sample), ) destination_path: Path = fastq_dir / fastq_name read_paths.append(destination_path) @@ -237,13 +234,13 @@ def link_nanopore_fastq_for_sample( concatenated_fastq_name = self.fastq_handler.create_nanopore_fastq_name( flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, + sample=sample.internal_id, filenr=str(1), - meta=self.get_additional_naming_metadata(sample_obj), + meta=self.get_lims_naming_metadata(sample), ) concatenated_path = ( f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(concatenated_fastq_name)}" ) - LOG.info("Concatenation in progress for sample %s.", sample_obj.internal_id) + LOG.info(f"Concatenation in progress for sample {sample.internal_id}.") self.fastq_handler.concatenate(read_paths, concatenated_path) self.fastq_handler.remove_files(read_paths) diff --git a/cg/meta/workflow/nf_analysis.py b/cg/meta/workflow/nf_analysis.py index 0e8bba56f2..b679c77194 100644 --- a/cg/meta/workflow/nf_analysis.py +++ b/cg/meta/workflow/nf_analysis.py @@ -1,11 +1,8 @@ import logging -import operator from datetime import datetime from pathlib import Path from typing import Any -from cg.store.models import Sample - from cg.constants import Pipeline from cg.constants.constants import FileExtensions, FileFormat, WorkflowManager from cg.constants.nextflow import NFX_WORK_DIR @@ -15,8 +12,10 @@ from cg.meta.workflow.analysis import AnalysisAPI from cg.meta.workflow.nf_handlers import NextflowHandler, NfTowerHandler from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.models.nf_analysis import FileDeliverable, PipelineDeliverables from cg.models.rnafusion.rnafusion import CommandArgs +from cg.store.models import Sample from cg.utils import Process LOG = logging.getLogger(__name__) @@ -38,7 +37,7 @@ def __init__(self, config: CGConfig, pipeline: Pipeline): self.tower_pipeline: str | None = None self.account: str | None = None self.email: str | None = None - self.compute_env: str | None = None + self.compute_env_base: str | None = None self.revision: str | None = None self.nextflow_binary_path: str | None = None @@ -80,6 +79,10 @@ def get_sample_sheet_path(self, case_id: str) -> Path: FileExtensions.CSV ) + def get_compute_env(self, case_id: str) -> str: + """Get the compute environment for the head job based on the case priority.""" + return f"{self.compute_env_base}-{self.get_slurm_qos_for_case(case_id=case_id)}" + @staticmethod def get_nextflow_config_path(nextflow_config: str | None = None) -> Path | None: """Path to Nextflow config file.""" @@ -133,7 +136,7 @@ def get_workdir_path(self, case_id: str, work_dir: Path | None = None) -> Path: @staticmethod def 
extract_read_files( - metadata: list, forward_read: bool = False, reverse_read: bool = False + metadata: list[FastqFileMeta], forward_read: bool = False, reverse_read: bool = False ) -> list[str]: """Extract a list of fastq file paths for either forward or reverse reads.""" if forward_read and not reverse_read: @@ -142,8 +145,12 @@ def extract_read_files( read_direction = 2 else: raise ValueError("Either forward or reverse needs to be specified") - sorted_metadata: list = sorted(metadata, key=operator.itemgetter("path")) - return [d["path"] for d in sorted_metadata if d["read"] == read_direction] + sorted_metadata: list = sorted(metadata, key=lambda k: k.path) + return [ + fastq_file.path + for fastq_file in sorted_metadata + if fastq_file.read_direction == read_direction + ] def verify_sample_sheet_exists(self, case_id: str, dry_run: bool = False) -> None: """Raise an error if sample sheet file is not found.""" diff --git a/cg/meta/workflow/rnafusion.py b/cg/meta/workflow/rnafusion.py index efcccd8ca3..1014f62322 100644 --- a/cg/meta/workflow/rnafusion.py +++ b/cg/meta/workflow/rnafusion.py @@ -19,6 +19,7 @@ MetricsDeliverablesCondition, MultiqcDataJson, ) +from cg.models.fastq import FastqFileMeta from cg.models.nf_analysis import PipelineDeliverables from cg.models.rnafusion.rnafusion import ( RnafusionAnalysis, @@ -46,11 +47,11 @@ def __init__( self.profile: str = config.rnafusion.profile self.conda_env: str = config.rnafusion.conda_env self.conda_binary: str = config.rnafusion.conda_binary - self.tower_binary_path: str = config.rnafusion.tower_binary_path + self.tower_binary_path: str = config.tower_binary_path self.tower_pipeline: str = config.rnafusion.tower_pipeline self.account: str = config.rnafusion.slurm.account self.email: str = config.rnafusion.slurm.mail_user - self.compute_env: str = config.rnafusion.compute_env + self.compute_env_base: str = config.rnafusion.compute_env self.revision: str = config.rnafusion.revision self.nextflow_binary_path: str = config.rnafusion.binary_path @@ -72,7 +73,7 @@ def get_sample_sheet_content_per_sample( self, sample: Sample, case_id: str, strandedness: Strandedness ) -> list[list[str]]: """Get sample sheet content per sample.""" - sample_metadata: list[dict] = self.gather_file_metadata_for_sample(sample_obj=sample) + sample_metadata: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample) fastq_forward_read_paths: list[str] = self.extract_read_files( metadata=sample_metadata, forward_read=True ) diff --git a/cg/meta/workflow/taxprofiler.py b/cg/meta/workflow/taxprofiler.py index 79d2c53d15..e12350e961 100644 --- a/cg/meta/workflow/taxprofiler.py +++ b/cg/meta/workflow/taxprofiler.py @@ -8,6 +8,7 @@ from cg.constants.sequencing import SequencingPlatform from cg.meta.workflow.nf_analysis import NfAnalysisAPI from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.models.taxprofiler.taxprofiler import ( TaxprofilerParameters, TaxprofilerSampleSheetEntry, @@ -35,18 +36,19 @@ def __init__( self.revision: str = config.taxprofiler.revision self.hostremoval_reference: Path = Path(config.taxprofiler.hostremoval_reference) self.databases: Path = Path(config.taxprofiler.databases) - self.tower_binary_path: str = config.taxprofiler.tower_binary_path + self.tower_binary_path: str = config.tower_binary_path self.tower_pipeline: str = config.taxprofiler.tower_pipeline self.account: str = config.taxprofiler.slurm.account self.email: str = config.taxprofiler.slurm.mail_user self.nextflow_binary_path: str 
= config.taxprofiler.binary_path + self.compute_env_base: str = config.taxprofiler.compute_env def get_sample_sheet_content_per_sample( self, sample: Sample, instrument_platform: SequencingPlatform.ILLUMINA, fasta: str = "" ) -> list[list[str]]: """Get sample sheet content per sample.""" sample_name: str = sample.name - sample_metadata: list[str] = self.gather_file_metadata_for_sample(sample) + sample_metadata: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample) fastq_forward_read_paths: list[str] = self.extract_read_files( metadata=sample_metadata, forward_read=True ) diff --git a/cg/models/cg_config.py b/cg/models/cg_config.py index abeccfd70d..c932a7f542 100644 --- a/cg/models/cg_config.py +++ b/cg/models/cg_config.py @@ -158,7 +158,6 @@ class RareDiseaseConfig(CommonAppConfig): revision: str root: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str @@ -174,22 +173,21 @@ class RnafusionConfig(CommonAppConfig): launch_directory: str revision: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str class TaxprofilerConfig(CommonAppConfig): - root: str binary_path: str + conda_binary: str | None = None conda_env: str - profile: str + compute_env: str + databases: str + hostremoval_reference: str pipeline_path: str + profile: str revision: str - conda_binary: str | None = None - hostremoval_reference: str - databases: str + root: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str @@ -259,6 +257,7 @@ class CGConfig(BaseModel): environment: Literal["production", "stage"] = "stage" flow_cells_dir: str madeline_exe: str + tower_binary_path: str max_flowcells: int | None data_input: DataInput | None = None # Base APIs that always should exist diff --git a/cg/models/fastq.py b/cg/models/fastq.py new file mode 100644 index 0000000000..df3470f65d --- /dev/null +++ b/cg/models/fastq.py @@ -0,0 +1,38 @@ +from pathlib import Path +from typing import Callable + +from pydantic import BaseModel + + +class FastqFileMeta(BaseModel): + path: Path | None = None + lane: int + read_direction: int + undetermined: bool | None = None + flow_cell_id: str + + +def _get_header_meta_casava_five_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[1], flow_cell_id="XXXXXX", read_direction=parts[-1].split("/")[-1] + ) + + +def _get_header_meta_casava_ten_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[3], flow_cell_id=parts[2], read_direction=parts[6].split(" ")[-1] + ) + + +def _get_header_meta_casava_seven_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[3], flow_cell_id=parts[2], read_direction=parts[-1].split("/")[-1] + ) + + +class GetFastqFileMeta: + header_format: dict[int, Callable] = { + 5: _get_header_meta_casava_five_parts, + 7: _get_header_meta_casava_seven_parts, + 10: _get_header_meta_casava_ten_parts, + } diff --git a/poetry.lock b/poetry.lock index 87e99d63e1..718e3a6dd6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -844,13 +844,13 @@ files = [ [[package]] name = "housekeeper" -version = "4.10.14" +version = "4.10.16" description = "Housekeeper takes care of files" optional = false python-versions = "*" files = [ - {file = "housekeeper-4.10.14-py2.py3-none-any.whl", hash = "sha256:f292783e524483e526b861f3f345618080b8eca864549711c99b0959828bd4f9"}, - {file = "housekeeper-4.10.14.tar.gz", hash = "sha256:d155a37c4d352f895ab90c45f44d0e51fec6c76ae59c74e0211db6af434114c4"}, + {file = "housekeeper-4.10.16-py2.py3-none-any.whl", hash = 
"sha256:ef85d34f61df05065b0fa5a67ee8505ef5b84dd5f655cfe44909898fafeef5ee"}, + {file = "housekeeper-4.10.16.tar.gz", hash = "sha256:cbd7bced914408e8a557f7817ead45bc61d3cf63c8a402aafac0982643f8e55d"}, ] [package.dependencies] @@ -2382,4 +2382,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.13" -content-hash = "2f9e1bc07a1c8f49e9bc8a2c4672c07ca8f683a290f9fca9448f84c5f814d3e5" +content-hash = "d27cd4e4e346e271787d5fc98231ff2d561da8fd34fd6d5c11f0ec5fb3a64bd1" diff --git a/pyproject.toml b/pyproject.toml index 7798aec396..59b7855f5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "cg" -version = "54.2.0" +version = "54.4.4" description = "Clinical Genomics command center" authors = ["Clinical Genomics "] readme = "README.md" @@ -69,7 +69,7 @@ urllib3 = "*" # Apps genologics = "*" -housekeeper = ">=4.10.14" +housekeeper = ">=4.10.16" [tool.poetry.dev-dependencies] diff --git a/tests/apps/mip/conftest.py b/tests/apps/mip/conftest.py index 452b6f841f..79957db827 100644 --- a/tests/apps/mip/conftest.py +++ b/tests/apps/mip/conftest.py @@ -3,11 +3,13 @@ import pytest +from cg.constants import FileExtensions + def create_file(tmpdir, flowcell, lane, read, file_content): """actual file on disk""" - file_name = f"S1_FC000{flowcell}_L00{lane}_R_{read}.fastq.gz" + file_name = f"S1_FC000{flowcell}_L00{lane}_R_{read}{FileExtensions.FASTQ}{FileExtensions.GZIP}" file_path = tmpdir / file_name file_path.write(file_content) return file_path diff --git a/tests/conftest.py b/tests/conftest.py index ddba5c6ba4..9890fd9152 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -312,6 +312,7 @@ def base_config_dict() -> dict: return { "database": "sqlite:///", "madeline_exe": "path/to/madeline", + "tower_binary_path": "path/to/tower", "delivery_path": "path/to/delivery", "flow_cells_dir": "path/to/flow_cells", "demultiplexed_flow_cells_dir": "path/to/demultiplexed_flow_cells_dir", @@ -2411,6 +2412,7 @@ def context_config( "sender_password": "", }, "madeline_exe": "echo", + "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "pon_path": str(cg_dir), "backup": { "pdc_archiving_directory": pdc_archiving_directory.dict(), @@ -2558,7 +2560,6 @@ def context_config( "account": "development", "mail_user": "test.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "raredisease", }, "rnafusion": { @@ -2576,7 +2577,6 @@ def context_config( "account": "development", "mail_user": "test.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "rnafusion", }, "pigz": {"binary_path": "/bin/pigz"}, @@ -2597,7 +2597,6 @@ def context_config( "account": "development", "mail_user": "taxprofiler.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "taxprofiler", }, "scout": { @@ -3698,3 +3697,14 @@ def raredisease_context( helpers.add_relationship(status_db, case=case_not_enough_reads, sample=sample_not_enough_reads) return cg_context + + +@pytest.fixture +def fastq_file_meta_raw(flow_cell_name: str) -> dict: + return { + "path": Path("a", f"file{FileExtensions.FASTQ}{FileExtensions.GZIP}"), + "lane": str(1), + "read_direction": str(2), + "flow_cell_id": flow_cell_name, + "undetermined": None, + } diff --git a/tests/fixtures/io/casava_five_parts.fastq.gz 
b/tests/fixtures/io/casava_five_parts.fastq.gz new file mode 100644 index 0000000000..7b3d02ece8 Binary files /dev/null and b/tests/fixtures/io/casava_five_parts.fastq.gz differ diff --git a/tests/fixtures/io/casava_seven_parts.fastq.gz b/tests/fixtures/io/casava_seven_parts.fastq.gz new file mode 100644 index 0000000000..c7911dccee Binary files /dev/null and b/tests/fixtures/io/casava_seven_parts.fastq.gz differ diff --git a/tests/fixtures/io/casava_ten_parts.fastq.gz b/tests/fixtures/io/casava_ten_parts.fastq.gz new file mode 100644 index 0000000000..2d5e03ccc9 Binary files /dev/null and b/tests/fixtures/io/casava_ten_parts.fastq.gz differ diff --git a/tests/fixtures/io/example.gz b/tests/fixtures/io/example.gz new file mode 100644 index 0000000000..8237ec5a51 Binary files /dev/null and b/tests/fixtures/io/example.gz differ diff --git a/tests/io/conftest.py b/tests/io/conftest.py index ed88ef37e6..209bf0bfd1 100644 --- a/tests/io/conftest.py +++ b/tests/io/conftest.py @@ -106,3 +106,9 @@ def xml_file_path(fixtures_dir: Path) -> Path: def xml_temp_path(cg_dir: Path) -> Path: """Return a temp file path to use when writing xml.""" return Path(cg_dir, "write_xml.xml") + + +@pytest.fixture +def gzip_file_path(fixtures_dir: Path) -> Path: + """Return a file path to example Gzip file.""" + return Path(fixtures_dir, "io", "example.gz") diff --git a/tests/io/test_io_gzip.py b/tests/io/test_io_gzip.py new file mode 100644 index 0000000000..bcc24e88e6 --- /dev/null +++ b/tests/io/test_io_gzip.py @@ -0,0 +1,19 @@ +from pathlib import Path + +from cg.io.gzip import read_gzip_first_line + + +def test_read_gzip_first_line(gzip_file_path: Path): + """ + Test reading first line from a gzip file into a string. + """ + # GIVEN a gzip file + + # WHEN reading the file + line: str = read_gzip_first_line(file_path=gzip_file_path) + + # THEN assert a str is returned + assert isinstance(line, str) + + # THEN the content should match the expected line + assert line == "- ipsum, sit, amet" diff --git a/tests/meta/archive/conftest.py b/tests/meta/archive/conftest.py index e56ed4f802..dc260eb6ee 100644 --- a/tests/meta/archive/conftest.py +++ b/tests/meta/archive/conftest.py @@ -1,3 +1,4 @@ +import http from datetime import datetime, timedelta from pathlib import Path from typing import Any @@ -81,7 +82,7 @@ def retrieve_request_json( """Returns the body for a retrieval http post towards the DDN Miria API.""" return { "osType": "Unix/MacOS", - "createFolder": True, + "createFolder": False, "pathInfo": [ { "destination": local_storage_repository @@ -362,7 +363,7 @@ def archive_context( file: File = real_housekeeper_api.add_file( path=path_to_spring_file_with_ongoing_archival, version_obj=bundle.versions[0], - tags=[SequencingFileTag.SPRING], + tags=[SequencingFileTag.SPRING, ArchiveLocations.KAROLINSKA_BUCKET], ) file.id = 1234 real_housekeeper_api.add_archives(files=[file], archive_task_id=archival_job_id) @@ -378,3 +379,22 @@ def path_to_spring_file_to_archive() -> str: @pytest.fixture def path_to_spring_file_with_ongoing_archival() -> str: return "/home/path/to/ongoing/spring/file.spring" + + +@pytest.fixture +def failed_response() -> Response: + response = Response() + response.status_code = http.HTTPStatus.FORBIDDEN + return response + + +@pytest.fixture +def failed_delete_file_response(failed_response: Response) -> Response: + failed_response._content = b'{"detail":"Given token not valid for any token type","code":"token_not_valid","messages":[{"tokenClass":"AccessToken","tokenType":"access","message":"Token is 
invalid or expired"}]}' + return failed_response + + +@pytest.fixture +def ok_delete_file_response(ok_response: Response) -> Response: + ok_response._content = b'{"message":"Object has been deleted"}' + return ok_response diff --git a/tests/meta/archive/test_archive_api.py b/tests/meta/archive/test_archive_api.py index 82be2e59c8..8defc705ba 100644 --- a/tests/meta/archive/test_archive_api.py +++ b/tests/meta/archive/test_archive_api.py @@ -2,6 +2,7 @@ import pytest from housekeeper.store.models import File +from requests import HTTPError, Response from cg.constants.archiving import ArchiveLocations from cg.constants.constants import APIMethods @@ -9,6 +10,8 @@ from cg.io.controller import APIRequest from cg.meta.archive.archive import ARCHIVE_HANDLERS, FileAndSample, SpringArchiveAPI from cg.meta.archive.ddn_dataflow import ( + FAILED_JOB_STATUSES, + ONGOING_JOB_STATUSES, AuthToken, DDNDataFlowClient, GetJobStatusPayload, @@ -202,7 +205,11 @@ def test_archive_all_non_archived_spring_files( @pytest.mark.parametrize( "job_status, should_date_be_set", - [(JobStatus.COMPLETED, True), (JobStatus.RUNNING, False)], + [ + (JobStatus.COMPLETED, True), + (ONGOING_JOB_STATUSES[0], False), + (FAILED_JOB_STATUSES[0], False), + ], ) def test_get_archival_status( spring_archive_api: SpringArchiveAPI, @@ -240,12 +247,19 @@ def test_get_archival_status( ) # THEN The Archive entry should have been updated - assert bool(file.archive.archived_at) == should_date_be_set + if job_status == FAILED_JOB_STATUSES[0]: + assert not file.archive + else: + assert bool(file.archive.archived_at) == should_date_be_set @pytest.mark.parametrize( "job_status, should_date_be_set", - [(JobStatus.COMPLETED, True), (JobStatus.RUNNING, False)], + [ + (JobStatus.COMPLETED, True), + (ONGOING_JOB_STATUSES[0], False), + (FAILED_JOB_STATUSES[0], False), + ], ) def test_get_retrieval_status( spring_archive_api: SpringArchiveAPI, @@ -259,6 +273,9 @@ def test_get_retrieval_status( job_status, should_date_be_set, ): + """Tests that the three different categories of retrieval statuses we have identified, + i.e. 
failed, ongoing and successful, are handled correctly.""" + # GIVEN a file with an ongoing archival file: File = spring_archive_api.housekeeper_api.files().first() spring_archive_api.housekeeper_api.add_archives(files=[file], archive_task_id=archival_job_id) @@ -266,7 +283,7 @@ def test_get_retrieval_status( file_id=file.id, retrieval_task_id=retrieval_job_id ) - # WHEN querying the task id and getting a "COMPLETED" response + # WHEN querying the task id with mock.patch.object( AuthToken, "model_validate", @@ -287,7 +304,10 @@ def test_get_retrieval_status( ) # THEN The Archive entry should have been updated - assert bool(file.archive.retrieved_at) == should_date_be_set + if job_status == FAILED_JOB_STATUSES[0]: + assert not file.archive.retrieval_task_id + else: + assert bool(file.archive.retrieved_at) == should_date_be_set def test_retrieve_samples( @@ -345,3 +365,86 @@ def test_retrieve_samples( # THEN the Archive entry should have a retrieval task id set for file in files: assert file.archive.retrieval_task_id + + +def test_delete_file_raises_http_error( + spring_archive_api: SpringArchiveAPI, + failed_delete_file_response: Response, + test_auth_token: AuthToken, + archival_job_id: int, +): + """Tests that an HTTP error is raised when the Miria response is unsuccessful for a delete file request, + and that the file is not removed from Housekeeper.""" + + # GIVEN a spring file which is archived via Miria + spring_file: File = spring_archive_api.housekeeper_api.files( + tags={SequencingFileTag.SPRING, ArchiveLocations.KAROLINSKA_BUCKET} + ).first() + spring_file_path: str = spring_file.path + if not spring_file.archive: + spring_archive_api.housekeeper_api.add_archives( + files=[spring_file], archive_task_id=archival_job_id + ) + spring_archive_api.housekeeper_api.set_archive_archived_at( + file_id=spring_file.id, archiving_task_id=archival_job_id + ) + + # GIVEN that the request returns a failed response + with mock.patch.object( + DDNDataFlowClient, + "_get_auth_token", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=failed_delete_file_response, + ), pytest.raises( + HTTPError + ): + # WHEN trying to delete the file via Miria and in Housekeeper + + # THEN an HTTPError should be raised + spring_archive_api.delete_file(file_path=spring_file.path) + + # THEN the file should still be in Housekeeper + assert spring_archive_api.housekeeper_api.files(path=spring_file_path) + + +def test_delete_file_success( + spring_archive_api: SpringArchiveAPI, + ok_delete_file_response: Response, + test_auth_token: AuthToken, + archival_job_id: int, +): + """Tests that given a successful response from Miria, the file is deleted and removed from Housekeeper.""" + + # GIVEN a spring file which is archived via Miria + spring_file: File = spring_archive_api.housekeeper_api.files( + tags={SequencingFileTag.SPRING, ArchiveLocations.KAROLINSKA_BUCKET} + ).first() + spring_file_id: int = spring_file.id + if not spring_file.archive: + spring_archive_api.housekeeper_api.add_archives( + files=[spring_file], archive_task_id=archival_job_id + ) + spring_archive_api.housekeeper_api.set_archive_archived_at( + file_id=spring_file.id, archiving_task_id=archival_job_id + ) + + # GIVEN that the delete request returns a successful response + with mock.patch.object( + DDNDataFlowClient, + "_get_auth_token", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=ok_delete_file_response, + ): + # WHEN 
trying to delete the file via Miria and in Housekeeper + + # THEN no error is raised + spring_archive_api.delete_file(file_path=spring_file.path) + + # THEN the file is removed from Housekeeper + assert not spring_archive_api.housekeeper_api.get_file(spring_file_id) diff --git a/tests/meta/archive/test_archive_cli.py b/tests/meta/archive/test_archive_cli.py index bda5322c66..8f78d4336a 100644 --- a/tests/meta/archive/test_archive_cli.py +++ b/tests/meta/archive/test_archive_cli.py @@ -4,15 +4,17 @@ import pytest from click.testing import CliRunner from housekeeper.store.models import Archive, File -from requests import Response +from requests import HTTPError, Response -from cg.cli.archive import archive_spring_files, update_job_statuses -from cg.constants import EXIT_SUCCESS +from cg.cli.archive import archive_spring_files, delete_file, update_job_statuses +from cg.constants import EXIT_SUCCESS, SequencingFileTag +from cg.constants.archiving import ArchiveLocations from cg.io.controller import APIRequest from cg.meta.archive.ddn_dataflow import ( FAILED_JOB_STATUSES, ONGOING_JOB_STATUSES, AuthToken, + DDNDataFlowClient, GetJobStatusPayload, GetJobStatusResponse, JobStatus, @@ -104,6 +106,7 @@ def test_get_archival_job_status( # GIVEN an archive entry with an ongoing archival assert len(archive_context.housekeeper_api.get_archive_entries()) == 1 + assert not archive_context.housekeeper_api.get_archive_entries()[0].archived_at # WHEN invoking update_job_statuses with mock.patch.object( @@ -195,3 +198,92 @@ def test_get_retrieval_job_status( assert not archive_context.housekeeper_api.get_archive_entries( retrieval_task_id=retrieval_job_id )[0].retrieved_at + + +def test_delete_file_raises_http_error( + cli_runner: CliRunner, + archive_context: CGConfig, + failed_delete_file_response: Response, + test_auth_token: AuthToken, + archival_job_id: int, +): + """Tests that an HTTP error is raised when the Miria response is unsuccessful for a delete file request, + and that the file is not removed from Housekeeper.""" + + # GIVEN a spring file which is archived via Miria + spring_file: File = archive_context.housekeeper_api.files( + tags={SequencingFileTag.SPRING, ArchiveLocations.KAROLINSKA_BUCKET} + ).first() + spring_file_path: str = spring_file.path + if not spring_file.archive: + archive_context.housekeeper_api.add_archives( + files=[spring_file], archive_task_id=archival_job_id + ) + archive_context.housekeeper_api.set_archive_archived_at( + file_id=spring_file.id, archiving_task_id=archival_job_id + ) + + # GIVEN that the request returns a failed response + with mock.patch.object( + DDNDataFlowClient, + "_get_auth_token", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=failed_delete_file_response, + ), pytest.raises( + HTTPError + ): + # WHEN trying to delete the file via Miria and in Housekeeper + + # THEN an HTTPError should be raised + cli_runner.invoke( + delete_file, [spring_file.path], obj=archive_context, catch_exceptions=False + ) + + # THEN the file should still be in Housekeeper + assert archive_context.housekeeper_api.files(path=spring_file_path) + + +def test_delete_file_success( + cli_runner: CliRunner, + archive_context: CGConfig, + ok_delete_file_response: Response, + test_auth_token: AuthToken, + archival_job_id: int, +): + """Tests that given a successful response from Miria, the file is deleted and removed from Housekeeper.""" + + # GIVEN a spring file which is archived via Miria + spring_file: File = 
archive_context.housekeeper_api.files( + tags={SequencingFileTag.SPRING, ArchiveLocations.KAROLINSKA_BUCKET} + ).first() + spring_file_id: int = spring_file.id + if not spring_file.archive: + archive_context.housekeeper_api.add_archives( + files=[spring_file], archive_task_id=archival_job_id + ) + archive_context.housekeeper_api.set_archive_archived_at( + file_id=spring_file.id, archiving_task_id=archival_job_id + ) + + # GIVEN that the delete request returns a successful response + with mock.patch.object( + DDNDataFlowClient, + "_get_auth_token", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=ok_delete_file_response, + ): + # WHEN trying to delete the file via Miria and in Housekeeper + + # THEN no error is raised + cli_runner.invoke( + delete_file, [spring_file.path], obj=archive_context, catch_exceptions=False + ) + + # THEN the file is removed from Housekeeper + assert not archive_context.housekeeper_api.get_file(spring_file_id) diff --git a/tests/meta/archive/test_archiving.py b/tests/meta/archive/test_archiving.py index 634339b9cf..4a1b0ddb94 100644 --- a/tests/meta/archive/test_archiving.py +++ b/tests/meta/archive/test_archiving.py @@ -349,7 +349,7 @@ def test_retrieve_samples( } ], "osType": OSTYPE, - "createFolder": True, + "createFolder": False, "metadataList": [], "settings": [], }, diff --git a/tests/meta/workflow/test_analysis.py b/tests/meta/workflow/test_analysis.py index 15dd3ea530..d7ac852bba 100644 --- a/tests/meta/workflow/test_analysis.py +++ b/tests/meta/workflow/test_analysis.py @@ -1,5 +1,5 @@ """Test for analysis""" - +import logging from datetime import datetime import mock @@ -13,8 +13,9 @@ from cg.meta.workflow.mip import MipAnalysisAPI from cg.meta.workflow.mip_dna import MipDNAAnalysisAPI from cg.meta.workflow.prepare_fastq import PrepareFastqAPI +from cg.models.fastq import FastqFileMeta from cg.store import Store -from cg.store.models import Case +from cg.store.models import Case, Sample @pytest.mark.parametrize( @@ -390,3 +391,29 @@ def test_prepare_fastq_files_decompression_running( # WHEN running prepare_fastq_files # THEN an AnalysisNotReadyError should be thrown mip_analysis_api.prepare_fastq_files(case_id=case.internal_id, dry_run=False) + + +def test_link_fastq_files_for_sample( + analysis_store: Store, + caplog, + mip_analysis_api: MipDNAAnalysisAPI, + fastq_file_meta_raw: dict, + mocker, +): + caplog.set_level(logging.INFO) + # GIVEN a case + case: Case = analysis_store.get_cases()[0] + + # GIVEN a sample + sample: Sample = case.links[0].sample + + with mocker.patch.object( + AnalysisAPI, + "gather_file_metadata_for_sample", + return_value=[FastqFileMeta.model_validate(fastq_file_meta_raw)], + ): + # WHEN linking fastq files for the sample + mip_analysis_api.link_fastq_files_for_sample(case=case, sample=sample) + + # THEN the linking of the files should be logged + assert "Linking: " in caplog.text diff --git a/tests/meta/workflow/test_fastq.py b/tests/meta/workflow/test_fastq.py new file mode 100644 index 0000000000..359a42af08 --- /dev/null +++ b/tests/meta/workflow/test_fastq.py @@ -0,0 +1,96 @@ +from pathlib import Path + +import pytest + +from cg.meta.workflow.fastq import FastqHandler +from cg.models.fastq import FastqFileMeta + + +@pytest.mark.parametrize( + "fastq_header, expected_header_meta", + [ + ( + "@HWUSI-EAS100R:6:73:941:1973#0/1", + FastqFileMeta(lane=6, flow_cell_id="XXXXXX", read_direction=1), + ), + ( + "@EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG", + FastqFileMeta(lane=2, 
flow_cell_id="FC706VJ", read_direction=1), + ), + ( + "@ST-E00201:173:HCLCGALXX:1:2106:22516:34834/1", + FastqFileMeta(lane=1, flow_cell_id="HCLCGALXX", read_direction=1), + ), + ], +) +def test_parse_fastq_header(fastq_header: str, expected_header_meta: dict): + # GIVEN a FASTQ header + + # WHEN parsing header + header_meta: FastqFileMeta = FastqHandler.parse_fastq_header(line=fastq_header) + + # THEN header meta should match the expected header information + assert header_meta == expected_header_meta + + +@pytest.mark.parametrize( + "fastq_header, expected_error", + [ + ("no match", TypeError), + ("no:match", TypeError), + ("", TypeError), + ], +) +def test_parse_fastq_header_when_no_match(fastq_header: str, expected_error): + # GIVEN no matching FASTQ header + + with pytest.raises(expected_error): + # WHEN parsing header + # THEN raise error + FastqHandler.parse_fastq_header(line=fastq_header) + + +@pytest.mark.parametrize( + "fastq_path, expected_fastq_meta", + [ + ( + Path("tests", "fixtures", "io", "casava_five_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_five_parts.fastq.gz"), + lane=6, + flow_cell_id="XXXXXX", + read_direction=1, + undetermined=None, + ), + ), + ( + Path("tests", "fixtures", "io", "casava_ten_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_ten_parts.fastq.gz"), + lane=2, + flow_cell_id="FC706VJ", + read_direction=1, + undetermined=None, + ), + ), + ( + Path("tests", "fixtures", "io", "casava_seven_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_seven_parts.fastq.gz"), + lane=1, + flow_cell_id="HCLCGALXX", + read_direction=1, + undetermined=None, + ), + ), + ], +) +def test_parse_file_data(fastq_path: Path, expected_fastq_meta: dict, mocker): + # GIVEN a FASTQ file + + with mocker.patch("cg.meta.workflow.fastq._is_undetermined_in_path", return_value=None): + # WHEN parsing header + header_meta = FastqHandler.parse_file_data(fastq_path=fastq_path) + + # THEN header meta should match the expected header information + assert header_meta == expected_fastq_meta diff --git a/tests/models/test_fastq.py b/tests/models/test_fastq.py new file mode 100644 index 0000000000..7da9d8db47 --- /dev/null +++ b/tests/models/test_fastq.py @@ -0,0 +1,23 @@ +from cg.models.fastq import FastqFileMeta + + +def test_instantiate_fastq_file_meta(fastq_file_meta_raw: dict): + # GIVEN a dictionary with fastq info + + # WHEN instantiating object + fast_file_meta = FastqFileMeta.model_validate(fastq_file_meta_raw) + + # THEN assert that it was successfully created + assert isinstance(fast_file_meta, FastqFileMeta) + + +def test_fastq_file_mets_convert_to_int(fastq_file_meta_raw: dict): + # GIVEN a dictionary with fastq info + + # WHEN instantiating object + fast_file_meta = FastqFileMeta.model_validate(fastq_file_meta_raw) + + # THEN assert that str is converted to int + assert isinstance(fast_file_meta.lane, int) + + assert isinstance(fast_file_meta.read_direction, int)