Skip to content

Commit

Permalink
(feat taxprofiler): Add store command (#3038)
Browse files Browse the repository at this point in the history
### Added

- cg workflow taxprofiler store case_id
  • Loading branch information
sofstam authored Mar 21, 2024
1 parent e47200d commit 88108f4
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 145 deletions.
36 changes: 19 additions & 17 deletions cg/cli/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
import click
from pydantic import ValidationError

from cg.apps.housekeeper.hk import HousekeeperAPI

from cg.cli.workflow.commands import ARGUMENT_CASE_ID, OPTION_DRY
from cg.constants.constants import MetaApis
from cg.exc import CgError, HousekeeperStoreError
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.store.store import Store

LOG = logging.getLogger(__name__)

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -162,13 +161,9 @@ def metrics_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
"""Create and validate a metrics deliverables file for given case id.
If QC metrics are met it sets the status in Trailblazer to complete.
If failed, it sets it as failed and adds a comment with information of the failed metrics."""

analysis_api: NfAnalysisAPI = context.meta_apis[MetaApis.ANALYSIS_API]

try:
analysis_api.status_db.verify_case_exists(case_internal_id=case_id)
analysis_api.write_metrics_deliverables(case_id=case_id, dry_run=dry_run)
analysis_api.validate_qc_metrics(case_id=case_id, dry_run=dry_run)
analysis_api.metrics_deliver(case_id=case_id, dry_run=dry_run)
except CgError as error:
raise click.Abort() from error

Expand All @@ -179,17 +174,10 @@ def metrics_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
@click.pass_obj
def report_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
"""Create a Housekeeper deliverables file for given case id."""

analysis_api: NfAnalysisAPI = context.meta_apis[MetaApis.ANALYSIS_API]

try:
analysis_api.status_db.verify_case_exists(case_internal_id=case_id)
analysis_api.trailblazer_api.is_latest_analysis_completed(case_id=case_id)
if not dry_run:
analysis_api.report_deliver(case_id=case_id)
else:
LOG.info(f"Dry-run: Would have created delivery files for case {case_id}")
except Exception as error:
analysis_api.report_deliver(case_id=case_id, dry_run=dry_run)
except CgError as error:
LOG.error(f"Could not create report file: {error}")
raise click.Abort()

Expand All @@ -201,9 +189,23 @@ def report_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
def store_housekeeper(context: CGConfig, case_id: str, dry_run: bool) -> None:
"""Store a finished RNAFUSION and TAXPROFILER analysis in Housekeeper and StatusDB."""
analysis_api: NfAnalysisAPI = context.meta_apis[MetaApis.ANALYSIS_API]

try:
analysis_api.store_analysis_housekeeper(case_id=case_id, dry_run=dry_run)
except HousekeeperStoreError as error:
LOG.error(f"Could not store bundle in Housekeeper and StatusDB: {error}!")
raise click.Abort()


@click.command("store")
@ARGUMENT_CASE_ID
@OPTION_DRY
@click.pass_context
def store(context: click.Context, case_id: str, dry_run: bool) -> None:
"""Generate deliverable files for a case and store in Housekeeper if they
pass QC metrics checks."""
analysis_api: NfAnalysisAPI = context.obj.meta_apis[MetaApis.ANALYSIS_API]
try:
analysis_api.store(case_id=case_id, dry_run=dry_run)
except Exception as error:
LOG.error(repr(error))
raise click.Abort()
34 changes: 2 additions & 32 deletions cg/cli/workflow/rnafusion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
report_deliver,
run,
store_housekeeper,
store,
)
from cg.constants import EXIT_FAIL, EXIT_SUCCESS
from cg.constants.constants import DRY_RUN, MetaApis
Expand Down Expand Up @@ -116,38 +117,7 @@ def start_available(context: click.Context, dry_run: bool = False) -> None:
rnafusion.add_command(metrics_deliver)
rnafusion.add_command(report_deliver)
rnafusion.add_command(store_housekeeper)


@rnafusion.command("store")
@ARGUMENT_CASE_ID
@DRY_RUN
@click.pass_context
def store(context: click.Context, case_id: str, dry_run: bool) -> None:
"""Generate deliverables files for a case and store in Housekeeper if they
pass QC metrics checks."""
analysis_api: RnafusionAnalysisAPI = context.obj.meta_apis[MetaApis.ANALYSIS_API]

is_latest_analysis_qc: bool = analysis_api.trailblazer_api.is_latest_analysis_qc(
case_id=case_id
)
if not is_latest_analysis_qc and not analysis_api.trailblazer_api.is_latest_analysis_completed(
case_id=case_id
):
LOG.error(
"Case not stored. Trailblazer status must be either QC or COMPLETE to be able to store"
)
return

# Avoid storing a case without QC checks previously performed
if (
is_latest_analysis_qc
or not analysis_api.get_metrics_deliverables_path(case_id=case_id).exists()
):
LOG.info(f"Generating metrics file and performing QC checks for {case_id}")
context.invoke(metrics_deliver, case_id=case_id, dry_run=dry_run)
LOG.info(f"Storing analysis for {case_id}")
context.invoke(report_deliver, case_id=case_id, dry_run=dry_run)
context.invoke(store_housekeeper, case_id=case_id, dry_run=dry_run)
rnafusion.add_command(store)


@rnafusion.command("store-available")
Expand Down
2 changes: 2 additions & 0 deletions cg/cli/workflow/taxprofiler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
report_deliver,
run,
store_housekeeper,
store,
)
from cg.constants import EXIT_FAIL, EXIT_SUCCESS
from cg.constants.constants import DRY_RUN, MetaApis
Expand All @@ -45,6 +46,7 @@ def taxprofiler(context: click.Context) -> None:
taxprofiler.add_command(report_deliver)
taxprofiler.add_command(run)
taxprofiler.add_command(store_housekeeper)
taxprofiler.add_command(store)


@taxprofiler.command("start")
Expand Down
46 changes: 40 additions & 6 deletions cg/meta/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,17 +660,29 @@ def validate_qc_metrics(self, case_id: str, dry_run: bool = False) -> None:
raise CgError from error
self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.COMPLETED)

def report_deliver(self, case_id: str) -> None:
def metrics_deliver(self, case_id: str, dry_run: bool):
"""Create and validate a metrics deliverables file for given case id."""
self.status_db.verify_case_exists(case_internal_id=case_id)
self.write_metrics_deliverables(case_id=case_id, dry_run=dry_run)
self.validate_qc_metrics(case_id=case_id, dry_run=dry_run)

def report_deliver(self, case_id: str, dry_run: bool) -> None:
"""Write deliverables file."""
workflow_content: WorkflowDeliverables = self.get_deliverables_for_case(case_id=case_id)
self.write_deliverables_file(
deliverables_content=workflow_content.dict(),
file_path=self.get_deliverables_file_path(case_id=case_id),
)

self.status_db.verify_case_exists(case_internal_id=case_id)
self.trailblazer_api.is_latest_analysis_completed(case_id=case_id)
if not dry_run:
workflow_content: WorkflowDeliverables = self.get_deliverables_for_case(case_id=case_id)
self.write_deliverables_file(
deliverables_content=workflow_content.dict(),
file_path=self.get_deliverables_file_path(case_id=case_id),
)
LOG.info(
f"Writing deliverables file in {self.get_deliverables_file_path(case_id=case_id).as_posix()}"
)

LOG.info(f"Dry-run: Would have created delivery files for case {case_id}")

def store_analysis_housekeeper(self, case_id: str, dry_run: bool = False) -> None:
"""Store a finished nextflow analysis in Housekeeper and StatusDB"""

Expand All @@ -693,3 +705,25 @@ def store_analysis_housekeeper(self, case_id: str, dry_run: bool = False) -> Non
raise HousekeeperStoreError(
f"Could not store bundle in Housekeeper and StatusDB: {error}"
)

def store(self, case_id: str, dry_run: bool):
"""Generate deliverable files for a case and store in Housekeeper if they
pass QC metrics checks."""
is_latest_analysis_qc: bool = self.trailblazer_api.is_latest_analysis_qc(case_id=case_id)
if not is_latest_analysis_qc and not self.trailblazer_api.is_latest_analysis_completed(
case_id=case_id
):
LOG.error(
"Case not stored. Trailblazer status must be either QC or COMPLETE to be able to store"
)
raise ValueError

if (
is_latest_analysis_qc
or not self.get_metrics_deliverables_path(case_id=case_id).exists()
):
LOG.info(f"Generating metrics file and performing QC checks for {case_id}")
self.metrics_deliver(case_id=case_id, dry_run=dry_run)
LOG.info(f"Storing analysis for {case_id}")
self.report_deliver(case_id=case_id, dry_run=dry_run)
self.store_analysis_housekeeper(case_id=case_id, dry_run=dry_run)
120 changes: 120 additions & 0 deletions tests/cli/workflow/nf_analysis/test_cli_compound_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging

import pytest
from _pytest.fixtures import FixtureRequest
from _pytest.logging import LogCaptureFixture
from click.testing import CliRunner

from cg.apps.hermes.hermes_api import HermesApi
from cg.apps.hermes.models import CGDeliverables
from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.cli.workflow.base import workflow as workflow_cli
from cg.constants import EXIT_SUCCESS, Workflow
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig


@pytest.mark.parametrize(
"workflow",
[Workflow.RNAFUSION, Workflow.TAXPROFILER],
)
def test_store_success(
cli_runner: CliRunner,
real_housekeeper_api: HousekeeperAPI,
workflow: Workflow,
caplog: LogCaptureFixture,
deliverables_template_content: list[dict],
request: FixtureRequest,
mocker,
):
"""Test to ensure all parts of store command are run successfully given ideal conditions."""
caplog.set_level(logging.INFO)

# GIVEN a case for which we mocked files created after a successful run

# GIVEN each fixture is being initialised
context: CGConfig = request.getfixturevalue(f"{workflow}_context")
case_id: str = request.getfixturevalue(f"{workflow}_case_id")
hermes_deliverables: dict = request.getfixturevalue(f"{workflow}_hermes_deliverables")
request.getfixturevalue(f"{workflow}_mock_deliverable_dir")
request.getfixturevalue(f"{workflow}_mock_analysis_finish")
deliverables_response_data = request.getfixturevalue(f"{workflow}_deliverables_response_data")

# GIVEN a mocked deliverables template
mocker.patch.object(
NfAnalysisAPI,
"get_deliverables_template_content",
return_value=deliverables_template_content,
)

# GIVEN that the Housekeeper store is empty
context.housekeeper_api_: HousekeeperAPI = real_housekeeper_api
context.meta_apis["analysis_api"].housekeeper_api = real_housekeeper_api

# GIVEN that the bundle is not present in Housekeeper
assert not context.housekeeper_api.bundle(case_id)

# GIVEN that the analysis is not already stored in status_db
assert not context.status_db.get_case_by_internal_id(internal_id=case_id).analyses

# GIVEN that HermesAPI returns a deliverables output
mocker.patch.object(HermesApi, "convert_deliverables")
HermesApi.convert_deliverables.return_value = CGDeliverables(**hermes_deliverables)

# GIVEN Hermes parses deliverables and generates a valid response
mocker.patch.object(HermesApi, "create_housekeeper_bundle")
HermesApi.create_housekeeper_bundle.return_value = deliverables_response_data

# WHEN storing the case files
result = cli_runner.invoke(workflow_cli, [workflow, "store", case_id], obj=context)

# THEN bundle should be successfully added to Housekeeper and StatusDB
assert result.exit_code == EXIT_SUCCESS
assert "Analysis successfully stored in Housekeeper" in caplog.text
assert "Analysis successfully stored in StatusDB" in caplog.text
assert context.status_db.get_case_by_internal_id(internal_id=case_id).analyses
assert context.housekeeper_api.bundle(case_id)


@pytest.mark.parametrize(
"workflow,context",
[(Workflow.RNAFUSION, "rnafusion_context"), (Workflow.TAXPROFILER, "taxprofiler_context")],
)
def test_store_fail(
cli_runner: CliRunner,
workflow: Workflow,
context: str,
real_housekeeper_api: HousekeeperAPI,
deliverables_template_content: list[dict],
caplog: LogCaptureFixture,
request: FixtureRequest,
mocker,
):
"""Test store command fails when a case did not finish for a workflow."""
caplog.set_level(logging.INFO)

# GIVEN each fixture is being initialised
context: CGConfig = request.getfixturevalue(context)

# GIVEN a case id where analysis finish is not mocked
case_id_fail: str = request.getfixturevalue(f"{workflow}_case_id")

# GIVEN a mocked deliverables template
mocker.patch.object(
NfAnalysisAPI,
"get_deliverables_template_content",
return_value=deliverables_template_content,
)

# GIVEN that the Housekeeper store is empty
context.housekeeper_api_: HousekeeperAPI = real_housekeeper_api
context.meta_apis["analysis_api"].housekeeper_api = real_housekeeper_api

# GIVEN that the bundle is not present in Housekeeper
assert not context.housekeeper_api.bundle(case_id_fail)

# WHEN running command
result_fail = cli_runner.invoke(workflow_cli, [workflow, "store", case_id_fail], obj=context)

# THEN bundle exist status should be
assert result_fail.exit_code != EXIT_SUCCESS
Loading

0 comments on commit 88108f4

Please sign in to comment.