Skip to content

Commit

Permalink
fix(demultiplexing)Remove demux dir before starting a new demux job (#…
Browse files Browse the repository at this point in the history
…2619)(minor)

### Added
- Method `prepare_output_directory`, which removes any existing demultiplexing output directory and then creates a fresh one

### Changed

- Output dir creation moved from `start_demultiplexing` to new function `prepare_output_directory`
- Method `create_demultiplexing_output_dir` now takes only one argument (`flow_cell`)
  • Loading branch information
Vince-janv authored Dec 6, 2023
1 parent ac91c4a commit e2414d2
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 59 deletions.
53 changes: 19 additions & 34 deletions cg/apps/demultiplex/demultiplex_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""This api should handle everything around demultiplexing."""
"""This API should handle everything around demultiplexing."""
import logging
import shutil
from pathlib import Path

from typing_extensions import Literal
Expand Down Expand Up @@ -140,15 +141,6 @@ def demultiplexing_completed_path(self, flow_cell: FlowCellDirectoryData) -> Pat
self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.DEMUX_COMPLETE
)

def is_demultiplexing_completed(self, flow_cell: FlowCellDirectoryData) -> bool:
    """Return whether demultiplexing has finished for the given flow cell.

    The flow cell counts as completed only when its stderr logfile exists and the
    demultiplexing-completed path exists as well. A missing logfile is logged as a
    warning and reported as not completed.
    """
    LOG.info(f"Check if demultiplexing is ready for {flow_cell.path}")
    logfile: Path = self.get_stderr_logfile(flow_cell)
    if logfile.exists():
        # The logfile is present, so the completion marker decides the outcome.
        return self.demultiplexing_completed_path(flow_cell).exists()
    LOG.warning(f"Could not find logfile: {logfile}!")
    return False

def is_demultiplexing_possible(self, flow_cell: FlowCellDirectoryData) -> bool:
"""Check if it is possible to start demultiplexing.
Expand Down Expand Up @@ -177,13 +169,6 @@ def is_demultiplexing_possible(self, flow_cell: FlowCellDirectoryData) -> bool:
LOG.warning("Demultiplexing has already been started")
demultiplexing_possible = False

if self.flow_cell_out_dir_path(flow_cell=flow_cell).exists():
LOG.warning("Flow cell out dir exists")
demultiplexing_possible = False

if self.is_demultiplexing_completed(flow_cell):
LOG.warning(f"Demultiplexing is already completed for flow cell {flow_cell.id}")
demultiplexing_possible = False
return demultiplexing_possible

def create_demultiplexing_started_file(self, demultiplexing_started_path: Path) -> None:
Expand Down Expand Up @@ -227,21 +212,13 @@ def add_to_trailblazer(
def start_demultiplexing(self, flow_cell: FlowCellDirectoryData):
"""Start demultiplexing for a flow cell."""
self.create_demultiplexing_started_file(flow_cell.demultiplexing_started_path)
demux_dir: Path = self.flow_cell_out_dir_path(flow_cell=flow_cell)
unaligned_dir: Path = self.get_flow_cell_unaligned_dir(flow_cell=flow_cell)
LOG.info(f"Demultiplexing to {unaligned_dir}")
if not self.dry_run:
self.create_demultiplexing_output_dir(
flow_cell=flow_cell, demux_dir=demux_dir, unaligned_dir=unaligned_dir
)

log_path: Path = self.get_stderr_logfile(flow_cell=flow_cell)
error_function: str = self.get_sbatch_error(
flow_cell=flow_cell, email=self.mail, demux_dir=demux_dir
flow_cell=flow_cell, email=self.mail, demux_dir=self.flow_cell_out_dir_path(flow_cell)
)
commands: str = self.get_sbatch_command(
run_dir=flow_cell.path,
demux_dir=demux_dir,
demux_dir=self.flow_cell_out_dir_path(flow_cell=flow_cell),
sample_sheet=flow_cell.sample_sheet_path,
demux_completed=self.demultiplexing_completed_path(flow_cell=flow_cell),
flow_cell=flow_cell,
Expand Down Expand Up @@ -283,11 +260,19 @@ def start_demultiplexing(self, flow_cell: FlowCellDirectoryData):
LOG.info(f"Demultiplexing running as job {sbatch_number}")
return sbatch_number

@staticmethod
def create_demultiplexing_output_dir(
flow_cell: FlowCellDirectoryData, demux_dir: Path, unaligned_dir: Path
) -> None:
LOG.debug(f"Creating demux dir {unaligned_dir}")
demux_dir.mkdir(exist_ok=False, parents=True)
def prepare_output_directory(self, flow_cell: FlowCellDirectoryData) -> None:
    """Prepare the demultiplexing output directory for a new demultiplexing job.

    Any output directory left over for the flow cell is removed first, after which
    the output directory is created again.
    """
    # Order matters: the removal step runs before the creation step.
    self.remove_demultiplexing_output_directory(flow_cell=flow_cell)
    self.create_demultiplexing_output_dir(flow_cell=flow_cell)

def remove_demultiplexing_output_directory(self, flow_cell: FlowCellDirectoryData) -> None:
    """Delete the demultiplexing output directory of a flow cell, if it exists.

    Nothing is removed during a dry run or when the directory is absent.
    """
    if self.dry_run:
        return
    output_directory: Path = self.flow_cell_out_dir_path(flow_cell=flow_cell)
    if not output_directory.exists():
        return
    # ignore_errors=False lets a failed removal raise instead of passing silently.
    shutil.rmtree(output_directory, ignore_errors=False)

def create_demultiplexing_output_dir(self, flow_cell: FlowCellDirectoryData) -> None:
"""Creates the demultiplexing output directory and, if necessary, the unaligned directory."""
output_directory: Path = self.flow_cell_out_dir_path(flow_cell)
LOG.debug(f"Creating demultiplexing output directory: {output_directory}")
output_directory.mkdir(exist_ok=False, parents=True)
if flow_cell.bcl_converter == BclConverter.BCL2FASTQ:
unaligned_dir.mkdir(exist_ok=False, parents=False)
self.get_flow_cell_unaligned_dir(flow_cell).mkdir(exist_ok=False, parents=False)
2 changes: 2 additions & 0 deletions cg/cli/demultiplex/demux.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def demultiplex_all(context: CGConfig, flow_cells_directory: click.Path, dry_run
continue

if not dry_run:
demultiplex_api.prepare_output_directory(flow_cell)
slurm_job_id: int = demultiplex_api.start_demultiplexing(flow_cell=flow_cell)
tb_api: TrailblazerAPI = context.trailblazer_api
demultiplex_api.add_to_trailblazer(
Expand Down Expand Up @@ -112,6 +113,7 @@ def demultiplex_flow_cell(
raise click.Abort

if not dry_run:
demultiplex_api.prepare_output_directory(flow_cell)
slurm_job_id: int = demultiplex_api.start_demultiplexing(flow_cell=flow_cell)
tb_api: TrailblazerAPI = context.trailblazer_api
demultiplex_api.add_to_trailblazer(
Expand Down
156 changes: 138 additions & 18 deletions tests/apps/demultiplex/test_demultiplex_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Tests for functions of DemultiplexAPI."""
from pathlib import Path

import pytest

from cg.apps.demultiplex.demultiplex_api import DemultiplexingAPI
from cg.constants.demultiplexing import DemultiplexingDirsAndFiles
from cg.meta.demultiplex.housekeeper_storage_functions import (
Expand Down Expand Up @@ -50,44 +52,162 @@ def test_is_sample_sheet_in_housekeeper_not_in_hk(


def test_create_demultiplexing_output_dir_for_bcl2fastq(
tmp_bcl2fastq_flow_cell, tmp_path, demultiplexing_context_for_demux: CGConfig
tmp_bcl2fastq_flow_cell: FlowCellDirectoryData,
tmp_path: Path,
demultiplexing_api: DemultiplexingAPI,
):
"""Test that the correct demultiplexing output directory is created."""
# GIVEN a Bcl2Fastq FlowCellDirectoryData object

# GIVEN that the demultiplexing output directory does not exist
demux_dir = Path(tmp_path, DemultiplexingDirsAndFiles.DEMULTIPLEXED_RUNS_DIRECTORY_NAME)
unaligned_dir = Path(demux_dir, DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME)
assert not demux_dir.exists()
demultiplexing_api.demultiplexed_runs_dir = tmp_path
output_directory: Path = demultiplexing_api.flow_cell_out_dir_path(tmp_bcl2fastq_flow_cell)
unaligned_dir: Path = demultiplexing_api.get_flow_cell_unaligned_dir(tmp_bcl2fastq_flow_cell)

assert not output_directory.exists()
assert not unaligned_dir.exists()

# WHEN creating the demultiplexing output directory for a bcl2fastq flow cell
demultiplexing_context_for_demux.demultiplex_api.create_demultiplexing_output_dir(
flow_cell=tmp_bcl2fastq_flow_cell, demux_dir=demux_dir, unaligned_dir=unaligned_dir
)
demultiplexing_api.create_demultiplexing_output_dir(tmp_bcl2fastq_flow_cell)

# THEN the demultiplexing output directory should exist
assert demux_dir.exists()
assert output_directory.exists()
assert unaligned_dir.exists()


def test_create_demultiplexing_output_dir_for_bcl_convert(
tmp_bcl_convert_flow_cell, tmp_path, demultiplexing_context_for_demux: CGConfig
tmp_bcl_convert_flow_cell: FlowCellDirectoryData,
tmp_path: Path,
demultiplexing_api: DemultiplexingAPI,
):
"""Test that the correct demultiplexing output directory is created."""
# GIVEN BCL Convert FlowCellDirectoryData object

# GIVEN that the demultiplexing output directory does not exist
demux_dir = Path(tmp_path, DemultiplexingDirsAndFiles.DEMULTIPLEXED_RUNS_DIRECTORY_NAME)
unaligned_dir = Path(demux_dir, DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME)
assert not demux_dir.exists()
assert not unaligned_dir.exists()
demultiplexing_api.demultiplexed_runs_dir = tmp_path
output_directory: Path = demultiplexing_api.flow_cell_out_dir_path(tmp_bcl_convert_flow_cell)
unaligned_directory: Path = demultiplexing_api.get_flow_cell_unaligned_dir(
tmp_bcl_convert_flow_cell
)
assert not output_directory.exists()
assert not unaligned_directory.exists()

# WHEN creating the demultiplexing output directory for a BCL Convert flow cell
demultiplexing_context_for_demux.demultiplex_api.create_demultiplexing_output_dir(
flow_cell=tmp_bcl_convert_flow_cell, demux_dir=demux_dir, unaligned_dir=unaligned_dir
)
demultiplexing_api.create_demultiplexing_output_dir(tmp_bcl_convert_flow_cell)

# THEN the demultiplexing output directory should exist
assert demux_dir.exists()
assert not unaligned_dir.exists()
assert output_directory.exists()
assert not unaligned_directory.exists()


def test_is_demultiplexing_possible_true(
    demultiplexing_api: DemultiplexingAPI,
    tmp_bcl_convert_flow_cell: FlowCellDirectoryData,
):
    """Test that the demultiplexing pre-check passes when all criteria are fulfilled."""
    # GIVEN a flow cell with a sample sheet in Housekeeper
    add_sample_sheet_path_to_housekeeper(
        flow_cell_directory=tmp_bcl_convert_flow_cell.path,
        flow_cell_name=tmp_bcl_convert_flow_cell.id,
        hk_api=demultiplexing_api.hk_api,
    )

    # GIVEN a flow cell with no missing file

    # WHEN checking if demultiplexing is possible
    is_possible: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )

    # THEN the flow cell is ready for demultiplexing
    assert is_possible is True


@pytest.mark.parametrize("missing_file", ["RTAComplete.txt", "CopyComplete.txt", "SampleSheet.csv"])
def test_is_demultiplexing_possible_missing_files(
    demultiplexing_api: DemultiplexingAPI,
    missing_file: str,
    tmp_bcl_convert_flow_cell: FlowCellDirectoryData,
):
    """Test that the demultiplexing pre-check fails when a required file is missing."""
    # GIVEN a flow cell with a sample sheet in Housekeeper
    add_sample_sheet_path_to_housekeeper(
        flow_cell_directory=tmp_bcl_convert_flow_cell.path,
        flow_cell_name=tmp_bcl_convert_flow_cell.id,
        hk_api=demultiplexing_api.hk_api,
    )

    # GIVEN that the pre-check passes before any file is removed
    possible_before_removal: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )
    assert possible_before_removal is True

    # GIVEN that one of the required files is deleted from the flow cell directory
    file_to_remove: Path = Path(tmp_bcl_convert_flow_cell.path, missing_file)
    file_to_remove.unlink()

    # WHEN checking if demultiplexing is possible
    is_possible: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )

    # THEN the flow cell should not be deemed ready for demultiplexing
    assert is_possible is False


def test_is_demultiplexing_possible_no_sample_sheet_in_hk(
    demultiplexing_api: DemultiplexingAPI,
    tmp_bcl_convert_flow_cell: FlowCellDirectoryData,
):
    """Test demultiplexing pre-check when no sample sheet exists in Housekeeper."""
    # GIVEN a flow cell with no sample sheet in Housekeeper
    assert (
        demultiplexing_api.is_sample_sheet_in_housekeeper(flow_cell_id=tmp_bcl_convert_flow_cell.id)
        is False
    )

    # WHEN checking if demultiplexing is possible
    result: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )

    # THEN the flow cell should not be deemed ready for demultiplexing
    assert result is False


# The function was originally defined without the `test_` prefix, so pytest never
# collected it. The old name is kept as an alias; it does not match the `test_*`
# pattern, so the test is still collected only once.
is_demultiplexing_possible_no_sample_sheet_in_hk = (
    test_is_demultiplexing_possible_no_sample_sheet_in_hk
)


def test_is_demultiplexing_possible_already_started(
    demultiplexing_api: DemultiplexingAPI,
    tmp_bcl_convert_flow_cell: FlowCellDirectoryData,
):
    """Test that the demultiplexing pre-check fails when demultiplexing has already started."""
    # GIVEN a flow cell with a sample sheet in Housekeeper
    add_sample_sheet_path_to_housekeeper(
        flow_cell_directory=tmp_bcl_convert_flow_cell.path,
        flow_cell_name=tmp_bcl_convert_flow_cell.id,
        hk_api=demultiplexing_api.hk_api,
    )

    # GIVEN that the pre-check passes before the started-marker is written
    possible_before_start: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )
    assert possible_before_start is True

    # GIVEN a flow cell where demultiplexing has already started
    started_marker: Path = Path(
        tmp_bcl_convert_flow_cell.path, DemultiplexingDirsAndFiles.DEMUX_STARTED
    )
    started_marker.touch()

    # WHEN checking if demultiplexing is possible
    is_possible: bool = demultiplexing_api.is_demultiplexing_possible(
        flow_cell=tmp_bcl_convert_flow_cell
    )

    # THEN the flow cell should not be deemed ready for demultiplexing
    assert is_possible is False


def test_remove_demultiplexing_output_directory(
    demultiplexing_api: DemultiplexingAPI,
    tmp_path: Path,
    bcl_convert_flow_cell: FlowCellDirectoryData,
):
    """Test that an existing demultiplexing output directory is removed."""
    # GIVEN a demultiplexing API that writes its output below a temporary directory
    demultiplexing_api.demultiplexed_runs_dir = tmp_path

    # GIVEN a flow cell with a demultiplexing output directory
    demultiplexing_api.create_demultiplexing_output_dir(bcl_convert_flow_cell)
    directory_before_removal: Path = demultiplexing_api.flow_cell_out_dir_path(
        bcl_convert_flow_cell
    )
    assert directory_before_removal.exists()

    # WHEN removing the demultiplexing output directory
    demultiplexing_api.remove_demultiplexing_output_directory(flow_cell=bcl_convert_flow_cell)

    # THEN the demultiplexing output directory should no longer exist
    directory_after_removal: Path = demultiplexing_api.flow_cell_out_dir_path(
        bcl_convert_flow_cell
    )
    assert not directory_after_removal.exists()
21 changes: 14 additions & 7 deletions tests/cli/demultiplex/test_demultiplex_flowcell.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,12 @@ def test_demultiplex_bcl2fastq_flow_cell(
hk_api=demultiplexing_context_for_demux.housekeeper_api,
)

# GIVEN an out dir that does not exist
# GIVEN a flow cell that is ready for demultiplexing
demux_api: DemultiplexingAPI = demultiplexing_context_for_demux.demultiplex_api
demux_dir: Path = demux_api.flow_cell_out_dir_path(flow_cell)
unaligned_dir: Path = Path(demux_dir, DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME)
assert demux_api.is_demultiplexing_possible(flow_cell=flow_cell)
assert demux_dir.exists() is False
assert unaligned_dir.exists() is False

mocker.patch("cg.apps.tb.TrailblazerAPI.add_pending_analysis")

# WHEN starting demultiplexing from the CLI with dry run flag
Expand All @@ -98,7 +97,7 @@ def test_demultiplex_bcl2fastq_flow_cell(
obj=demultiplexing_context_for_demux,
)

# THEN assert the command exits sucessfully
# THEN assert the command exits successfully

assert result.exit_code == 0

Expand Down Expand Up @@ -131,14 +130,19 @@ def test_demultiplex_dragen_flowcell(
hk_api=demultiplexing_context_for_demux.housekeeper_api,
)

# GIVEN an out dir that does not exist
# GIVEN a flow cell that is ready for demultiplexing
demux_api: DemultiplexingAPI = demultiplexing_context_for_demux.demultiplex_api
demux_dir: Path = demux_api.flow_cell_out_dir_path(flow_cell)
assert demux_api.is_demultiplexing_possible(flow_cell=flow_cell)
assert demux_dir.exists() is False
mocker.patch("cg.apps.tb.TrailblazerAPI.add_pending_analysis")

# WHEN starting demultiplexing from the CLI with dry run flag
# GIVEN an already existing output directory
demux_dir.mkdir(parents=True)
marker_file = Path(demux_dir, "dummy_file_present_in_old_dir")
marker_file.touch()
assert marker_file.exists()

# WHEN starting demultiplexing from the CLI
result: testing.Result = cli_runner.invoke(
demultiplex_flow_cell,
[str(tmp_flow_cell_directory_bclconvert), "-b", "dragen"],
Expand All @@ -151,6 +155,9 @@ def test_demultiplex_dragen_flowcell(
# THEN assert the results folder was created
assert demux_dir.exists()

# THEN assert that the old directory was removed
assert not marker_file.exists()

# THEN assert that the sbatch script was created
assert demux_api.demultiplex_sbatch_path(flow_cell).exists()

Expand Down

0 comments on commit e2414d2

Please sign in to comment.