Skip to content

Commit

Permalink
Concatenate fastq files for microsalt deliveries (#2951)
Browse files Browse the repository at this point in the history
  • Loading branch information
seallard authored Feb 22, 2024
1 parent d27007c commit f8d3bfc
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 5 deletions.
2 changes: 2 additions & 0 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from cg.meta.deliver_ticket import DeliverTicketAPI
from cg.meta.rsync.rsync_api import RsyncAPI
from cg.models.cg_config import CGConfig
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Case
from cg.store.store import Store

Expand Down Expand Up @@ -93,6 +94,7 @@ def deliver_analysis(
delivery_type=delivery,
force_all=force_all,
ignore_missing_bundles=ignore_missing_bundles,
fastq_file_service=FastqFileService(),
)
deliver_api.set_dry_run(dry_run)
cases: list[Case] = []
Expand Down
2 changes: 2 additions & 0 deletions cg/cli/upload/clinical_delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cg.constants.tb import AnalysisTypes
from cg.meta.deliver import DeliverAPI
from cg.meta.rsync import RsyncAPI
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Case
from cg.store.store import Store

Expand Down Expand Up @@ -52,6 +53,7 @@ def upload_clinical_delivery(context: click.Context, case_id: str, dry_run: bool
sample_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery_type]["sample_tags"],
delivery_type=delivery_type,
project_base_path=Path(context.obj.delivery_path),
fastq_file_service=FastqFileService(),
).deliver_files(case_obj=case)

rsync_api: RsyncAPI = RsyncAPI(context.obj)
Expand Down
29 changes: 24 additions & 5 deletions cg/meta/deliver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.constants import delivery as constants
from cg.constants.constants import DataDelivery
from cg.constants.constants import DataDelivery, Workflow
from cg.exc import MissingFilesError
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Case, CaseSample, Sample
from cg.store.store import Store

Expand All @@ -29,6 +30,7 @@ def __init__(
sample_tags: list[set[str]],
project_base_path: Path,
delivery_type: str,
fastq_file_service: FastqFileService,
force_all: bool = False,
ignore_missing_bundles: bool = False,
):
Expand All @@ -55,6 +57,7 @@ def __init__(
self.delivery_type in constants.SKIP_MISSING or ignore_missing_bundles
)
self.deliver_failed_samples = force_all
self.fastq_file_service = fastq_file_service

def set_dry_run(self, dry_run: bool) -> None:
"""Update dry run."""
Expand Down Expand Up @@ -112,8 +115,7 @@ def deliver_files(self, case_obj: Case):
continue
raise SyntaxError(f"Could not find any version for {sample_id}")
self.deliver_sample_files(
case_id=case_id,
case_name=case_name,
case=case_obj,
sample_id=sample_id,
sample_name=sample_name,
version_obj=last_version,
Expand Down Expand Up @@ -163,14 +165,16 @@ def deliver_case_files(

def deliver_sample_files(
self,
case_id: str,
case_name: str,
case: Case,
sample_id: str,
sample_name: str,
version_obj: Version,
) -> None:
"""Deliver files on sample level."""
# Make sure that the directory exists
case_name: str = case.name
case_id: str = case.internal_id

if self.delivery_type in constants.ONLY_ONE_CASE_PER_TICKET:
case_name = None
delivery_base: Path = self.create_delivery_dir_path(
Expand Down Expand Up @@ -210,6 +214,21 @@ def deliver_sample_files(
f"There were {number_previously_linked_files} previously linked files and {number_linked_files_now} were linked for sample {sample_id}, case {case_id}"
)

if self.delivery_type == Workflow.FASTQ and case.data_analysis == Workflow.MICROSALT:
self.concatenate_fastqs(sample_directory=delivery_base, sample_name=sample_name)

def concatenate_fastqs(self, sample_directory: Path, sample_name: str):
if self.dry_run:
return
forward_out_path = Path(sample_directory, f"{sample_name}_R1.fastq.gz")
reverse_out_path = Path(sample_directory, f"{sample_name}_R2.fastq.gz")
self.fastq_file_service.concatenate(
fastq_directory=sample_directory,
forward_output=forward_out_path,
reverse_output=reverse_out_path,
remove_raw=True,
)

def get_case_files_from_version(self, version: Version, sample_ids: set[str]) -> Iterable[Path]:
"""Fetch all case files from a version that are tagged with any of the case tags."""

Expand Down
Empty file.
10 changes: 10 additions & 0 deletions cg/services/fastq_file_service/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class FastqServiceError(Exception):
pass


class ConcatenationError(FastqServiceError):
pass


class InvalidFastqDirectory(FastqServiceError):
pass
33 changes: 33 additions & 0 deletions cg/services/fastq_file_service/fastq_file_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

from cg.services.fastq_file_service.utils import (
concatenate_forward_reads,
concatenate_reverse_reads,
remove_raw_fastqs,
)


class FastqFileService:

def concatenate(
self,
fastq_directory: Path,
forward_output: Path,
reverse_output: Path,
remove_raw: bool = False,
):
temp_forward: Path | None = concatenate_forward_reads(fastq_directory)
temp_reverse: Path | None = concatenate_reverse_reads(fastq_directory)

if remove_raw:
remove_raw_fastqs(
fastq_directory=fastq_directory,
forward_file=temp_forward,
reverse_file=temp_reverse,
)

if temp_forward:
temp_forward.rename(forward_output)

if temp_reverse:
temp_reverse.rename(reverse_output)
80 changes: 80 additions & 0 deletions cg/services/fastq_file_service/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from pathlib import Path
import re
import shutil
import uuid

from cg.services.fastq_file_service.exceptions import ConcatenationError


def concatenate_forward_reads(directory: Path) -> Path | None:
fastqs: list[Path] = get_forward_read_fastqs(directory)
if not fastqs:
return
output_file: Path = get_new_unique_file(directory)
concatenate(input_files=fastqs, output_file=output_file)
validate_concatenation(input_files=fastqs, output_file=output_file)
return output_file


def concatenate_reverse_reads(directory: Path) -> Path | None:
fastqs: list[Path] = get_reverse_read_fastqs(directory)
if not fastqs:
return
file: Path = get_new_unique_file(directory)
concatenate(input_files=fastqs, output_file=file)
validate_concatenation(input_files=fastqs, output_file=file)
return file


def get_new_unique_file(directory: Path) -> Path:
unique_id = uuid.uuid4()
return Path(directory, f"{unique_id}.fastq.gz")


def get_forward_read_fastqs(fastq_directory: Path) -> list[Path]:
return get_fastqs_by_direction(fastq_directory=fastq_directory, direction=1)


def get_reverse_read_fastqs(fastq_directory: Path) -> list[Path]:
return get_fastqs_by_direction(fastq_directory=fastq_directory, direction=2)


def get_fastqs_by_direction(fastq_directory: Path, direction: int) -> list[Path]:
pattern = f".+_R{direction}_[0-9]+.fastq.gz"
fastqs: list[Path] = []
for file in fastq_directory.iterdir():
if re.match(pattern, file.name):
fastqs.append(file)
return sort_files_by_name(fastqs)


def get_total_size(files: list[Path]) -> int:
return sum(file.stat().st_size for file in files)


def concatenate(input_files: list[Path], output_file: Path) -> None:
with open(output_file, "wb") as write_file_obj:
for file in input_files:
with open(file, "rb") as file_descriptor:
shutil.copyfileobj(file_descriptor, write_file_obj)


def validate_concatenation(input_files: list[Path], output_file: Path) -> None:
total_size: int = get_total_size(input_files)
concatenated_size: int = get_total_size([output_file])
if total_size != concatenated_size:
raise ConcatenationError


def sort_files_by_name(files: list[Path]) -> list[Path]:
return sorted(files, key=lambda file: file.name)


def file_can_be_removed(file: Path, forward_file: Path, reverse_file: Path) -> bool:
return file.suffix == ".gz" and file != forward_file and file != reverse_file


def remove_raw_fastqs(fastq_directory: Path, forward_file: Path, reverse_file: Path) -> None:
for file in fastq_directory.iterdir():
if file_can_be_removed(file=file, forward_file=forward_file, reverse_file=reverse_file):
file.unlink()
2 changes: 2 additions & 0 deletions tests/cli/upload/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from cg.meta.workflow.mip_dna import MipDNAAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.models.scout.scout_load_config import ScoutLoadConfig
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Analysis
from cg.store.store import Store
from tests.meta.upload.scout.conftest import mip_load_config
Expand Down Expand Up @@ -201,6 +202,7 @@ def fastq_context(
sample_tags=PIPELINE_ANALYSIS_TAG_MAP[Workflow.FASTQ]["sample_tags"],
delivery_type="fastq",
project_base_path=Path(base_context.delivery_path),
fastq_file_service=FastqFileService(),
)
base_context.meta_apis["rsync_api"] = RsyncAPI(cg_context)
base_context.trailblazer_api_ = trailblazer_api
Expand Down
3 changes: 3 additions & 0 deletions tests/meta/deliver/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cg.constants.delivery import INBOX_NAME
from cg.constants.housekeeper_tags import AlignmentFileTag
from cg.meta.deliver import DeliverAPI
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Case
from cg.store.store import Store
from tests.store_helpers import StoreHelpers
Expand All @@ -29,6 +30,7 @@ def deliver_api(
sample_tags=[{AlignmentFileTag.CRAM}],
project_base_path=project_dir,
delivery_type="balsamic",
fastq_file_service=FastqFileService(),
)
yield _deliver_api

Expand Down Expand Up @@ -57,6 +59,7 @@ def populated_deliver_api(
sample_tags=[{AlignmentFileTag.CRAM}],
project_base_path=project_dir,
delivery_type="balsamic",
fastq_file_service=FastqFileService(),
)
return _deliver_api

Expand Down
3 changes: 3 additions & 0 deletions tests/meta/deliver/test_delivery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cg.constants.delivery import INBOX_NAME
from cg.constants.housekeeper_tags import AlignmentFileTag
from cg.meta.deliver import DeliverAPI
from cg.services.fastq_file_service.fastq_file_service import FastqFileService
from cg.store.models import Case, CaseSample, Sample
from cg.store.store import Store
from tests.cli.deliver.conftest import fastq_delivery_bundle, mip_delivery_bundle
Expand All @@ -28,6 +29,7 @@ def test_get_delivery_path(
sample_tags=["sample-tag"],
project_base_path=project_dir,
delivery_type="balsamic",
fastq_file_service=FastqFileService(),
)
customer_id = "cust000"
ticket = "1234"
Expand Down Expand Up @@ -85,6 +87,7 @@ def test_get_case_files_from_version(
sample_tags=[{"sample-tag"}],
project_base_path=project_dir,
delivery_type="balsamic",
fastq_file_service=FastqFileService(),
)

# GIVEN a housekeeper db populated with a bundle including a case specific file and a sample specific file
Expand Down
37 changes: 37 additions & 0 deletions tests/services/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pathlib import Path
import pytest

from cg.services.fastq_file_service.fastq_file_service import FastqFileService


@pytest.fixture
def fastq_file_service():
return FastqFileService()


def create_fastqs_directory(number_forward_reads, number_reverse_reads, tmp_path):
fastq_dir = Path(tmp_path, "fastqs")
fastq_dir.mkdir()
for i in range(number_forward_reads):
file = Path(fastq_dir, f"sample_R1_{i}.fastq.gz")
file.write_text(f"forward read {i}")

for i in range(number_reverse_reads):
file = Path(fastq_dir, f"sample_R2_{i}.fastq.gz")
file.write_text(f"reverse read {i}")
return fastq_dir


@pytest.fixture
def fastqs_dir(tmp_path) -> Path:
return create_fastqs_directory(
number_forward_reads=3, number_reverse_reads=3, tmp_path=tmp_path
)


@pytest.fixture
def fastqs_forward(tmp_path) -> Path:
"""Return a directory with only forward reads."""
return create_fastqs_directory(
number_forward_reads=3, number_reverse_reads=0, tmp_path=tmp_path
)
Loading

0 comments on commit f8d3bfc

Please sign in to comment.