Skip to content

Commit

Permalink
Refactor to PEP 604 (#2706)
Browse files Browse the repository at this point in the history
### Changed

- Refactor to PEP 604
  • Loading branch information
henrikstranneheim authored Nov 27, 2023
1 parent 7a343ff commit 3bd0c3e
Show file tree
Hide file tree
Showing 180 changed files with 1,108 additions and 1,288 deletions.
3 changes: 1 addition & 2 deletions cg/apps/coverage/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Chanjo API"""
import logging
import tempfile
from typing import Optional

from cg.constants.constants import FileFormat
from cg.io.controller import ReadStream
Expand Down Expand Up @@ -41,7 +40,7 @@ def upload(

self.process.run_command(parameters=load_parameters)

def sample(self, sample_id: str) -> Optional[dict]:
def sample(self, sample_id: str) -> dict | None:
"""Fetch sample from the database"""

sample_parameters = ["db", "samples", "-s", sample_id]
Expand Down
5 changes: 2 additions & 3 deletions cg/apps/crunchy/crunchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import datetime
import logging
from pathlib import Path
from typing import Optional

from cg.apps.crunchy import files
from cg.apps.crunchy.models import CrunchyFile, CrunchyMetadata
Expand All @@ -33,7 +32,7 @@ class CrunchyAPI:
"""

def __init__(self, config: dict):
self.conda_binary: Optional[str] = config["crunchy"]["conda_binary"] or None
self.conda_binary: str | None = config["crunchy"]["conda_binary"] or None
self.crunchy_env: str = config["crunchy"]["slurm"]["conda_env"]
self.dry_run: bool = False
self.reference_path: str = config["crunchy"]["cram_reference"]
Expand Down Expand Up @@ -153,7 +152,7 @@ def is_fastq_compression_done(compression: CompressionData) -> bool:
)

# Check if the SPRING archive has been unarchived
updated_at: Optional[datetime.date] = files.get_file_updated_at(crunchy_metadata)
updated_at: datetime.date | None = files.get_file_updated_at(crunchy_metadata)
if updated_at is None:
LOG.info(f"FASTQ compression is done for {compression.run_name}")
return True
Expand Down
5 changes: 2 additions & 3 deletions cg/apps/crunchy/files.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
import tempfile
from datetime import datetime
from datetime import date, datetime
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import Optional

from cg.apps.crunchy.models import CrunchyFile, CrunchyMetadata
from cg.constants.constants import FileFormat
Expand Down Expand Up @@ -64,7 +63,7 @@ def get_crunchy_metadata(metadata_path: Path) -> CrunchyMetadata:
return metadata


def get_file_updated_at(crunchy_metadata: CrunchyMetadata) -> Optional[datetime.date]:
def get_file_updated_at(crunchy_metadata: CrunchyMetadata) -> date | None:
"""Check if a SPRING metadata file has been updated and return the date when updated"""
return crunchy_metadata.files[0].updated

Expand Down
7 changes: 3 additions & 4 deletions cg/apps/crunchy/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import date
from typing import Optional

from pydantic import BaseModel, conlist
from typing_extensions import Literal
Expand All @@ -8,9 +7,9 @@
class CrunchyFile(BaseModel):
path: str
file: Literal["first_read", "second_read", "spring"]
checksum: Optional[str] = None
algorithm: Optional[Literal["sha1", "md5", "sha256"]] = None
updated: Optional[date] = None
checksum: str | None = None
algorithm: Literal["sha1", "md5", "sha256"] | None = None
updated: date | None = None


class CrunchyMetadata(BaseModel):
Expand Down
5 changes: 1 addition & 4 deletions cg/apps/demultiplex/demultiplex_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This api should handle everything around demultiplexing."""
import logging
from pathlib import Path
from typing import Optional

from typing_extensions import Literal

Expand All @@ -28,9 +27,7 @@ class DemultiplexingAPI:
This includes starting demultiplexing, creating sample sheets, creating base masks,
"""

def __init__(
self, config: dict, housekeeper_api: HousekeeperAPI, out_dir: Optional[Path] = None
):
def __init__(self, config: dict, housekeeper_api: HousekeeperAPI, out_dir: Path | None = None):
self.slurm_api = SlurmAPI()
self.hk_api = housekeeper_api
self.slurm_account: str = config["demultiplex"]["slurm"]["account"]
Expand Down
3 changes: 1 addition & 2 deletions cg/apps/demultiplex/sample_sheet/index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Functions that deal with modifications of the indexes."""
import logging
from typing import Union

from packaging import version
from pydantic import BaseModel
Expand Down Expand Up @@ -219,7 +218,7 @@ def pad_and_reverse_complement_sample_indexes(


def update_indexes_for_samples(
samples: list[Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]],
samples: list[FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq],
index_cycles: int,
is_reverse_complement: bool,
) -> None:
Expand Down
20 changes: 9 additions & 11 deletions cg/apps/demultiplex/sample_sheet/sample_sheet_creator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" Create a sample sheet for NovaSeq flow cells."""
import logging
from typing import Optional, Type, Union
from typing import Type

from cg.apps.demultiplex.sample_sheet.index import (
Index,
Expand Down Expand Up @@ -37,17 +37,15 @@ class SampleSheetCreator:
def __init__(
self,
flow_cell: FlowCellDirectoryData,
lims_samples: list[Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]],
lims_samples: list[FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq],
force: bool = False,
):
self.flow_cell: FlowCellDirectoryData = flow_cell
self.flow_cell_id: str = flow_cell.id
self.lims_samples: list[
Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]
] = lims_samples
self.lims_samples: list[FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq] = lims_samples
self.run_parameters: RunParameters = flow_cell.run_parameters
self.sample_type: Type[
Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]
FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq
] = flow_cell.sample_type
self.force: bool = force

Expand Down Expand Up @@ -79,7 +77,7 @@ def remove_unwanted_samples(self) -> None:
"""Filter out samples with single indexes."""
LOG.info("Removing all samples without dual indexes")
samples_to_keep = []
sample: Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]
sample: FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq
for sample in self.lims_samples:
if not is_dual_index(sample.index):
LOG.warning(f"Removing sample {sample} since it does not have dual index")
Expand All @@ -89,19 +87,19 @@ def remove_unwanted_samples(self) -> None:

@staticmethod
def convert_sample_to_header_dict(
sample: Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq],
sample: FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq,
data_column_names: list[str],
) -> list[str]:
"""Convert a lims sample object to a list that corresponds to the sample sheet headers."""
LOG.debug(f"Use sample sheet header {data_column_names}")
sample_dict = sample.model_dump(by_alias=True)
return [str(sample_dict[column]) for column in data_column_names]

def get_additional_sections_sample_sheet(self) -> Optional[list]:
def get_additional_sections_sample_sheet(self) -> list | None:
"""Return all sections of the sample sheet that are not the data section."""
raise NotImplementedError("Impossible to get sample sheet sections from parent class")

def get_data_section_header_and_columns(self) -> Optional[list[list[str]]]:
def get_data_section_header_and_columns(self) -> list[list[str]] | None:
"""Return the header and column names of the data section of the sample sheet."""
raise NotImplementedError("Impossible to get sample sheet sections from parent class")

Expand All @@ -123,7 +121,7 @@ def create_sample_sheet_content(self) -> list[list[str]]:
def process_samples_for_sample_sheet(self) -> None:
"""Remove unwanted samples and adapt remaining samples."""
self.remove_unwanted_samples()
samples_in_lane: list[Union[FlowCellSampleBCLConvert, FlowCellSampleBcl2Fastq]]
samples_in_lane: list[FlowCellSampleBCLConvert | FlowCellSampleBcl2Fastq]
self.add_override_cycles_to_samples()
for lane, samples_in_lane in get_samples_by_lane(self.lims_samples).items():
LOG.info(f"Adapting index and barcode mismatch values for samples in lane {lane}")
Expand Down
9 changes: 4 additions & 5 deletions cg/apps/hermes/hermes_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional

from cg.apps.housekeeper import models as hk_models
from cg.utils.commands import Process
Expand All @@ -18,7 +17,7 @@ def __init__(self, config: dict):
self.process = Process(binary=config["hermes"]["binary_path"])

def convert_deliverables(
self, deliverables_file: Path, pipeline: str, analysis_type: Optional[str] = None
self, deliverables_file: Path, pipeline: str, analysis_type: str | None = None
) -> CGDeliverables:
"""Convert deliverables file in raw pipeline format to CG format with hermes"""
LOG.info("Converting pipeline deliverables to CG deliverables")
Expand All @@ -40,8 +39,8 @@ def create_housekeeper_bundle(
bundle_name: str,
deliverables: Path,
pipeline: str,
analysis_type: Optional[str],
created: Optional[datetime],
analysis_type: str | None,
created: datetime | None,
) -> hk_models.InputBundle:
"""Convert pipeline deliverables to housekeeper bundle ready to be inserted into hk"""
cg_deliverables: CGDeliverables = self.convert_deliverables(
Expand All @@ -53,7 +52,7 @@ def create_housekeeper_bundle(

@staticmethod
def get_housekeeper_bundle(
deliverables: CGDeliverables, bundle_name: str, created: Optional[datetime] = None
deliverables: CGDeliverables, bundle_name: str, created: datetime | None = None
) -> hk_models.InputBundle:
"""Convert a deliverables object to a housekeeper object"""
bundle_info = {
Expand Down
3 changes: 1 addition & 2 deletions cg/apps/hermes/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Models used by hermes <-> cg interactions"""
import logging
from pathlib import Path
from typing import Optional

from pydantic import BaseModel, field_validator

Expand All @@ -15,7 +14,7 @@ class CGTag(BaseModel):

path: str
tags: list[str]
mandatory: Optional[bool] = False
mandatory: bool | None = False


class CGDeliverables(BaseModel):
Expand Down
37 changes: 17 additions & 20 deletions cg/apps/housekeeper/hk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
from pathlib import Path
from typing import Optional

from housekeeper.include import checksum as hk_checksum
from housekeeper.include import include_version
Expand Down Expand Up @@ -77,7 +76,7 @@ def new_file(
tags = []
return self._store.new_file(path, checksum, to_archive, tags)

def get_file(self, file_id: int) -> Optional[File]:
def get_file(self, file_id: int) -> File | None:
"""Get a file based on file id."""
LOG.info(f"Return file: {file_id}")
file_obj: File = self._store.get_file_by_id(file_id=file_id)
Expand All @@ -86,7 +85,7 @@ def get_file(self, file_id: int) -> Optional[File]:
return None
return file_obj

def delete_file(self, file_id: int) -> Optional[File]:
def delete_file(self, file_id: int) -> File | None:
"""Delete a file both from database and disk (if included)."""
file_obj: File = self.get_file(file_id)
if not file_obj:
Expand Down Expand Up @@ -136,7 +135,7 @@ def files(
bundle_name=bundle, tag_names=tags, version_id=version, file_path=path
)

def get_file_insensitive_path(self, path: Path) -> Optional[File]:
def get_file_insensitive_path(self, path: Path) -> File | None:
"""Returns a file in Housekeeper with a path that matches the given path, insensitive to whether the paths
are included or not."""
file: File = self.files(path=path.as_posix())
Expand All @@ -148,7 +147,7 @@ def get_file_insensitive_path(self, path: Path) -> Optional[File]:
return file

@staticmethod
def get_files_from_version(version: Version, tags: set[str]) -> Optional[list[File]]:
def get_files_from_version(version: Version, tags: set[str]) -> list[File] | None:
"""Return a list of files associated with the given version and tags."""
LOG.debug(f"Getting files from version with tags {tags}")
files: list[File] = []
Expand All @@ -162,13 +161,13 @@ def get_files_from_version(version: Version, tags: set[str]) -> Optional[list[Fi
return files

@staticmethod
def get_file_from_version(version: Version, tags: set[str]) -> Optional[File]:
def get_file_from_version(version: Version, tags: set[str]) -> File | None:
"""Return the first file matching the given tags."""
files: list[File] = HousekeeperAPI.get_files_from_version(version=version, tags=tags)
return files[0] if files else None

@staticmethod
def get_latest_file_from_version(version: Version, tags: set[str]) -> Optional[File]:
def get_latest_file_from_version(version: Version, tags: set[str]) -> File | None:
"""Return the latest file from Housekeeper given its version and tags."""
files: list[File] = HousekeeperAPI.get_files_from_version(version=version, tags=tags)
return sorted(files, key=lambda file_obj: file_obj.id)[-1] if files else None
Expand All @@ -181,17 +180,15 @@ def session_no_autoflush(self):
"""Wrap property in Housekeeper Store."""
return self._store.session.no_autoflush

def get_files(
self, bundle: str, tags: Optional[list] = None, version: Optional[int] = None
) -> Query:
def get_files(self, bundle: str, tags: list | None = None, version: int | None = None) -> Query:
"""Get all the files in housekeeper, optionally filtered by bundle and/or tags and/or
version.
"""
return self._store.get_files(bundle_name=bundle, tag_names=tags, version_id=version)

def get_latest_file(
self, bundle: str, tags: Optional[list] = None, version: Optional[int] = None
) -> Optional[File]:
self, bundle: str, tags: list | None = None, version: int | None = None
) -> File | None:
"""Return latest file from Housekeeper, filtered by bundle and/or tags and/or version."""
files: Query = self._store.get_files(bundle_name=bundle, tag_names=tags, version_id=version)
return files.order_by(File.id.desc()).first()
Expand All @@ -201,7 +198,7 @@ def check_bundle_files(
bundle_name: str,
file_paths: list[Path],
last_version: Version,
tags: Optional[list] = None,
tags: list | None = None,
) -> list[Path]:
"""Checks if any of the files in the provided list are already added to the provided
bundle. Returns a list of files that have not been added."""
Expand Down Expand Up @@ -271,7 +268,7 @@ def get_all_non_archived_spring_files(self) -> list[File]:
"""Return all spring files which are not marked as archived in Housekeeper."""
return self._store.get_all_non_archived_files(tag_names=[SequencingFileTag.SPRING])

def get_latest_bundle_version(self, bundle_name: str) -> Optional[Version]:
def get_latest_bundle_version(self, bundle_name: str) -> Version | None:
"""Get the latest version of a Housekeeper bundle."""
last_version: Version = self.last_version(bundle_name)
if not last_version:
Expand Down Expand Up @@ -382,7 +379,7 @@ def include_files_to_latest_version(self, bundle_name: str) -> None:
bundle_version.included_at = dt.datetime.now()
self.commit()

def get_file_from_latest_version(self, bundle_name: str, tags: set[str]) -> Optional[File]:
def get_file_from_latest_version(self, bundle_name: str, tags: set[str]) -> File | None:
"""Return a file in the latest version of a bundle."""
version: Version = self.last_version(bundle=bundle_name)
if not version:
Expand Down Expand Up @@ -411,7 +408,7 @@ def is_fastq_or_spring_in_all_bundles(self, bundle_names: list[str]) -> bool:
sequencing_files_in_hk[bundle_name] = False
for tag in [SequencingFileTag.FASTQ, SequencingFileTag.SPRING_METADATA]:
sample_file_in_hk: list[bool] = []
hk_files: Optional[list[File]] = self.get_files_from_latest_version(
hk_files: list[File] | None = self.get_files_from_latest_version(
bundle_name=bundle_name, tags=[tag]
)
sample_file_in_hk += [True for hk_file in hk_files if hk_file.is_included]
Expand All @@ -422,18 +419,18 @@ def is_fastq_or_spring_in_all_bundles(self, bundle_names: list[str]) -> bool:
)
return all(sequencing_files_in_hk.values())

def get_non_archived_files(self, bundle_name: str, tags: Optional[list] = None) -> list[File]:
def get_non_archived_files(self, bundle_name: str, tags: list | None = None) -> list[File]:
"""Returns all non-archived_files from a given bundle, tagged with the given tags"""
return self._store.get_non_archived_files(bundle_name=bundle_name, tags=tags or [])

def get_archived_files(self, bundle_name: str, tags: Optional[list] = None) -> list[File]:
def get_archived_files(self, bundle_name: str, tags: list | None = None) -> list[File]:
"""Returns all archived_files from a given bundle, tagged with the given tags"""
return self._store.get_archived_files(bundle_name=bundle_name, tags=tags or [])

def add_archives(self, files: list[Path], archive_task_id: int) -> None:
"""Creates an archive object for the given files, and adds the archive task id to them."""
for file in files:
archived_file: Optional[File] = self._store.get_files(file_path=file.as_posix()).first()
archived_file: File | None = self._store.get_files(file_path=file.as_posix()).first()
if not archived_file:
raise HousekeeperFileMissingError(f"No file in housekeeper with the path {file}")
archive: Archive = self._store.create_archive(
Expand All @@ -451,7 +448,7 @@ def is_fastq_or_spring_on_disk_in_all_bundles(self, bundle_names: list[str]) ->
sequencing_files_on_disk[bundle_name] = False
for tag in [SequencingFileTag.FASTQ, SequencingFileTag.SPRING_METADATA]:
sample_file_on_disk: list[bool] = []
hk_files: Optional[list[File]] = self.get_files_from_latest_version(
hk_files: list[File] | None = self.get_files_from_latest_version(
bundle_name=bundle_name, tags=[tag]
)
sample_file_on_disk += [
Expand Down
Loading

0 comments on commit 3bd0c3e

Please sign in to comment.