diff --git a/.bumpversion.cfg b/.bumpversion.cfg index d6b3c5264a..bb09d292e3 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 54.4.1 +current_version = 54.4.6 commit = True tag = True tag_name = v{new_version} diff --git a/cg/__init__.py b/cg/__init__.py index 0f5a958688..7c35c73ed2 100644 --- a/cg/__init__.py +++ b/cg/__init__.py @@ -1,2 +1,2 @@ __title__ = "cg" -__version__ = "54.4.1" +__version__ = "54.4.6" diff --git a/cg/constants/cgstats.py b/cg/constants/cgstats.py deleted file mode 100644 index c0c2184bc5..0000000000 --- a/cg/constants/cgstats.py +++ /dev/null @@ -1,11 +0,0 @@ -STATS_HEADER: list[str] = [ - "sample", - "Flowcell", - "Lanes", - "readcounts/lane", - "sum_readcounts", - "yieldMB/lane", - "sum_yield", - "%Q30", - "MeanQscore", -] diff --git a/cg/io/gzip.py b/cg/io/gzip.py new file mode 100644 index 0000000000..58f641c78d --- /dev/null +++ b/cg/io/gzip.py @@ -0,0 +1,8 @@ +import gzip +from pathlib import Path + + +def read_gzip_first_line(file_path: Path) -> str: + """Return first line of gzip file.""" + with gzip.open(file_path) as file: + return file.readline().decode() diff --git a/cg/meta/archive/archive.py b/cg/meta/archive/archive.py index 2f76068b75..4e95b39e63 100644 --- a/cg/meta/archive/archive.py +++ b/cg/meta/archive/archive.py @@ -9,7 +9,7 @@ from cg.constants import SequencingFileTag from cg.constants.archiving import ArchiveLocations from cg.exc import ArchiveJobFailedError -from cg.meta.archive.ddn_dataflow import DDNDataFlowClient +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig from cg.store import Store diff --git a/cg/meta/archive/ddn/__init__.py b/cg/meta/archive/ddn/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cg/meta/archive/ddn/constants.py b/cg/meta/archive/ddn/constants.py new file mode 100644 index 0000000000..2267622286 --- /dev/null +++ b/cg/meta/archive/ddn/constants.py @@ -0,0 +1,51 @@ +from enum import StrEnum + +OSTYPE: str = "Unix/MacOS" +ROOT_TO_TRIM: str = "/home" + +DESTINATION_ATTRIBUTE: str = "destination" +SOURCE_ATTRIBUTE: str = "source" + + +class DataflowEndpoints(StrEnum): + """Enum containing all DDN dataflow endpoints used.""" + + ARCHIVE_FILES = "files/archive" + DELETE_FILE = "files/delete" + GET_AUTH_TOKEN = "auth/token" + REFRESH_AUTH_TOKEN = "auth/token/refresh" + RETRIEVE_FILES = "files/retrieve" + GET_JOB_STATUS = "activity/jobs/" + + +class JobStatus(StrEnum): + """Enum for the different job statuses which can be returned via Miria.""" + + CANCELED = "Canceled" + COMPLETED = "Completed" + DENIED = "Denied" + CREATION_IN_PROGRESS = "Creation in progress" + IN_QUEUE = "In Queue" + INVALID_LICENSE = "Invalid license" + ON_VALIDATION = "On validation" + REFUSED = "Refused" + RUNNING = "Running" + SUSPENDED = "Suspended" + TERMINATED_ON_ERROR = "Terminated on error" + TERMINATED_ON_WARNING = "Terminated on warning" + + +FAILED_JOB_STATUSES: list[str] = [ + JobStatus.CANCELED, + JobStatus.DENIED, + JobStatus.INVALID_LICENSE, + JobStatus.REFUSED, + JobStatus.TERMINATED_ON_ERROR, + JobStatus.TERMINATED_ON_WARNING, +] +ONGOING_JOB_STATUSES: list[str] = [ + JobStatus.CREATION_IN_PROGRESS, + JobStatus.IN_QUEUE, + JobStatus.ON_VALIDATION, + JobStatus.RUNNING, +] diff --git a/cg/meta/archive/ddn_dataflow.py b/cg/meta/archive/ddn/ddn_data_flow_client.py similarity index 50% rename from 
cg/meta/archive/ddn_dataflow.py rename to cg/meta/archive/ddn/ddn_data_flow_client.py index 756a806f5d..a36dbb8303 100644 --- a/cg/meta/archive/ddn_dataflow.py +++ b/cg/meta/archive/ddn/ddn_data_flow_client.py @@ -1,265 +1,37 @@ """Module for archiving and retrieving folders via DDN Dataflow.""" import logging from datetime import datetime -from enum import StrEnum from pathlib import Path from urllib.parse import urljoin from housekeeper.store.models import File -from pydantic import BaseModel, Field -from requests.models import Response +from requests import Response from cg.constants.constants import APIMethods from cg.exc import ArchiveJobFailedError, DdnDataflowAuthenticationError from cg.io.controller import APIRequest -from cg.meta.archive.models import ( - ArchiveHandler, - FileAndSample, - FileTransferData, - SampleAndDestination, +from cg.meta.archive.ddn.constants import ( + DESTINATION_ATTRIBUTE, + FAILED_JOB_STATUSES, + SOURCE_ATTRIBUTE, + DataflowEndpoints, + JobStatus, ) +from cg.meta.archive.ddn.models import ( + AuthPayload, + AuthToken, + DeleteFilePayload, + GetJobStatusPayload, + GetJobStatusResponse, + MiriaObject, + RefreshPayload, + TransferPayload, +) +from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig -from cg.store.models import Sample LOG = logging.getLogger(__name__) -OSTYPE: str = "Unix/MacOS" -ROOT_TO_TRIM: str = "/home" - -DESTINATION_ATTRIBUTE: str = "destination" -SOURCE_ATTRIBUTE: str = "source" - - -def get_request_log(headers: dict, body: dict): - return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}" - - -class DataflowEndpoints(StrEnum): - """Enum containing all DDN dataflow endpoints used.""" - - ARCHIVE_FILES = "files/archive" - DELETE_FILE = "files/delete" - GET_AUTH_TOKEN = "auth/token" - REFRESH_AUTH_TOKEN = "auth/token/refresh" - RETRIEVE_FILES = "files/retrieve" - GET_JOB_STATUS = "activity/jobs/" - - -class JobStatus(StrEnum): - """Enum for the different job statuses which can be returned via Miria.""" - - CANCELED = "Canceled" - COMPLETED = "Completed" - DENIED = "Denied" - CREATION_IN_PROGRESS = "Creation in progress" - IN_QUEUE = "In Queue" - INVALID_LICENSE = "Invalid license" - ON_VALIDATION = "On validation" - REFUSED = "Refused" - RUNNING = "Running" - SUSPENDED = "Suspended" - TERMINATED_ON_ERROR = "Terminated on error" - TERMINATED_ON_WARNING = "Terminated on warning" - - -FAILED_JOB_STATUSES: list[str] = [ - JobStatus.CANCELED, - JobStatus.DENIED, - JobStatus.INVALID_LICENSE, - JobStatus.REFUSED, - JobStatus.TERMINATED_ON_ERROR, - JobStatus.TERMINATED_ON_WARNING, -] -ONGOING_JOB_STATUSES: list[str] = [ - JobStatus.CREATION_IN_PROGRESS, - JobStatus.IN_QUEUE, - JobStatus.ON_VALIDATION, - JobStatus.RUNNING, -] - - -class MiriaObject(FileTransferData): - """Model for representing a singular object transfer.""" - - _metadata = None - destination: str - source: str - - @classmethod - def create_from_file_and_sample( - cls, file: File, sample: Sample, is_archiving: bool = True - ) -> "MiriaObject": - """Instantiates the class from a File and Sample object.""" - if is_archiving: - return cls(destination=sample.internal_id, source=file.full_path) - return cls(destination=file.full_path, source=sample.internal_id) - - @classmethod - def create_from_sample_and_destination( - cls, sample_and_destination: SampleAndDestination - ) -> "MiriaObject": - """Instantiates the class from a SampleAndDestination object, - i.e. 
when we want to fetch a folder containing all spring files for said sample.""" - return cls( - destination=sample_and_destination.destination, - source=sample_and_destination.sample.internal_id, - ) - - def trim_path(self, attribute_to_trim: str): - """Trims the given attribute (source or destination) from its root directory.""" - setattr( - self, - attribute_to_trim, - f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}", - ) - - def add_repositories(self, source_prefix: str, destination_prefix: str): - """Prepends the given repositories to the source and destination paths.""" - self.source: str = source_prefix + self.source - self.destination: str = destination_prefix + self.destination - - -class TransferPayload(BaseModel): - """Model for representing a Dataflow transfer task.""" - - files_to_transfer: list[MiriaObject] - osType: str = OSTYPE - createFolder: bool = True - settings: list[dict] = [] - - def trim_paths(self, attribute_to_trim: str): - """Trims the source path from its root directory for all objects in the transfer.""" - for miria_file in self.files_to_transfer: - miria_file.trim_path(attribute_to_trim=attribute_to_trim) - - def add_repositories(self, source_prefix: str, destination_prefix: str): - """Prepends the given repositories to the source and destination paths all objects in the - transfer.""" - for miria_file in self.files_to_transfer: - miria_file.add_repositories( - source_prefix=source_prefix, destination_prefix=destination_prefix - ) - - def model_dump(self, **kwargs) -> dict: - """Creates a correctly structured dict to be used as the request payload.""" - payload: dict = super().model_dump(exclude={"files_to_transfer"}) - payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer] - payload["metadataList"] = [] - return payload - - def post_request(self, url: str, headers: dict) -> "TransferJob": - """Sends a request to the given url with, the given headers, and its own content as payload. - - Arguments: - url: URL to which the POST goes to. - headers: Headers which are set in the request - Raises: - HTTPError if the response status is not successful. - ValidationError if the response does not conform to the expected response structure. - Returns: - The job ID of the launched transfer task. 
- """ - - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.POST, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return TransferJob.model_validate(response.json()) - - -class AuthPayload(BaseModel): - """Model representing the payload for an Authentication request.""" - - dbName: str - name: str - password: str - superUser: bool = False - - -class RefreshPayload(BaseModel): - """Model representing the payload for Auth-token refresh request.""" - - refresh: str - - -class AuthToken(BaseModel): - """Model representing the response fields from an access request to the Dataflow API.""" - - access: str - expire: int - refresh: str | None = None - - -class TransferJob(BaseModel): - """Model representing th response fields of an archive or retrieve reqeust to the Dataflow - API.""" - - job_id: int = Field(alias="jobId") - - -class GetJobStatusResponse(BaseModel): - """Model representing the response fields from a get_job_status post.""" - - job_id: int = Field(alias="id") - status: JobStatus - - -class GetJobStatusPayload(BaseModel): - """Model representing the payload for a get_job_status request.""" - - id: int - - def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: - """Sends a get request to the given URL with the given headers. - Returns the parsed status response of the task specified in the URL. - Raises: - HTTPError if the response code is not ok. - """ - - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.GET, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return GetJobStatusResponse.model_validate(response.json()) - - -class DeleteFileResponse(BaseModel): - message: str - - -class DeleteFilePayload(BaseModel): - global_path: str - - def delete_file(self, url: str, headers: dict) -> DeleteFileResponse: - """Posts to the given URL with the given headers to delete the file or directory at the specified global path. - Returns the parsed response. - Raises: - HTTPError if the response code is not ok. 
- """ - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.POST, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return DeleteFileResponse.model_validate(response.json()) - class DDNDataFlowClient(ArchiveHandler): """Class for archiving and retrieving folders via DDN Dataflow.""" diff --git a/cg/meta/archive/ddn/models.py b/cg/meta/archive/ddn/models.py new file mode 100644 index 0000000000..d8c6e0b0b8 --- /dev/null +++ b/cg/meta/archive/ddn/models.py @@ -0,0 +1,201 @@ +import logging +from pathlib import Path + +from housekeeper.store.models import File +from pydantic import BaseModel, Field +from requests import Response + +from cg.constants.constants import APIMethods +from cg.io.controller import APIRequest +from cg.meta.archive.ddn.constants import OSTYPE, ROOT_TO_TRIM, JobStatus +from cg.meta.archive.models import FileTransferData, SampleAndDestination +from cg.store.models import Sample + +LOG = logging.getLogger(__name__) + + +def get_request_log(headers: dict, body: dict): + return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}" + + +class MiriaObject(FileTransferData): + """Model for representing a singular object transfer.""" + + _metadata = None + destination: str + source: str + + @classmethod + def create_from_file_and_sample( + cls, file: File, sample: Sample, is_archiving: bool = True + ) -> "MiriaObject": + """Instantiates the class from a File and Sample object.""" + if is_archiving: + return cls(destination=sample.internal_id, source=file.full_path) + return cls(destination=file.full_path, source=sample.internal_id) + + @classmethod + def create_from_sample_and_destination( + cls, sample_and_destination: SampleAndDestination + ) -> "MiriaObject": + """Instantiates the class from a SampleAndDestination object, + i.e. 
when we want to fetch a folder containing all spring files for said sample.""" + return cls( + destination=sample_and_destination.destination, + source=sample_and_destination.sample.internal_id, + ) + + def trim_path(self, attribute_to_trim: str): + """Trims the given attribute (source or destination) from its root directory.""" + setattr( + self, + attribute_to_trim, + f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}", + ) + + def add_repositories(self, source_prefix: str, destination_prefix: str): + """Prepends the given repositories to the source and destination paths.""" + self.source: str = source_prefix + self.source + self.destination: str = destination_prefix + self.destination + + +class TransferPayload(BaseModel): + """Model for representing a Dataflow transfer task.""" + + files_to_transfer: list[MiriaObject] + osType: str = OSTYPE + createFolder: bool = True + settings: list[dict] = [] + + def trim_paths(self, attribute_to_trim: str): + """Trims the source path from its root directory for all objects in the transfer.""" + for miria_file in self.files_to_transfer: + miria_file.trim_path(attribute_to_trim=attribute_to_trim) + + def add_repositories(self, source_prefix: str, destination_prefix: str): + """Prepends the given repositories to the source and destination paths of all objects in the + transfer.""" + for miria_file in self.files_to_transfer: + miria_file.add_repositories( + source_prefix=source_prefix, destination_prefix=destination_prefix + ) + + def model_dump(self, **kwargs) -> dict: + """Creates a correctly structured dict to be used as the request payload.""" + payload: dict = super().model_dump(exclude={"files_to_transfer"}) + payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer] + payload["metadataList"] = [] + return payload + + def post_request(self, url: str, headers: dict) -> "TransferResponse": + """Sends a request to the given URL with the given headers and its own content as payload. + + Arguments: + url: URL to which the POST request is sent. + headers: Headers which are set in the request. + Raises: + HTTPError if the response status is not successful. + ValidationError if the response does not conform to the expected response structure. + Returns: + The job ID of the launched transfer task. 
+ """ + + LOG.info(get_request_log(headers=headers, body=self.model_dump())) + + response: Response = APIRequest.api_request_from_content( + api_method=APIMethods.POST, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, + ) + response.raise_for_status() + return TransferResponse.model_validate(response.json()) + + +class TransferResponse(BaseModel): + """Model representing the response fields of an archive or retrieve request to the Dataflow + API.""" + + job_id: int = Field(alias="jobId") + + +class AuthPayload(BaseModel): + """Model representing the payload for an Authentication request.""" + + dbName: str + name: str + password: str + superUser: bool = False + + +class RefreshPayload(BaseModel): + """Model representing the payload for an Auth-token refresh request.""" + + refresh: str + + +class AuthToken(BaseModel): + """Model representing the response fields from an access request to the Dataflow API.""" + + access: str + expire: int + refresh: str | None = None + + +class GetJobStatusResponse(BaseModel): + """Model representing the response fields from a get_job_status post.""" + + job_id: int = Field(alias="id") + status: JobStatus + + +class GetJobStatusPayload(BaseModel): + """Model representing the payload for a get_job_status request.""" + + id: int + + def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: + """Sends a GET request to the given URL with the given headers. + Returns the parsed status response of the task specified in the URL. + Raises: + HTTPError if the response code is not ok. + """ + + LOG.info(get_request_log(headers=headers, body=self.model_dump())) + + response: Response = APIRequest.api_request_from_content( + api_method=APIMethods.GET, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, + ) + response.raise_for_status() + return GetJobStatusResponse.model_validate(response.json()) + + +class DeleteFileResponse(BaseModel): + message: str + + +class DeleteFilePayload(BaseModel): + global_path: str + + def delete_file(self, url: str, headers: dict) -> DeleteFileResponse: + """Posts to the given URL with the given headers to delete the file or directory at the specified global path. + Returns the parsed response. + Raises: + HTTPError if the response code is not ok. 
+ """ + LOG.info(get_request_log(headers=headers, body=self.model_dump())) + + response: Response = APIRequest.api_request_from_content( + api_method=APIMethods.POST, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, + ) + response.raise_for_status() + return DeleteFileResponse.model_validate(response.json()) diff --git a/cg/meta/workflow/analysis.py b/cg/meta/workflow/analysis.py index f067e63944..a4f2a39d98 100644 --- a/cg/meta/workflow/analysis.py +++ b/cg/meta/workflow/analysis.py @@ -9,7 +9,7 @@ from housekeeper.store.models import Bundle, Version from cg.apps.environ import environ_email -from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority +from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority, SequencingFileTag from cg.constants.constants import ( AnalysisType, CaseActions, @@ -24,6 +24,7 @@ from cg.meta.workflow.fastq import FastqHandler from cg.models.analysis import AnalysisModel from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.store.models import Analysis, BedVersion, Case, CaseSample, Sample LOG = logging.getLogger(__name__) @@ -31,11 +32,12 @@ def add_gene_panel_combo(default_panels: set[str]) -> set[str]: """Add gene panels combinations for gene panels being part of gene panel combination and return updated gene panels.""" - all_panels = default_panels + additional_panels = set() for panel in default_panels: if panel in GenePanelCombo.COMBO_1: - all_panels |= GenePanelCombo.COMBO_1.get(panel) - return all_panels + additional_panels |= GenePanelCombo.COMBO_1.get(panel) + default_panels |= additional_panels + return default_panels class AnalysisAPI(MetaAPI): @@ -288,58 +290,59 @@ def get_cases_to_qc(self) -> list[Case]: if self.trailblazer_api.is_latest_analysis_qc(case_id=case.internal_id) ] - def get_sample_fastq_destination_dir(self, case: Case, sample: Sample): + def get_sample_fastq_destination_dir(self, case: Case, sample: Sample) -> Path: """Return the path to the FASTQ destination directory.""" raise NotImplementedError - def gather_file_metadata_for_sample(self, sample_obj: Sample) -> list[dict]: + def gather_file_metadata_for_sample(self, sample: Sample) -> list[FastqFileMeta]: return [ - self.fastq_handler.parse_file_data(file_obj.full_path) - for file_obj in self.housekeeper_api.files( - bundle=sample_obj.internal_id, tags=["fastq"] + self.fastq_handler.parse_file_data(hk_file.full_path) + for hk_file in self.housekeeper_api.files( + bundle=sample.internal_id, tags={SequencingFileTag.FASTQ} ) ] def link_fastq_files_for_sample( - self, case_obj: Case, sample_obj: Sample, concatenate: bool = False + self, case: Case, sample: Sample, concatenate: bool = False ) -> None: """ - Link FASTQ files for a sample to working directory. + Link FASTQ files for a sample to the work directory. 
If pipeline input requires concatenated fastq, files can also be concatenated """ - linked_reads_paths = {1: [], 2: []} - concatenated_paths = {1: "", 2: ""} - files: list[dict] = self.gather_file_metadata_for_sample(sample_obj=sample_obj) - sorted_files = sorted(files, key=lambda k: k["path"]) - fastq_dir = self.get_sample_fastq_destination_dir(case=case_obj, sample=sample_obj) + linked_reads_paths: dict[int, list[Path]] = {1: [], 2: []} + concatenated_paths: dict[int, str] = {1: "", 2: ""} + fastq_files_meta: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample) + sorted_fastq_files_meta: list[FastqFileMeta] = sorted( + fastq_files_meta, key=lambda k: k.path + ) + fastq_dir: Path = self.get_sample_fastq_destination_dir(case=case, sample=sample) fastq_dir.mkdir(parents=True, exist_ok=True) - for fastq_data in sorted_files: - fastq_path = Path(fastq_data["path"]) - fastq_name = self.fastq_handler.create_fastq_name( - lane=fastq_data["lane"], - flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, - read=fastq_data["read"], - undetermined=fastq_data["undetermined"], - meta=self.get_additional_naming_metadata(sample_obj), + for fastq_file in sorted_fastq_files_meta: + fastq_file_name: str = self.fastq_handler.create_fastq_name( + lane=fastq_file.lane, + flow_cell=fastq_file.flow_cell_id, + sample=sample.internal_id, + read_direction=fastq_file.read_direction, + undetermined=fastq_file.undetermined, + meta=self.get_lims_naming_metadata(sample), ) - destination_path: Path = fastq_dir / fastq_name - linked_reads_paths[fastq_data["read"]].append(destination_path) + destination_path = Path(fastq_dir, fastq_file_name) + linked_reads_paths[fastq_file.read_direction].append(destination_path) concatenated_paths[ - fastq_data["read"] - ] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_name)}" + fastq_file.read_direction + ] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_file_name)}" if not destination_path.exists(): - LOG.info(f"Linking: {fastq_path} -> {destination_path}") - destination_path.symlink_to(fastq_path) + LOG.info(f"Linking: {fastq_file.path} -> {destination_path}") + destination_path.symlink_to(fastq_file.path) else: LOG.warning(f"Destination path already exists: {destination_path}") if not concatenate: return - LOG.info("Concatenation in progress for sample %s.", sample_obj.internal_id) + LOG.info(f"Concatenation in progress for sample: {sample.internal_id}") for read, value in linked_reads_paths.items(): self.fastq_handler.concatenate(linked_reads_paths[read], concatenated_paths[read]) self.fastq_handler.remove_files(value) @@ -435,7 +438,7 @@ def get_date_from_file_path(file_path: Path) -> dt.datetime.date: """ return dt.datetime.fromtimestamp(int(os.path.getctime(file_path))) - def get_additional_naming_metadata(self, sample_obj: Sample) -> str | None: + def get_lims_naming_metadata(self, sample: Sample) -> str | None: return None def get_latest_metadata(self, case_id: str) -> AnalysisModel: diff --git a/cg/meta/workflow/balsamic.py b/cg/meta/workflow/balsamic.py index 03030875e6..d1af0f61f5 100644 --- a/cg/meta/workflow/balsamic.py +++ b/cg/meta/workflow/balsamic.py @@ -23,6 +23,7 @@ BalsamicWGSQCMetrics, ) from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.store.models import Case, CaseSample, Sample from cg.utils import Process from cg.utils.utils import build_command_from_dict, get_string_from_list_by_pattern @@ -146,22 +147,22 @@ def get_sample_fastq_destination_dir(self, 
case: Case, sample: Sample = None) -> return Path(self.get_case_path(case.internal_id), FileFormat.FASTQ) def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: - case_obj = self.status_db.get_case_by_internal_id(internal_id=case_id) - for link in case_obj.links: - self.link_fastq_files_for_sample( - case_obj=case_obj, sample_obj=link.sample, concatenate=True - ) + case = self.status_db.get_case_by_internal_id(internal_id=case_id) + for link in case.links: + self.link_fastq_files_for_sample(case=case, sample=link.sample, concatenate=True) def get_concatenated_fastq_path(self, link_object: CaseSample) -> Path: - """Returns path to the concatenated FASTQ file of a sample""" - file_collection: list[dict] = self.gather_file_metadata_for_sample(link_object.sample) + """Returns the path to the concatenated FASTQ file of a sample""" + file_collection: list[FastqFileMeta] = self.gather_file_metadata_for_sample( + link_object.sample + ) fastq_data = file_collection[0] linked_fastq_name = self.fastq_handler.create_fastq_name( - lane=fastq_data["lane"], - flowcell=fastq_data["flowcell"], + lane=fastq_data.lane, + flow_cell=fastq_data.flow_cell_id, sample=link_object.sample.internal_id, - read=fastq_data["read"], - undetermined=fastq_data["undetermined"], + read_direction=fastq_data.read_direction, + undetermined=fastq_data.undetermined, ) concatenated_fastq_name: str = self.fastq_handler.get_concatenated_name(linked_fastq_name) return Path( diff --git a/cg/meta/workflow/fastq.py b/cg/meta/workflow/fastq.py index 5a6055a032..77b4419a49 100644 --- a/cg/meta/workflow/fastq.py +++ b/cg/meta/workflow/fastq.py @@ -13,6 +13,10 @@ import shutil from pathlib import Path +from cg.constants import FileExtensions +from cg.io.gzip import read_gzip_first_line +from cg.models.fastq import FastqFileMeta, GetFastqFileMeta + LOG = logging.getLogger(__name__) DEFAULT_DATE_STR = ( @@ -23,6 +27,10 @@ ) +def _is_undetermined_in_path(file_path: Path) -> bool: + return "Undetermined" in file_path + + class FastqHandler: """Handles fastq file linking""" @@ -90,103 +98,56 @@ def get_concatenated_name(linked_fastq_name: str) -> str: return f"concatenated_{'_'.join(linked_fastq_name.split('_')[-4:])}" @staticmethod - def parse_header(line: str) -> dict: - """Generates a dict with parsed lanes, flowcells and read numbers - Handle illumina's two different header formats + def parse_fastq_header(line: str) -> FastqFileMeta | None: + """Parse and return fastq header metadata. + Handle Illumina's two different header formats @see https://en.wikipedia.org/wiki/FASTQ_format - - @HWUSI-EAS100R:6:73:941:1973#0/1 - - HWUSI-EAS100R the unique instrument name - 6 flowcell lane - 73 tile number within the flowcell lane - 941 'x'-coordinate of the cluster within the tile - 1973 'y'-coordinate of the cluster within the tile - #0 index number for a multiplexed sample (0 for no indexing) - /1 the member of a pair, /1 or /2 (paired-end or mate-pair reads only) - - Versions of the Illumina pipeline since 1.4 appear to use #NNNNNN - instead of #0 for the multiplex ID, where NNNNNN is the sequence of the - multiplex tag. 
- - With Casava 1.8 the format of the '@' line has changed: - - @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG - - EAS139 the unique instrument name - 136 the run id - FC706VJ the flowcell id - 2 flowcell lane - 2104 tile number within the flowcell lane - 15343 'x'-coordinate of the cluster within the tile - 197393 'y'-coordinate of the cluster within the tile - 1 the member of a pair, 1 or 2 (paired-end or mate-pair reads only) - Y Y if the read is filtered, N otherwise - 18 0 when none of the control bits are on, otherwise it is an even number - ATCACG index sequence + Raises: + TypeError if unable to split line into expected parts. """ - - fastq_meta = {"lane": None, "flowcell": None, "readnumber": None} - parts = line.split(":") - if len(parts) == 5: # @HWUSI-EAS100R:6:73:941:1973#0/1 - fastq_meta["lane"] = parts[1] - fastq_meta["flowcell"] = "XXXXXX" - fastq_meta["readnumber"] = parts[-1].split("/")[-1] - if len(parts) == 10: # @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG - fastq_meta["lane"] = parts[3] - fastq_meta["flowcell"] = parts[2] - fastq_meta["readnumber"] = parts[6].split(" ")[-1] - if len(parts) == 7: # @ST-E00201:173:HCLCGALXX:1:2106:22516:34834/1 - fastq_meta["lane"] = parts[3] - fastq_meta["flowcell"] = parts[2] - fastq_meta["readnumber"] = parts[-1].split("/")[-1] - - return fastq_meta + parts = line.split(":") + try: + return GetFastqFileMeta.header_format.get(len(parts))(parts=parts) + except TypeError as exception: + LOG.error(f"Could not parse header format for header: {line}") + raise exception @staticmethod - def parse_file_data(fastq_path: Path) -> dict: - with gzip.open(fastq_path) as handle: - header_line = handle.readline().decode() - header_info = FastqHandler.parse_header(header_line) - - data = { - "path": fastq_path, - "lane": int(header_info["lane"]), - "flowcell": header_info["flowcell"], - "read": int(header_info["readnumber"]), - "undetermined": ("Undetermined" in fastq_path), - } - matches = re.findall(r"-l[1-9]t([1-9]{2})_", str(fastq_path)) - if len(matches) > 0: - data["flowcell"] = f"{data['flowcell']}-{matches[0]}" - return data + def parse_file_data(fastq_path: Path) -> FastqFileMeta: + header_line: str = read_gzip_first_line(file_path=fastq_path) + fastq_file_meta: FastqFileMeta = FastqHandler.parse_fastq_header(header_line) + fastq_file_meta.path = fastq_path + fastq_file_meta.undetermined = _is_undetermined_in_path(fastq_path) + matches = re.findall(r"-l[1-9]t([1-9]{2})_", str(fastq_path)) + if len(matches) > 0: + fastq_file_meta.flow_cell_id = f"{fastq_file_meta.flow_cell_id}-{matches[0]}" + return fastq_file_meta @staticmethod def create_fastq_name( - lane: str, + lane: int, flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, meta: str | None = None, ) -> str: """Name a FASTQ file with standard conventions and - no naming constrains from pipeline.""" + no naming constraints from the pipeline.""" flow_cell: str = f"{flow_cell}-undetermined" if undetermined else flow_cell date: str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read}.fastq.gz" + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class BalsamicFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = 
DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -194,36 +155,36 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following Balsamic conventions. Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - date_str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date_str}_{flowcell}_{sample}_{index}_R_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + date: str = date if isinstance(date, str) else date.strftime("%y%m%d") + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_R_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MipFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, meta: str | None = None, ) -> str: """Name a FASTQ file following MIP conventions.""" - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - date_str = date if isinstance(date, str) else date.strftime("%y%m%d") - return f"{lane}_{date_str}_{flowcell}_{sample}_{index}_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + date: str = date if isinstance(date, str) else date.strftime("%y%m%d") + return f"{lane}_{date}_{flow_cell}_{sample}_{index}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MicrosaltFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -231,19 +192,17 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following usalt conventions. Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - # ACC1234A1_FCAB1ABC2_L1_1.fastq.gz sample_flowcell_lane_read.fastq.gz - - flowcell = f"{flowcell}-undetermined" if undetermined else flowcell - return f"{sample}_{flowcell}_L{lane}_{read}.fastq.gz" + flow_cell = f"{flow_cell}-undetermined" if undetermined else flow_cell + return f"{sample}_{flow_cell}_L{lane}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" class MutantFastqHandler(FastqHandler): @staticmethod def create_fastq_name( - lane: str, - flowcell: str, + lane: int, + flow_cell: str, sample: str, - read: str, + read_direction: int, date: dt.datetime = DEFAULT_DATE_STR, index: str = DEFAULT_INDEX, undetermined: str | None = None, @@ -251,9 +210,7 @@ def create_fastq_name( ) -> str: """Name a FASTQ file following mutant conventions. 
Naming must be xxx_R_1.fastq.gz and xxx_R_2.fastq.gz""" - # ACC1234A1_FCAB1ABC2_L1_1.fastq.gz sample_flowcell_lane_read.fastq.gz - - return f"{flowcell}_L{lane}_{meta}_{read}.fastq.gz" + return f"{flow_cell}_L{lane}_{meta}_{read_direction}{FileExtensions.FASTQ}{FileExtensions.GZIP}" @staticmethod def get_concatenated_name(linked_fastq_name: str) -> str: @@ -275,7 +232,7 @@ def create_nanopore_fastq_name( filenr: str, meta: str | None = None, ) -> str: - return f"{flowcell}_{sample}_{meta}_{filenr}.fastq.gz" + return f"{flowcell}_{sample}_{meta}_{filenr}{FileExtensions.FASTQ}{FileExtensions.GZIP}" @staticmethod def parse_nanopore_file_data(fastq_path: Path) -> dict: diff --git a/cg/meta/workflow/microsalt.py b/cg/meta/workflow/microsalt.py index e51283303f..3bd36e91fe 100644 --- a/cg/meta/workflow/microsalt.py +++ b/cg/meta/workflow/microsalt.py @@ -152,7 +152,7 @@ def link_fastq_files(self, case_id: str, sample_id: str | None, dry_run: bool = case_obj: Case = self.status_db.get_case_by_internal_id(internal_id=case_id) samples: list[Sample] = self.get_samples(case_id=case_id, sample_id=sample_id) for sample_obj in samples: - self.link_fastq_files_for_sample(case_obj=case_obj, sample_obj=sample_obj) + self.link_fastq_files_for_sample(case=case_obj, sample=sample_obj) def get_samples(self, case_id: str, sample_id: str | None = None) -> list[Sample]: """Returns a list of samples to configure diff --git a/cg/meta/workflow/mip.py b/cg/meta/workflow/mip.py index 5c6a3f2581..e6dc4993dc 100644 --- a/cg/meta/workflow/mip.py +++ b/cg/meta/workflow/mip.py @@ -143,12 +143,9 @@ def get_sample_fastq_destination_dir(self, case: Case, sample: Sample) -> Path: ) def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: - case_obj = self.status_db.get_case_by_internal_id(internal_id=case_id) - for link in case_obj.links: - self.link_fastq_files_for_sample( - case_obj=case_obj, - sample_obj=link.sample, - ) + case: Case = self.status_db.get_case_by_internal_id(internal_id=case_id) + for link in case.links: + self.link_fastq_files_for_sample(case=case, sample=link.sample) def write_panel(self, case_id: str, content: list[str]) -> None: """Write the gene panel to case dir.""" diff --git a/cg/meta/workflow/mutant.py b/cg/meta/workflow/mutant.py index b60c99607c..7c99c30082 100644 --- a/cg/meta/workflow/mutant.py +++ b/cg/meta/workflow/mutant.py @@ -2,7 +2,7 @@ import shutil from pathlib import Path -from cg.constants import Pipeline +from cg.constants import Pipeline, SequencingFileTag from cg.constants.constants import FileFormat from cg.io.controller import WriteFile from cg.meta.workflow.analysis import AnalysisAPI @@ -81,16 +81,14 @@ def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None: application = sample_obj.application_version.application if self._is_nanopore(application): self.link_nanopore_fastq_for_sample( - case_obj=case_obj, sample_obj=sample_obj, concatenate=True + case=case_obj, sample=sample_obj, concatenate=True ) continue if not sample_obj.sequencing_qc: LOG.info("Sample %s read count below threshold, skipping!", sample_obj.internal_id) continue else: - self.link_fastq_files_for_sample( - case_obj=case_obj, sample_obj=sample_obj, concatenate=True - ) + self.link_fastq_files_for_sample(case=case_obj, sample=sample_obj, concatenate=True) def get_sample_parameters(self, sample_obj: Sample) -> MutantSampleConfig: return MutantSampleConfig( @@ -140,16 +138,15 @@ def create_case_config(self, case_id: str, dry_run: bool) -> None: ) LOG.info("Saved config to %s", 
config_path) - def get_additional_naming_metadata(self, sample_obj: Sample) -> str | None: - sample_name = sample_obj.name + def get_lims_naming_metadata(self, sample: Sample) -> str | None: region_code = self.lims_api.get_sample_attribute( - lims_id=sample_obj.internal_id, key="region_code" + lims_id=sample.internal_id, key="region_code" ).split(" ")[0] lab_code = self.lims_api.get_sample_attribute( - lims_id=sample_obj.internal_id, key="lab_code" + lims_id=sample.internal_id, key="lab_code" ).split(" ")[0] - return f"{region_code}_{lab_code}_{sample_name}" + return f"{region_code}_{lab_code}_{sample.name}" def run_analysis(self, case_id: str, dry_run: bool, config_artic: str = None) -> None: if self.get_case_output_path(case_id=case_id).exists(): @@ -194,34 +191,34 @@ def get_cases_to_store(self) -> list[Case]: if Path(self.get_deliverables_file_path(case_id=case.internal_id)).exists() ] - def get_metadata_for_nanopore_sample(self, sample_obj: Sample) -> list[dict]: + def get_metadata_for_nanopore_sample(self, sample: Sample) -> list[dict]: return [ self.fastq_handler.parse_nanopore_file_data(file_obj.full_path) for file_obj in self.housekeeper_api.files( - bundle=sample_obj.internal_id, tags=["fastq"] + bundle=sample.internal_id, tags={SequencingFileTag.FASTQ} ) ] def link_nanopore_fastq_for_sample( - self, case_obj: Case, sample_obj: Sample, concatenate: bool = False + self, case: Case, sample: Sample, concatenate: bool = False ) -> None: """ Link FASTQ files for a nanopore sample to working directory. If pipeline input requires concatenated fastq, files can also be concatenated """ read_paths = [] - files: list[dict] = self.get_metadata_for_nanopore_sample(sample_obj=sample_obj) + files: list[dict] = self.get_metadata_for_nanopore_sample(sample=sample) sorted_files = sorted(files, key=lambda k: k["path"]) - fastq_dir = self.get_sample_fastq_destination_dir(case=case_obj, sample=sample_obj) + fastq_dir = self.get_sample_fastq_destination_dir(case=case, sample=sample) fastq_dir.mkdir(parents=True, exist_ok=True) for counter, fastq_data in enumerate(sorted_files): fastq_path = Path(fastq_data["path"]) fastq_name = self.fastq_handler.create_nanopore_fastq_name( flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, + sample=sample.internal_id, filenr=str(counter), - meta=self.get_additional_naming_metadata(sample_obj), + meta=self.get_lims_naming_metadata(sample), ) destination_path: Path = fastq_dir / fastq_name read_paths.append(destination_path) @@ -237,13 +234,13 @@ def link_nanopore_fastq_for_sample( concatenated_fastq_name = self.fastq_handler.create_nanopore_fastq_name( flowcell=fastq_data["flowcell"], - sample=sample_obj.internal_id, + sample=sample.internal_id, filenr=str(1), - meta=self.get_additional_naming_metadata(sample_obj), + meta=self.get_lims_naming_metadata(sample), ) concatenated_path = ( f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(concatenated_fastq_name)}" ) - LOG.info("Concatenation in progress for sample %s.", sample_obj.internal_id) + LOG.info(f"Concatenation in progress for sample {sample.internal_id}.") self.fastq_handler.concatenate(read_paths, concatenated_path) self.fastq_handler.remove_files(read_paths) diff --git a/cg/meta/workflow/nf_analysis.py b/cg/meta/workflow/nf_analysis.py index 194481f354..94e7840cc8 100644 --- a/cg/meta/workflow/nf_analysis.py +++ b/cg/meta/workflow/nf_analysis.py @@ -1,5 +1,4 @@ import logging -import operator from datetime import datetime from pathlib import Path from typing import Any @@ -13,6 +12,7 @@ 
from cg.meta.workflow.analysis import AnalysisAPI from cg.meta.workflow.nf_handlers import NextflowHandler, NfTowerHandler from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.models.nf_analysis import FileDeliverable, PipelineDeliverables from cg.models.rnafusion.rnafusion import CommandArgs from cg.store.models import Sample @@ -136,7 +136,7 @@ def get_workdir_path(self, case_id: str, work_dir: Path | None = None) -> Path: @staticmethod def extract_read_files( - metadata: list, forward_read: bool = False, reverse_read: bool = False + metadata: list[FastqFileMeta], forward_read: bool = False, reverse_read: bool = False ) -> list[str]: """Extract a list of fastq file paths for either forward or reverse reads.""" if forward_read and not reverse_read: @@ -145,8 +145,12 @@ def extract_read_files( read_direction = 2 else: raise ValueError("Either forward or reverse needs to be specified") - sorted_metadata: list = sorted(metadata, key=operator.itemgetter("path")) - return [d["path"] for d in sorted_metadata if d["read"] == read_direction] + sorted_metadata: list = sorted(metadata, key=lambda k: k.path) + return [ + fastq_file.path + for fastq_file in sorted_metadata + if fastq_file.read_direction == read_direction + ] def verify_sample_sheet_exists(self, case_id: str, dry_run: bool = False) -> None: """Raise an error if sample sheet file is not found.""" diff --git a/cg/meta/workflow/rnafusion.py b/cg/meta/workflow/rnafusion.py index 9611e40a04..1014f62322 100644 --- a/cg/meta/workflow/rnafusion.py +++ b/cg/meta/workflow/rnafusion.py @@ -19,6 +19,7 @@ MetricsDeliverablesCondition, MultiqcDataJson, ) +from cg.models.fastq import FastqFileMeta from cg.models.nf_analysis import PipelineDeliverables from cg.models.rnafusion.rnafusion import ( RnafusionAnalysis, @@ -46,7 +47,7 @@ def __init__( self.profile: str = config.rnafusion.profile self.conda_env: str = config.rnafusion.conda_env self.conda_binary: str = config.rnafusion.conda_binary - self.tower_binary_path: str = config.rnafusion.tower_binary_path + self.tower_binary_path: str = config.tower_binary_path self.tower_pipeline: str = config.rnafusion.tower_pipeline self.account: str = config.rnafusion.slurm.account self.email: str = config.rnafusion.slurm.mail_user @@ -72,7 +73,7 @@ def get_sample_sheet_content_per_sample( self, sample: Sample, case_id: str, strandedness: Strandedness ) -> list[list[str]]: """Get sample sheet content per sample.""" - sample_metadata: list[dict] = self.gather_file_metadata_for_sample(sample_obj=sample) + sample_metadata: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample) fastq_forward_read_paths: list[str] = self.extract_read_files( metadata=sample_metadata, forward_read=True ) diff --git a/cg/meta/workflow/taxprofiler.py b/cg/meta/workflow/taxprofiler.py index 3af5f89ee1..e12350e961 100644 --- a/cg/meta/workflow/taxprofiler.py +++ b/cg/meta/workflow/taxprofiler.py @@ -8,6 +8,7 @@ from cg.constants.sequencing import SequencingPlatform from cg.meta.workflow.nf_analysis import NfAnalysisAPI from cg.models.cg_config import CGConfig +from cg.models.fastq import FastqFileMeta from cg.models.taxprofiler.taxprofiler import ( TaxprofilerParameters, TaxprofilerSampleSheetEntry, @@ -35,7 +36,7 @@ def __init__( self.revision: str = config.taxprofiler.revision self.hostremoval_reference: Path = Path(config.taxprofiler.hostremoval_reference) self.databases: Path = Path(config.taxprofiler.databases) - self.tower_binary_path: str = 
config.taxprofiler.tower_binary_path + self.tower_binary_path: str = config.tower_binary_path self.tower_pipeline: str = config.taxprofiler.tower_pipeline self.account: str = config.taxprofiler.slurm.account self.email: str = config.taxprofiler.slurm.mail_user @@ -47,7 +48,7 @@ def get_sample_sheet_content_per_sample( ) -> list[list[str]]: """Get sample sheet content per sample.""" sample_name: str = sample.name - sample_metadata: list[str] = self.gather_file_metadata_for_sample(sample) + sample_metadata: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample) fastq_forward_read_paths: list[str] = self.extract_read_files( metadata=sample_metadata, forward_read=True ) diff --git a/cg/models/cg_config.py b/cg/models/cg_config.py index 8c3c28be0b..c932a7f542 100644 --- a/cg/models/cg_config.py +++ b/cg/models/cg_config.py @@ -158,7 +158,6 @@ class RareDiseaseConfig(CommonAppConfig): revision: str root: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str @@ -174,7 +173,6 @@ class RnafusionConfig(CommonAppConfig): launch_directory: str revision: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str @@ -190,7 +188,6 @@ class TaxprofilerConfig(CommonAppConfig): revision: str root: str slurm: SlurmConfig - tower_binary_path: str tower_pipeline: str @@ -260,6 +257,7 @@ class CGConfig(BaseModel): environment: Literal["production", "stage"] = "stage" flow_cells_dir: str madeline_exe: str + tower_binary_path: str max_flowcells: int | None data_input: DataInput | None = None # Base APIs that always should exist diff --git a/cg/models/fastq.py b/cg/models/fastq.py new file mode 100644 index 0000000000..df3470f65d --- /dev/null +++ b/cg/models/fastq.py @@ -0,0 +1,38 @@ +from pathlib import Path +from typing import Callable + +from pydantic import BaseModel + + +class FastqFileMeta(BaseModel): + path: Path | None = None + lane: int + read_direction: int + undetermined: bool | None = None + flow_cell_id: str + + +def _get_header_meta_casava_five_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[1], flow_cell_id="XXXXXX", read_direction=parts[-1].split("/")[-1] + ) + + +def _get_header_meta_casava_ten_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[3], flow_cell_id=parts[2], read_direction=parts[6].split(" ")[-1] + ) + + +def _get_header_meta_casava_seven_parts(parts: list[str]) -> FastqFileMeta: + return FastqFileMeta( + lane=parts[3], flow_cell_id=parts[2], read_direction=parts[-1].split("/")[-1] + ) + + +class GetFastqFileMeta: + header_format: dict[int, Callable] = { + 5: _get_header_meta_casava_five_parts, + 7: _get_header_meta_casava_seven_parts, + 10: _get_header_meta_casava_ten_parts, + } diff --git a/pyproject.toml b/pyproject.toml index 3afee23995..689a3e5973 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "cg" -version = "54.4.1" +version = "54.4.6" description = "Clinical Genomics command center" authors = ["Clinical Genomics "] readme = "README.md" diff --git a/tests/apps/mip/conftest.py b/tests/apps/mip/conftest.py index 452b6f841f..79957db827 100644 --- a/tests/apps/mip/conftest.py +++ b/tests/apps/mip/conftest.py @@ -3,11 +3,13 @@ import pytest +from cg.constants import FileExtensions + def create_file(tmpdir, flowcell, lane, read, file_content): """actual file on disk""" - file_name = f"S1_FC000{flowcell}_L00{lane}_R_{read}.fastq.gz" + file_name = 
f"S1_FC000{flowcell}_L00{lane}_R_{read}{FileExtensions.FASTQ}{FileExtensions.GZIP}" file_path = tmpdir / file_name file_path.write(file_content) return file_path diff --git a/tests/conftest.py b/tests/conftest.py index ddba5c6ba4..9890fd9152 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -312,6 +312,7 @@ def base_config_dict() -> dict: return { "database": "sqlite:///", "madeline_exe": "path/to/madeline", + "tower_binary_path": "path/to/tower", "delivery_path": "path/to/delivery", "flow_cells_dir": "path/to/flow_cells", "demultiplexed_flow_cells_dir": "path/to/demultiplexed_flow_cells_dir", @@ -2411,6 +2412,7 @@ def context_config( "sender_password": "", }, "madeline_exe": "echo", + "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "pon_path": str(cg_dir), "backup": { "pdc_archiving_directory": pdc_archiving_directory.dict(), @@ -2558,7 +2560,6 @@ def context_config( "account": "development", "mail_user": "test.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "raredisease", }, "rnafusion": { @@ -2576,7 +2577,6 @@ def context_config( "account": "development", "mail_user": "test.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "rnafusion", }, "pigz": {"binary_path": "/bin/pigz"}, @@ -2597,7 +2597,6 @@ def context_config( "account": "development", "mail_user": "taxprofiler.email@scilifelab.se", }, - "tower_binary_path": Path("path", "to", "bin", "tw").as_posix(), "tower_pipeline": "taxprofiler", }, "scout": { @@ -3698,3 +3697,14 @@ def raredisease_context( helpers.add_relationship(status_db, case=case_not_enough_reads, sample=sample_not_enough_reads) return cg_context + + +@pytest.fixture +def fastq_file_meta_raw(flow_cell_name: str) -> dict: + return { + "path": Path("a", f"file{FileExtensions.FASTQ}{FileExtensions.GZIP}"), + "lane": str(1), + "read_direction": str(2), + "flow_cell_id": flow_cell_name, + "undetermined": None, + } diff --git a/tests/fixtures/io/casava_five_parts.fastq.gz b/tests/fixtures/io/casava_five_parts.fastq.gz new file mode 100644 index 0000000000..7b3d02ece8 Binary files /dev/null and b/tests/fixtures/io/casava_five_parts.fastq.gz differ diff --git a/tests/fixtures/io/casava_seven_parts.fastq.gz b/tests/fixtures/io/casava_seven_parts.fastq.gz new file mode 100644 index 0000000000..c7911dccee Binary files /dev/null and b/tests/fixtures/io/casava_seven_parts.fastq.gz differ diff --git a/tests/fixtures/io/casava_ten_parts.fastq.gz b/tests/fixtures/io/casava_ten_parts.fastq.gz new file mode 100644 index 0000000000..2d5e03ccc9 Binary files /dev/null and b/tests/fixtures/io/casava_ten_parts.fastq.gz differ diff --git a/tests/fixtures/io/example.gz b/tests/fixtures/io/example.gz new file mode 100644 index 0000000000..8237ec5a51 Binary files /dev/null and b/tests/fixtures/io/example.gz differ diff --git a/tests/io/conftest.py b/tests/io/conftest.py index ed88ef37e6..209bf0bfd1 100644 --- a/tests/io/conftest.py +++ b/tests/io/conftest.py @@ -106,3 +106,9 @@ def xml_file_path(fixtures_dir: Path) -> Path: def xml_temp_path(cg_dir: Path) -> Path: """Return a temp file path to use when writing xml.""" return Path(cg_dir, "write_xml.xml") + + +@pytest.fixture +def gzip_file_path(fixtures_dir: Path) -> Path: + """Return a file path to example Gzip file.""" + return Path(fixtures_dir, "io", "example.gz") diff --git a/tests/io/test_io_gzip.py b/tests/io/test_io_gzip.py new file mode 100644 index 0000000000..bcc24e88e6 --- /dev/null +++ 
b/tests/io/test_io_gzip.py @@ -0,0 +1,19 @@ +from pathlib import Path + +from cg.io.gzip import read_gzip_first_line + + +def test_read_gzip_first_line(gzip_file_path: Path): + """ + Test reading first line from a gzip file into a string. + """ + # GIVEN a gzip file + + # WHEN reading the file + line: str = read_gzip_first_line(file_path=gzip_file_path) + + # THEN assert a str is returned + assert isinstance(line, str) + + # THEN the content should match the expected line + assert line == "- ipsum, sit, amet" diff --git a/tests/meta/archive/conftest.py b/tests/meta/archive/conftest.py index dc260eb6ee..993d90eeed 100644 --- a/tests/meta/archive/conftest.py +++ b/tests/meta/archive/conftest.py @@ -16,13 +16,9 @@ from cg.constants.subject import Gender from cg.io.controller import WriteStream from cg.meta.archive.archive import SpringArchiveAPI -from cg.meta.archive.ddn_dataflow import ( - ROOT_TO_TRIM, - AuthToken, - DDNDataFlowClient, - MiriaObject, - TransferPayload, -) +from cg.meta.archive.ddn.constants import ROOT_TO_TRIM +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import AuthToken, MiriaObject, TransferPayload from cg.meta.archive.models import FileAndSample from cg.models.cg_config import CGConfig, DataFlowConfig from cg.store import Store @@ -143,7 +139,7 @@ def ddn_dataflow_client(ddn_dataflow_config: DataFlowConfig) -> DDNDataFlowClien }, ).encode() with mock.patch( - "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content", + "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content", return_value=mock_ddn_auth_success_response, ): return DDNDataFlowClient(ddn_dataflow_config) diff --git a/tests/meta/archive/test_archive_api.py b/tests/meta/archive/test_archive_api.py index 8defc705ba..5e68495bef 100644 --- a/tests/meta/archive/test_archive_api.py +++ b/tests/meta/archive/test_archive_api.py @@ -9,14 +9,16 @@ from cg.constants.housekeeper_tags import SequencingFileTag from cg.io.controller import APIRequest from cg.meta.archive.archive import ARCHIVE_HANDLERS, FileAndSample, SpringArchiveAPI -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( FAILED_JOB_STATUSES, ONGOING_JOB_STATUSES, + JobStatus, +) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import ( AuthToken, - DDNDataFlowClient, GetJobStatusPayload, GetJobStatusResponse, - JobStatus, MiriaObject, ) from cg.meta.archive.models import ArchiveHandler, FileTransferData diff --git a/tests/meta/archive/test_archive_cli.py b/tests/meta/archive/test_archive_cli.py index 8f78d4336a..497775045a 100644 --- a/tests/meta/archive/test_archive_cli.py +++ b/tests/meta/archive/test_archive_cli.py @@ -10,16 +10,18 @@ from cg.constants import EXIT_SUCCESS, SequencingFileTag from cg.constants.archiving import ArchiveLocations from cg.io.controller import APIRequest -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( FAILED_JOB_STATUSES, ONGOING_JOB_STATUSES, + JobStatus, +) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import ( AuthToken, - DDNDataFlowClient, GetJobStatusPayload, GetJobStatusResponse, - JobStatus, - TransferJob, TransferPayload, + TransferResponse, ) from cg.models.cg_config import CGConfig @@ -74,7 +76,7 @@ def test_archive_spring_files_success( "api_request_from_content", return_value=ok_miria_response, ), mock.patch.object( - TransferPayload, 
"post_request", return_value=TransferJob(jobId=archival_job_id) + TransferPayload, "post_request", return_value=TransferResponse(jobId=archival_job_id) ): result = cli_runner.invoke( archive_spring_files, diff --git a/tests/meta/archive/test_archiving.py b/tests/meta/archive/test_archiving.py index 4a1b0ddb94..df82702265 100644 --- a/tests/meta/archive/test_archiving.py +++ b/tests/meta/archive/test_archiving.py @@ -9,22 +9,21 @@ from cg.constants.constants import APIMethods, FileFormat from cg.exc import DdnDataflowAuthenticationError from cg.io.controller import APIRequest, WriteStream -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( DESTINATION_ATTRIBUTE, OSTYPE, ROOT_TO_TRIM, SOURCE_ATTRIBUTE, DataflowEndpoints, - DDNDataFlowClient, - MiriaObject, - TransferPayload, ) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import MiriaObject, TransferPayload from cg.meta.archive.models import FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig from cg.store import Store from cg.store.models import Sample -FUNCTION_TO_MOCK = "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content" +FUNCTION_TO_MOCK = "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content" def test_correct_source_root( @@ -168,7 +167,7 @@ def test_ddn_dataflow_client_initialization_invalid_credentials( # WHEN initializing the DDNDataFlowClient class with the invalid credentials with mock.patch( - "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content", + "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content", return_value=unauthorized_response, ): # THEN an exception should be raised diff --git a/tests/meta/workflow/test_analysis.py b/tests/meta/workflow/test_analysis.py index 15dd3ea530..d7ac852bba 100644 --- a/tests/meta/workflow/test_analysis.py +++ b/tests/meta/workflow/test_analysis.py @@ -1,5 +1,5 @@ """Test for analysis""" - +import logging from datetime import datetime import mock @@ -13,8 +13,9 @@ from cg.meta.workflow.mip import MipAnalysisAPI from cg.meta.workflow.mip_dna import MipDNAAnalysisAPI from cg.meta.workflow.prepare_fastq import PrepareFastqAPI +from cg.models.fastq import FastqFileMeta from cg.store import Store -from cg.store.models import Case +from cg.store.models import Case, Sample @pytest.mark.parametrize( @@ -390,3 +391,29 @@ def test_prepare_fastq_files_decompression_running( # WHEN running prepare_fastq_files # THEN an AnalysisNotReadyError should be thrown mip_analysis_api.prepare_fastq_files(case_id=case.internal_id, dry_run=False) + + +def test_link_fastq_files_for_sample( + analysis_store: Store, + caplog, + mip_analysis_api: MipDNAAnalysisAPI, + fastq_file_meta_raw: dict, + mocker, +): + caplog.set_level(logging.INFO) + # GIVEN a case + case: Case = analysis_store.get_cases()[0] + + # GIVEN a sample + sample: Sample = case.links[0].sample + + with mocker.patch.object( + AnalysisAPI, + "gather_file_metadata_for_sample", + return_value=[FastqFileMeta.model_validate(fastq_file_meta_raw)], + ): + # WHEN parsing header + mip_analysis_api.link_fastq_files_for_sample(case=case, sample=sample) + + # THEN broadcast linking of files + assert "Linking: " in caplog.text diff --git a/tests/meta/workflow/test_fastq.py b/tests/meta/workflow/test_fastq.py new file mode 100644 index 0000000000..359a42af08 --- /dev/null +++ b/tests/meta/workflow/test_fastq.py @@ -0,0 +1,96 @@ +from pathlib import Path + +import pytest 
+ +from cg.meta.workflow.fastq import FastqHandler +from cg.models.fastq import FastqFileMeta + + +@pytest.mark.parametrize( + "fastq_header, expected_header_meta", + [ + ( + "@HWUSI-EAS100R:6:73:941:1973#0/1", + FastqFileMeta(lane=6, flow_cell_id="XXXXXX", read_direction=1), + ), + ( + "@EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG", + FastqFileMeta(lane=2, flow_cell_id="FC706VJ", read_direction=1), + ), + ( + "@ST-E00201:173:HCLCGALXX:1:2106:22516:34834/1", + FastqFileMeta(lane=1, flow_cell_id="HCLCGALXX", read_direction=1), + ), + ], +) +def test_parse_fastq_header(fastq_header: str, expected_header_meta: FastqFileMeta): + # GIVEN a FASTQ header + + # WHEN parsing header + header_meta: FastqFileMeta = FastqHandler.parse_fastq_header(line=fastq_header) + + # THEN header meta should match the expected header information + assert header_meta == expected_header_meta + + +@pytest.mark.parametrize( + "fastq_header, expected_error", + [ + ("no match", TypeError), + ("no:match", TypeError), + ("", TypeError), + ], +) +def test_parse_fastq_header_when_no_match(fastq_header: str, expected_error): + # GIVEN no matching FASTQ header + + with pytest.raises(expected_error): + # WHEN parsing header + # THEN raise error + FastqHandler.parse_fastq_header(line=fastq_header) + + +@pytest.mark.parametrize( + "fastq_path, expected_fastq_meta", + [ + ( + Path("tests", "fixtures", "io", "casava_five_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_five_parts.fastq.gz"), + lane=6, + flow_cell_id="XXXXXX", + read_direction=1, + undetermined=None, + ), + ), + ( + Path("tests", "fixtures", "io", "casava_ten_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_ten_parts.fastq.gz"), + lane=2, + flow_cell_id="FC706VJ", + read_direction=1, + undetermined=None, + ), + ), + ( + Path("tests", "fixtures", "io", "casava_seven_parts.fastq.gz"), + FastqFileMeta( + path=Path("tests", "fixtures", "io", "casava_seven_parts.fastq.gz"), + lane=1, + flow_cell_id="HCLCGALXX", + read_direction=1, + undetermined=None, + ), + ), + ], +) +def test_parse_file_data(fastq_path: Path, expected_fastq_meta: FastqFileMeta, mocker): + # GIVEN a FASTQ file + + with mocker.patch("cg.meta.workflow.fastq._is_undetermined_in_path", return_value=None): + # WHEN parsing file data + header_meta = FastqHandler.parse_file_data(fastq_path=fastq_path) + + # THEN header meta should match the expected header information + assert header_meta == expected_fastq_meta diff --git a/tests/models/test_fastq.py b/tests/models/test_fastq.py new file mode 100644 index 0000000000..7da9d8db47 --- /dev/null +++ b/tests/models/test_fastq.py @@ -0,0 +1,23 @@ +from cg.models.fastq import FastqFileMeta + + +def test_instantiate_fastq_file_meta(fastq_file_meta_raw: dict): + # GIVEN a dictionary with fastq info + + # WHEN instantiating object + fastq_file_meta = FastqFileMeta.model_validate(fastq_file_meta_raw) + + # THEN assert that it was successfully created + assert isinstance(fastq_file_meta, FastqFileMeta) + + +def test_fastq_file_meta_converts_to_int(fastq_file_meta_raw: dict): + # GIVEN a dictionary with fastq info + + # WHEN instantiating object + fastq_file_meta = FastqFileMeta.model_validate(fastq_file_meta_raw) + + # THEN assert that str is converted to int + assert isinstance(fastq_file_meta.lane, int) + + assert isinstance(fastq_file_meta.read_direction, int)
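A minimal usage sketch of the FASTQ header parsing introduced by this diff (illustrative only, not part of the patch; it uses only names added above and assumes the cg package is importable):

from cg.meta.workflow.fastq import FastqHandler
from cg.models.fastq import FastqFileMeta

# Casava 1.8-style header: ten colon-separated parts dispatch through
# GetFastqFileMeta.header_format to lane 2, flow cell FC706VJ, read direction 1.
header = "@EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG"
meta: FastqFileMeta = FastqHandler.parse_fastq_header(line=header)
assert (meta.lane, meta.flow_cell_id, meta.read_direction) == (2, "FC706VJ", 1)

# Old five-part headers carry no flow cell id, so the parser falls back to "XXXXXX".
old_meta: FastqFileMeta = FastqHandler.parse_fastq_header(line="@HWUSI-EAS100R:6:73:941:1973#0/1")
assert (old_meta.lane, old_meta.flow_cell_id, old_meta.read_direction) == (6, "XXXXXX", 1)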