
Commit

Refactor pipeline functions (#2894) (maajor)
### Changed

- Refactors `pipeline` into `workflow` for functions and the CLI
henrikstranneheim authored Feb 6, 2024
1 parent 74ce5e6 commit e05b1dc
Showing 114 changed files with 670 additions and 722 deletions.
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
@@ -168,11 +168,11 @@ We convert all the info that we get from LIMS/`genologics` to dictionaries befor
 
 Interface to Trailblazer.
 
-- Monitor analysis pipeline status
+- Monitor analysis workflow status
 
 #### Genotype (gt)
 
-Interface to Genotype. For uploading results from the pipeline about genotypes to compare and validate that we are clear of sample mix-ups.
+Interface to Genotype. For uploading results from the workflow about genotypes to compare and validate that we are clear of sample mix-ups.
 
 #### Housekeeper (hk)
 
@@ -188,7 +188,7 @@ Internal app for opening tickets in SupportSystems. We use this mainly to link a
 
 #### Scout (scoutapi)
 
-Interface to Scout. For uploading analysis results to Scout. It's also used to access the generation of gene panels files used in the analysis pipeline.
+Interface to Scout. For uploading analysis results to Scout. It's also used to access the generation of gene panels files used in the analysis workflow.
 
 #### Delivery report
 
2 changes: 1 addition & 1 deletion cg/apps/gt.py
@@ -39,7 +39,7 @@ def upload(self, bcf_path: str, samples_sex: dict, force: bool = False) -> None:
             # This is the sample sex specified by the customer
             sample_sex = samples_sex[sample_id]["pedigree"]
             self.update_sample_sex(sample_id, sample_sex)
-            # This is the predicted sex based on variant calls from the pipeline
+            # This is the predicted sex based on variant calls from the workflow
             analysis_predicted_sex = samples_sex[sample_id]["analysis"]
             self.update_analysis_sex(sample_id, sex=analysis_predicted_sex)
 
14 changes: 7 additions & 7 deletions cg/apps/hermes/hermes_api.py
@@ -17,15 +17,15 @@ def __init__(self, config: dict):
         self.process = Process(binary=config["hermes"]["binary_path"])
 
     def convert_deliverables(
-        self, deliverables_file: Path, pipeline: str, analysis_type: str | None = None
+        self, deliverables_file: Path, workflow: str, analysis_type: str | None = None
     ) -> CGDeliverables:
-        """Convert deliverables file in raw pipeline format to CG format with hermes"""
-        LOG.info("Converting pipeline deliverables to CG deliverables")
+        """Convert deliverables file in raw workflow format to CG format with hermes"""
+        LOG.info("Converting workflow deliverables to CG deliverables")
         convert_command = [
             "convert",
             "deliverables",
             "--pipeline",
-            pipeline,
+            workflow,
             str(deliverables_file),
         ]
         if analysis_type:
@@ -38,13 +38,13 @@ def create_housekeeper_bundle(
         self,
         bundle_name: str,
         deliverables: Path,
-        pipeline: str,
+        workflow: str,
         analysis_type: str | None,
         created: datetime | None,
     ) -> hk_models.InputBundle:
-        """Convert pipeline deliverables to housekeeper bundle ready to be inserted into hk"""
+        """Convert workflow deliverables to a Housekeeper bundle ready to be inserted into Housekeeper."""
         cg_deliverables: CGDeliverables = self.convert_deliverables(
-            deliverables_file=deliverables, pipeline=pipeline, analysis_type=analysis_type
+            deliverables_file=deliverables, workflow=workflow, analysis_type=analysis_type
        )
         return self.get_housekeeper_bundle(
             deliverables=cg_deliverables, created=created, bundle_name=bundle_name
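Worth noting in the hunks above: only the Python-side name changes; the `--pipeline` flag passed to the hermes binary is an untouched context line, so the child-process interface stays the same. A minimal sketch of the renamed call, assuming the class is named `HermesApi` and using hypothetical paths and values:

from pathlib import Path

from cg.apps.hermes.hermes_api import HermesApi

# Hypothetical config; only `binary_path` is required by __init__ per the diff.
hermes_api = HermesApi(config={"hermes": {"binary_path": "/usr/local/bin/hermes"}})
deliverables = hermes_api.convert_deliverables(
    deliverables_file=Path("/analyses/case_123/deliverables.yaml"),  # hypothetical path
    workflow="balsamic",  # this keyword was `pipeline=` before the commit
)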
6 changes: 3 additions & 3 deletions cg/apps/hermes/models.py
@@ -19,17 +19,17 @@ class CGTag(BaseModel):
 
 
 class CGDeliverables(BaseModel):
-    """Class that specifies the output from hermes"""
+    """Class that specifies the output from Hermes."""
 
-    pipeline: str
+    workflow: str
     bundle_id: str
     files: list[CGTag]
 
     @field_validator("files")
     @classmethod
     def remove_missing_files(cls, files: list[CGTag]) -> list[CGTag]:
         """Validates that the files in a suggested CGDeliverables object are correct.
-        I.e. if a file doesn't exist an error is raised if the file was mandatory,
+        I.e., if a file doesn't exist, an error is raised if the file was mandatory,
         otherwise it is simply removed from the list of files."""
         filtered_files: list[CGTag] = files.copy()
         for file in files:
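The field rename is a breaking change for any payload that still carries a `pipeline` key, since Pydantic will reject it. A minimal sketch of building the renamed model, assuming Pydantic v2 (implied by the `field_validator` import) and hypothetical values:

from cg.apps.hermes.models import CGDeliverables

deliverables = CGDeliverables.model_validate(
    {
        "workflow": "mip-dna",  # this key was `pipeline` before the commit
        "bundle_id": "case_123",  # hypothetical bundle name
        "files": [],  # empty so the remove_missing_files validator has nothing to filter
    }
)
print(deliverables.workflow)  # mip-dna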
16 changes: 8 additions & 8 deletions cg/cli/clean.py
@@ -146,7 +146,7 @@ def scout_finished_cases(
 @DRY_RUN
 @click.pass_context
 def hk_case_bundle_files(context: CGConfig, days_old: int, dry_run: bool = False) -> None:
-    """Clean up all non-protected files for all pipelines."""
+    """Clean up all non-protected files for all workflows."""
     housekeeper_api: HousekeeperAPI = context.obj.housekeeper_api
     clean_api: CleanAPI = CleanAPI(status_db=context.obj.status_db, housekeeper_api=housekeeper_api)
 
@@ -172,7 +172,7 @@ def hk_case_bundle_files(context: CGConfig, days_old: int, dry_run: bool = False
 
 @clean.command("hk-bundle-files")
 @click.option("-c", "--case-id", type=str, required=False)
-@click.option("-p", "--pipeline", type=Workflow, required=False)
+@click.option("-w", "--workflow", type=Workflow, required=False)
 @click.option("-t", "--tags", multiple=True, required=True)
 @click.option("-o", "--days-old", type=int, default=30)
 @DRY_RUN
@@ -182,7 +182,7 @@ def hk_bundle_files(
     case_id: str | None,
     tags: list,
     days_old: int | None,
-    pipeline: Workflow | None,
+    workflow: Workflow | None,
     dry_run: bool,
 ):
     """Remove files found in Housekeeper bundles."""
@@ -195,13 +195,13 @@ def hk_bundle_files(
     function_dispatcher: Dispatcher = Dispatcher(
         functions=[
             status_db.get_analyses_started_at_before,
-            status_db.get_analyses_for_case_and_pipeline_started_at_before,
-            status_db.get_analyses_for_pipeline_started_at_before,
+            status_db.get_analyses_for_case_and_workflow_started_at_before,
+            status_db.get_analyses_for_workflow_started_at_before,
             status_db.get_analyses_for_case_started_at_before,
         ],
         input_dict={
             "case_internal_id": case_id,
-            "pipeline": pipeline,
+            "workflow": workflow,
             "started_at_before": date_threshold,
         },
     )
@@ -218,15 +218,15 @@ def hk_bundle_files(
            LOG.warning(
                f"Version not found for "
                f"bundle:{bundle_name}; "
-               f"pipeline: {analysis.pipeline}; "
+               f"workflow: {analysis.pipeline}; "
                f"date {analysis.started_at}"
            )
            continue
 
        LOG.info(
            f"Version found for "
            f"bundle:{bundle_name}; "
-           f"pipeline: {analysis.pipeline}; "
+           f"workflow: {analysis.pipeline}; "
            f"date {analysis.started_at}"
        )
        version_files: list[File] = housekeeper_api.get_files(
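Since the short flag also flips from `-p` to `-w`, any cron jobs or wrapper scripts calling this command need updating. An illustrative invocation through Click's test runner, with hypothetical values (a real run needs a populated `CGConfig` object):

from click.testing import CliRunner

from cg.cli.clean import clean

runner = CliRunner()
# `-p/--pipeline` would now fail with "no such option"; `-w/--workflow` replaces it.
result = runner.invoke(
    clean,
    ["hk-bundle-files", "--workflow", "balsamic", "--tags", "vcf", "--days-old", "60"],
    obj=cg_config,  # hypothetical CGConfig instance
)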
11 changes: 4 additions & 7 deletions cg/cli/delete/observations.py
@@ -6,10 +6,7 @@
 from sqlalchemy.orm import Query
 
 from cg.cli.upload.observations.utils import get_observations_api, get_observations_case
-from cg.cli.workflow.commands import (
-    ARGUMENT_CASE_ID,
-    OPTION_LOQUSDB_SUPPORTED_PIPELINES,
-)
+from cg.cli.workflow.commands import ARGUMENT_CASE_ID, OPTION_LOQUSDB_SUPPORTED_WORKFLOW
 from cg.constants.constants import DRY_RUN, SKIP_CONFIRMATION, Workflow
 from cg.exc import CaseNotFoundError, LoqusdbError
 from cg.meta.observations.balsamic_observations_api import BalsamicObservationsAPI
@@ -44,17 +41,17 @@ def delete_observations(context: CGConfig, case_id: str, dry_run: bool, yes: boo
 
 
 @click.command("available-observations")
-@OPTION_LOQUSDB_SUPPORTED_PIPELINES
+@OPTION_LOQUSDB_SUPPORTED_WORKFLOW
 @SKIP_CONFIRMATION
 @DRY_RUN
 @click.pass_context
 def delete_available_observations(
-    context: click.Context, pipeline: Workflow | None, dry_run: bool, yes: bool
+    context: click.Context, workflow: Workflow | None, dry_run: bool, yes: bool
 ):
     """Delete available observation from Loqusdb."""
 
     status_db: Store = context.obj.status_db
-    uploaded_observations: Query = status_db.observations_uploaded(pipeline)
+    uploaded_observations: Query = status_db.observations_uploaded(workflow)
 
     LOG.info(
         f"This would delete observations for the following cases: {[case.internal_id for case in uploaded_observations]}"
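The renamed decorator itself is not defined in this diff (it lives in `cg/cli/workflow/commands.py`), but for the `workflow` parameter above to bind, it must declare a `--workflow` option. A hypothetical reconstruction, modeled on the `OPTION_WORKFLOW` definition shown further down in `cg/cli/generate/report/options.py`:

import click

# Hypothetical sketch only; the constant naming the Loqusdb-supported
# workflows is assumed, not shown in this commit.
OPTION_LOQUSDB_SUPPORTED_WORKFLOW = click.option(
    "--workflow",
    type=click.Choice(LOQUSDB_SUPPORTED_WORKFLOWS),  # assumed constant
    help="Limit to a workflow supported by Loqusdb",
)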
16 changes: 8 additions & 8 deletions cg/cli/generate/report/base.py
@@ -12,13 +12,13 @@
     ARGUMENT_CASE_ID,
     OPTION_DRY_RUN,
     OPTION_FORCE_REPORT,
-    OPTION_PIPELINE,
     OPTION_STARTED_AT,
+    OPTION_WORKFLOW,
 )
 from cg.cli.generate.report.utils import (
     get_report_analysis_started,
     get_report_api,
-    get_report_api_pipeline,
+    get_report_api_workflow,
     get_report_case,
 )
 from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Workflow
@@ -85,23 +85,23 @@ def generate_delivery_report(
 
 
 @click.command("available-delivery-reports")
-@OPTION_PIPELINE
+@OPTION_WORKFLOW
 @OPTION_FORCE_REPORT
 @OPTION_DRY_RUN
 @click.pass_context
 def generate_available_delivery_reports(
-    context: click.Context, pipeline: Workflow, force_report: bool, dry_run: bool
+    context: click.Context, workflow: Workflow, force_report: bool, dry_run: bool
 ) -> None:
-    """Generates delivery reports for all cases that need one and stores them in housekeeper."""
+    """Generates delivery reports for all cases that need one and stores them in Housekeeper."""
 
     click.echo(click.style("--------------- AVAILABLE DELIVERY REPORTS ---------------"))
 
     exit_code = EXIT_SUCCESS
 
-    report_api: ReportAPI = get_report_api_pipeline(context, pipeline)
-    context.obj.meta_apis["report_api"] = report_api if pipeline else None
+    report_api: ReportAPI = get_report_api_workflow(context=context, workflow=workflow)
+    context.obj.meta_apis["report_api"] = report_api if workflow else None
 
-    cases_without_delivery_report = report_api.get_cases_without_delivery_report(pipeline)
+    cases_without_delivery_report = report_api.get_cases_without_delivery_report(workflow)
     if not cases_without_delivery_report:
         click.echo(
             click.style(
10 changes: 5 additions & 5 deletions cg/cli/generate/report/options.py
@@ -2,18 +2,18 @@
 
 import click
 
-from cg.constants import REPORT_SUPPORTED_PIPELINES
+from cg.constants import REPORT_SUPPORTED_WORKFLOW
 
 ARGUMENT_CASE_ID = click.argument(
     "case_id",
     required=False,
     type=str,
 )
 
-OPTION_PIPELINE = click.option(
-    "--pipeline",
-    type=click.Choice(REPORT_SUPPORTED_PIPELINES),
-    help="Limit delivery report generation to a specific pipeline",
+OPTION_WORKFLOW = click.option(
+    "--workflow",
+    type=click.Choice(REPORT_SUPPORTED_WORKFLOW),
+    help="Limit delivery report generation to a specific workflow",
 )
 
 OPTION_STARTED_AT = click.option(
26 changes: 13 additions & 13 deletions cg/cli/generate/report/utils.py
@@ -7,7 +7,7 @@
 
 from cg.constants import (
     REPORT_SUPPORTED_DATA_DELIVERY,
-    REPORT_SUPPORTED_PIPELINES,
+    REPORT_SUPPORTED_WORKFLOW,
     Workflow,
 )
 from cg.meta.report.balsamic import BalsamicReportAPI
@@ -38,13 +38,13 @@ def get_report_case(context: click.Context, case_id: str) -> Case:
     # Missing or not valid internal case ID
     if not case_id or not case:
         LOG.warning("Invalid case ID. Retrieving available cases.")
-        pipeline: Workflow = (
-            report_api.analysis_api.pipeline if context.obj.meta_apis.get("report_api") else None
+        workflow: Workflow = (
+            report_api.analysis_api.workflow if context.obj.meta_apis.get("report_api") else None
         )
         cases_without_delivery_report: list[Case] = (
-            report_api.get_cases_without_delivery_report(pipeline=pipeline)
+            report_api.get_cases_without_delivery_report(workflow=workflow)
             if not context.obj.meta_apis.get("upload_api")
-            else report_api.get_cases_without_uploaded_delivery_report(pipeline=pipeline)
+            else report_api.get_cases_without_uploaded_delivery_report(workflow=workflow)
         )
         if not cases_without_delivery_report:
             click.echo(
@@ -57,9 +57,9 @@ def get_report_case(context: click.Context, case_id: str) -> Case:
         for case in cases_without_delivery_report:
             click.echo(f"{case.internal_id} ({case.data_analysis})")
         raise click.Abort
-    if case.data_analysis not in REPORT_SUPPORTED_PIPELINES:
+    if case.data_analysis not in REPORT_SUPPORTED_WORKFLOW:
         LOG.error(
-            f"The {case.data_analysis} pipeline does not support delivery reports (case: {case.internal_id})"
+            f"The {case.data_analysis} workflow does not support delivery reports (case: {case.internal_id})"
         )
         raise click.Abort
     if case.data_delivery not in REPORT_SUPPORTED_DATA_DELIVERY:
@@ -74,13 +74,13 @@ def get_report_api(context: click.Context, case: Case) -> ReportAPI:
     """Returns a report API to be used for the delivery report generation."""
     if context.obj.meta_apis.get("report_api"):
         return context.obj.meta_apis.get("report_api")
-    return get_report_api_pipeline(context, case.data_analysis)
+    return get_report_api_workflow(context, case.data_analysis)
 
 
-def get_report_api_pipeline(context: click.Context, pipeline: Workflow) -> ReportAPI:
-    """Resolves the report API given a specific pipeline."""
-    # Default report API pipeline: MIP-DNA
-    pipeline: Workflow = pipeline if pipeline else Workflow.MIP_DNA
+def get_report_api_workflow(context: click.Context, workflow: Workflow) -> ReportAPI:
+    """Return the report API given a specific workflow."""
+    # Default report API workflow: MIP-DNA
+    workflow: Workflow = workflow if workflow else Workflow.MIP_DNA
     dispatch_report_api: dict[Workflow, ReportAPI] = {
         Workflow.BALSAMIC: BalsamicReportAPI(
             config=context.obj, analysis_api=BalsamicAnalysisAPI(config=context.obj)
@@ -98,7 +98,7 @@ def get_report_api_pipeline(context: click.Context, pipeline: Workflow) -> Repor
             config=context.obj, analysis_api=RnafusionAnalysisAPI(config=context.obj)
         ),
     }
-    return dispatch_report_api.get(pipeline)
+    return dispatch_report_api.get(workflow)
 
 
 def get_report_analysis_started(
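The dict dispatch plus the `workflow or MIP-DNA` fallback means callers can omit the workflow entirely. A short usage sketch, assuming a populated click context (`ctx` is hypothetical) and that the full dispatch dict contains a `Workflow.MIP_DNA` entry (the middle of the dict is elided in this hunk):

from cg.cli.generate.report.utils import get_report_api_workflow
from cg.constants import Workflow

# With no workflow given, the function falls back to Workflow.MIP_DNA.
default_api = get_report_api_workflow(context=ctx, workflow=None)
balsamic_api = get_report_api_workflow(context=ctx, workflow=Workflow.BALSAMIC)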
10 changes: 5 additions & 5 deletions cg/cli/upload/base.py
@@ -92,21 +92,21 @@ def upload(context: click.Context, case_id: str | None, restart: bool):
 
 
 @upload.command("auto")
-@click.option("--pipeline", type=EnumChoice(Workflow), help="Limit to specific pipeline")
+@click.option("--workflow", type=EnumChoice(Workflow), help="Limit to specific workflow")
 @click.pass_context
-def upload_all_completed_analyses(context: click.Context, pipeline: Workflow = None):
-    """Upload all completed analyses"""
+def upload_all_completed_analyses(context: click.Context, workflow: Workflow = None):
+    """Upload all completed analyses."""
 
     LOG.info("----------------- AUTO -----------------")
 
     status_db: Store = context.obj.status_db
 
     exit_code = 0
-    for analysis_obj in status_db.get_analyses_to_upload(pipeline=pipeline):
+    for analysis_obj in status_db.get_analyses_to_upload(workflow=workflow):
         if analysis_obj.case.analyses[0].uploaded_at is not None:
             LOG.warning(
                 f"Skipping upload for case {analysis_obj.case.internal_id}. "
-                f"It has been already uploaded at {analysis_obj.case.analyses[0].uploaded_at}."
+                f"Case has been already uploaded at {analysis_obj.case.analyses[0].uploaded_at}."
             )
             continue
 
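Because `get_analyses_to_upload` now takes `workflow=` instead of `pipeline=`, external callers that still use the old keyword break loudly rather than silently. A minimal illustration, with `status_db` standing in for a real `Store` instance:

from cg.constants import Workflow

analyses = status_db.get_analyses_to_upload(workflow=Workflow.MIP_DNA)  # new keyword
# status_db.get_analyses_to_upload(pipeline=Workflow.MIP_DNA)  # TypeError after this commit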
2 changes: 1 addition & 1 deletion cg/cli/upload/clinical_delivery.py
@@ -89,7 +89,7 @@ def auto_fastq(context: click.Context, dry_run: bool):
     exit_code: int = EXIT_SUCCESS
     status_db: Store = context.obj.status_db
     trailblazer_api: TrailblazerAPI = context.obj.trailblazer_api
-    for analysis_obj in status_db.get_analyses_to_upload(pipeline=Workflow.FASTQ):
+    for analysis_obj in status_db.get_analyses_to_upload(workflow=Workflow.FASTQ):
         if analysis_obj.case.analyses[0].uploaded_at:
             LOG.debug(
                 f"Newer analysis already uploaded for {analysis_obj.case.internal_id}, skipping"
10 changes: 5 additions & 5 deletions cg/cli/upload/observations/observations.py
@@ -15,7 +15,7 @@
 from cg.cli.workflow.commands import (
     ARGUMENT_CASE_ID,
     OPTION_DRY,
-    OPTION_LOQUSDB_SUPPORTED_PIPELINES,
+    OPTION_LOQUSDB_SUPPORTED_WORKFLOW,
 )
 from cg.constants.constants import Workflow
 from cg.exc import CaseNotFoundError, LoqusdbError
@@ -51,21 +51,21 @@ def upload_observations_to_loqusdb(context: CGConfig, case_id: str | None, dry_r
 
 
 @click.command("available-observations")
-@OPTION_LOQUSDB_SUPPORTED_PIPELINES
+@OPTION_LOQUSDB_SUPPORTED_WORKFLOW
 @OPTION_DRY
 @click.pass_context
 def upload_available_observations_to_loqusdb(
-    context: click.Context, pipeline: Workflow | None, dry_run: bool
+    context: click.Context, workflow: Workflow | None, dry_run: bool
 ):
     """Uploads the available observations to Loqusdb."""
 
     click.echo(click.style("----------------- AVAILABLE OBSERVATIONS -----------------"))
 
     status_db: Store = context.obj.status_db
-    cases_to_upload: Query = status_db.observations_to_upload(pipeline=pipeline)
+    cases_to_upload: Query = status_db.observations_to_upload(workflow=workflow)
     if not cases_to_upload:
         LOG.error(
-            f"There are no available cases to upload to Loqusdb for {pipeline} ({datetime.now()})"
+            f"There are no available cases to upload to Loqusdb for {workflow} ({datetime.now()})"
         )
         return
 
