Skip to content

Commit f01213b

Browse files
Refactor link fastq files (#2763)
### Changed

- FastqFile model
- Gzip read function
- Tests
1 parent 562eeeb commit f01213b

22 files changed

+363 -173 lines changed

cg/io/gzip.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import gzip
2+
from pathlib import Path
3+
4+
5+
def read_gzip_first_line(file_path: Path) -> str:
6+
"""Return first line of gzip file."""
7+
with gzip.open(file_path) as file:
8+
return file.readline().decode()

cg/meta/workflow/analysis.py

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from housekeeper.store.models import Bundle, Version
1010

1111
from cg.apps.environ import environ_email
12-
from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority
12+
from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority, SequencingFileTag
1313
from cg.constants.constants import (
1414
AnalysisType,
1515
CaseActions,
@@ -24,6 +24,7 @@
2424
from cg.meta.workflow.fastq import FastqHandler
2525
from cg.models.analysis import AnalysisModel
2626
from cg.models.cg_config import CGConfig
27+
from cg.models.fastq import FastqFileMeta
2728
from cg.store.models import Analysis, BedVersion, Case, CaseSample, Sample
2829

2930
LOG = logging.getLogger(__name__)
@@ -288,58 +289,59 @@ def get_cases_to_qc(self) -> list[Case]:
288289
if self.trailblazer_api.is_latest_analysis_qc(case_id=case.internal_id)
289290
]
290291

291-
def get_sample_fastq_destination_dir(self, case: Case, sample: Sample):
292+
def get_sample_fastq_destination_dir(self, case: Case, sample: Sample) -> Path:
292293
"""Return the path to the FASTQ destination directory."""
293294
raise NotImplementedError
294295

295-
def gather_file_metadata_for_sample(self, sample_obj: Sample) -> list[dict]:
296+
def gather_file_metadata_for_sample(self, sample: Sample) -> list[FastqFileMeta]:
296297
return [
297-
self.fastq_handler.parse_file_data(file_obj.full_path)
298-
for file_obj in self.housekeeper_api.files(
299-
bundle=sample_obj.internal_id, tags=["fastq"]
298+
self.fastq_handler.parse_file_data(hk_file.full_path)
299+
for hk_file in self.housekeeper_api.files(
300+
bundle=sample.internal_id, tags={SequencingFileTag.FASTQ}
300301
)
301302
]
302303

303304
def link_fastq_files_for_sample(
304-
self, case_obj: Case, sample_obj: Sample, concatenate: bool = False
305+
self, case: Case, sample: Sample, concatenate: bool = False
305306
) -> None:
306307
"""
307-
Link FASTQ files for a sample to working directory.
308+
Link FASTQ files for a sample to the work directory.
308309
If pipeline input requires concatenated fastq, files can also be concatenated
309310
"""
310-
linked_reads_paths = {1: [], 2: []}
311-
concatenated_paths = {1: "", 2: ""}
312-
files: list[dict] = self.gather_file_metadata_for_sample(sample_obj=sample_obj)
313-
sorted_files = sorted(files, key=lambda k: k["path"])
314-
fastq_dir = self.get_sample_fastq_destination_dir(case=case_obj, sample=sample_obj)
311+
linked_reads_paths: dict[int, list[Path]] = {1: [], 2: []}
312+
concatenated_paths: dict[int, str] = {1: "", 2: ""}
313+
fastq_files_meta: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample)
314+
sorted_fastq_files_meta: list[FastqFileMeta] = sorted(
315+
fastq_files_meta, key=lambda k: k.path
316+
)
317+
fastq_dir: Path = self.get_sample_fastq_destination_dir(case=case, sample=sample)
315318
fastq_dir.mkdir(parents=True, exist_ok=True)
316319

317-
for fastq_data in sorted_files:
318-
fastq_path = Path(fastq_data["path"])
319-
fastq_name = self.fastq_handler.create_fastq_name(
320-
lane=fastq_data["lane"],
321-
flowcell=fastq_data["flowcell"],
322-
sample=sample_obj.internal_id,
323-
read=fastq_data["read"],
324-
undetermined=fastq_data["undetermined"],
325-
meta=self.get_additional_naming_metadata(sample_obj),
320+
for fastq_file in sorted_fastq_files_meta:
321+
fastq_file_name: str = self.fastq_handler.create_fastq_name(
322+
lane=fastq_file.lane,
323+
flow_cell=fastq_file.flow_cell_id,
324+
sample=sample.internal_id,
325+
read_direction=fastq_file.read_direction,
326+
undetermined=fastq_file.undetermined,
327+
meta=self.get_lims_naming_metadata(sample),
326328
)
327-
destination_path: Path = fastq_dir / fastq_name
328-
linked_reads_paths[fastq_data["read"]].append(destination_path)
329+
destination_path = Path(fastq_dir, fastq_file_name)
330+
linked_reads_paths[fastq_file.read_direction].append(destination_path)
329331
concatenated_paths[
330-
fastq_data["read"]
331-
] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_name)}"
332+
fastq_file.read_direction
333+
] = f"{fastq_dir}/{self.fastq_handler.get_concatenated_name(fastq_file_name)}"
332334

333335
if not destination_path.exists():
334-
LOG.info(f"Linking: {fastq_path} -> {destination_path}")
335-
destination_path.symlink_to(fastq_path)
336+
LOG.info(f"Linking: {fastq_file.path} -> {destination_path}")
337+
destination_path.symlink_to(fastq_file.path)
336338
else:
337339
LOG.warning(f"Destination path already exists: {destination_path}")
338340

339341
if not concatenate:
340342
return
341343

342-
LOG.info("Concatenation in progress for sample %s.", sample_obj.internal_id)
344+
LOG.info(f"Concatenation in progress for sample: {sample.internal_id}")
343345
for read, value in linked_reads_paths.items():
344346
self.fastq_handler.concatenate(linked_reads_paths[read], concatenated_paths[read])
345347
self.fastq_handler.remove_files(value)
@@ -435,7 +437,7 @@ def get_date_from_file_path(file_path: Path) -> dt.datetime.date:
435437
"""
436438
return dt.datetime.fromtimestamp(int(os.path.getctime(file_path)))
437439

438-
def get_additional_naming_metadata(self, sample_obj: Sample) -> str | None:
440+
def get_lims_naming_metadata(self, sample: Sample) -> str | None:
439441
return None
440442

441443
def get_latest_metadata(self, case_id: str) -> AnalysisModel:

cg/meta/workflow/balsamic.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
BalsamicWGSQCMetrics,
2424
)
2525
from cg.models.cg_config import CGConfig
26+
from cg.models.fastq import FastqFileMeta
2627
from cg.store.models import Case, CaseSample, Sample
2728
from cg.utils import Process
2829
from cg.utils.utils import build_command_from_dict, get_string_from_list_by_pattern
@@ -146,22 +147,22 @@ def get_sample_fastq_destination_dir(self, case: Case, sample: Sample = None) ->
146147
return Path(self.get_case_path(case.internal_id), FileFormat.FASTQ)
147148

148149
def link_fastq_files(self, case_id: str, dry_run: bool = False) -> None:
149-
case_obj = self.status_db.get_case_by_internal_id(internal_id=case_id)
150-
for link in case_obj.links:
151-
self.link_fastq_files_for_sample(
152-
case_obj=case_obj, sample_obj=link.sample, concatenate=True
153-
)
150+
case = self.status_db.get_case_by_internal_id(internal_id=case_id)
151+
for link in case.links:
152+
self.link_fastq_files_for_sample(case=case, sample=link.sample, concatenate=True)
154153

155154
def get_concatenated_fastq_path(self, link_object: CaseSample) -> Path:
156-
"""Returns path to the concatenated FASTQ file of a sample"""
157-
file_collection: list[dict] = self.gather_file_metadata_for_sample(link_object.sample)
155+
"""Returns the path to the concatenated FASTQ file of a sample"""
156+
file_collection: list[FastqFileMeta] = self.gather_file_metadata_for_sample(
157+
link_object.sample
158+
)
158159
fastq_data = file_collection[0]
159160
linked_fastq_name = self.fastq_handler.create_fastq_name(
160-
lane=fastq_data["lane"],
161-
flowcell=fastq_data["flowcell"],
161+
lane=fastq_data.lane,
162+
flow_cell=fastq_data.flow_cell_id,
162163
sample=link_object.sample.internal_id,
163-
read=fastq_data["read"],
164-
undetermined=fastq_data["undetermined"],
164+
read_direction=fastq_data.read_direction,
165+
undetermined=fastq_data.undetermined,
165166
)
166167
concatenated_fastq_name: str = self.fastq_handler.get_concatenated_name(linked_fastq_name)
167168
return Path(

0 commit comments

Comments
 (0)