Skip to content

Commit

Permalink
fix(new delivery service calls in CLI) (#3673)
Browse files Browse the repository at this point in the history
# description

add new service to cli
  • Loading branch information
ChrOertlin authored Sep 3, 2024
1 parent 01a9187 commit df06b43
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 431 deletions.
210 changes: 74 additions & 136 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,27 @@
from cg.apps.tb import TrailblazerAPI
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants.cli_options import DRY_RUN
from cg.constants.delivery import PIPELINE_ANALYSIS_OPTIONS, PIPELINE_ANALYSIS_TAG_MAP
from cg.meta.deliver import DeliverAPI, DeliverTicketAPI
from cg.constants.delivery import FileDeliveryOption
from cg.meta.rsync.rsync_api import RsyncAPI
from cg.models.cg_config import CGConfig
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
from cg.services.file_delivery.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case
from cg.store.store import Store

LOG = logging.getLogger(__name__)

DELIVERY_TYPE = click.option(
"-d",
"--delivery-type",
multiple=True,
type=click.Choice(PIPELINE_ANALYSIS_OPTIONS),
required=True,
)
FORCE_ALL = click.option(
"--force-all",
help=(
"Force delivery of all sample files "
"- disregarding of amount of reads or previous deliveries"
),
is_flag=True,
)
TICKET_ID_ARG = click.argument("ticket", type=str, required=True)

IGNORE_MISSING_BUNDLES = click.option(
"-i",
"--ignore-missing-bundles",
help="Ignore errors due to missing case bundles",
is_flag=True,
default=False,
multiple=False,
type=click.Choice(choices=[option for option in FileDeliveryOption]),
required=False,
)
TICKET_ID_ARG = click.option("-t", "--ticket", type=str, required=True)


@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
Expand All @@ -52,74 +37,10 @@ def deliver():
LOG.info("Running CG deliver")


@deliver.command(name="analysis")
@DRY_RUN
@DELIVERY_TYPE
@click.option("-c", "--case-id", help="Deliver the files for a specific case")
@click.option(
"-t", "--ticket", type=str, help="Deliver the files for ALL cases connected to a ticket"
)
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.pass_obj
def deliver_analysis(
context: CGConfig,
case_id: str | None,
ticket: str | None,
delivery_type: list[str],
dry_run: bool,
force_all: bool,
ignore_missing_bundles: bool,
):
"""Deliver analysis files to customer inbox
Files can be delivered either on case level or for all cases connected to a ticket.
Any of those needs to be specified.
"""
if not (case_id or ticket):
LOG.info("Please provide a case-id or ticket-id")
return

inbox: str = context.delivery_path
if not inbox:
LOG.info("Please specify the root path for where files should be delivered")
return

status_db: Store = context.status_db
for delivery in delivery_type:
deliver_api = DeliverAPI(
store=status_db,
hk_api=context.housekeeper_api,
case_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["case_tags"],
sample_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["sample_tags"],
project_base_path=Path(inbox),
delivery_type=delivery,
force_all=force_all,
ignore_missing_bundles=ignore_missing_bundles,
fastq_file_service=FastqConcatenationService(),
)
deliver_api.set_dry_run(dry_run)
cases: list[Case] = []
if case_id:
case_obj: Case = status_db.get_case_by_internal_id(internal_id=case_id)
if not case_obj:
LOG.warning(f"Could not find case {case_id}")
return
cases.append(case_obj)
else:
cases: list[Case] = status_db.get_cases_by_ticket_id(ticket_id=ticket)
if not cases:
LOG.warning(f"Could not find cases for ticket {ticket}")
return

for case_obj in cases:
deliver_api.deliver_files(case_obj=case_obj)


@deliver.command(name="rsync")
@DRY_RUN
@TICKET_ID_ARG
@click.pass_obj
@TICKET_ID_ARG
@DRY_RUN
def rsync(context: CGConfig, ticket: str, dry_run: bool):
"""The folder generated using the "cg deliver analysis" command will be
rsynced with this function to the customers inbox on the delivery server
Expand All @@ -133,60 +54,77 @@ def rsync(context: CGConfig, ticket: str, dry_run: bool):
)


@deliver.command(name="concatenate")
@deliver.command(name="case")
@click.pass_obj
@click.option(
"-c",
"--case-id",
required=True,
help="Deliver the files for a specific case",
)
@DELIVERY_TYPE
@DRY_RUN
@TICKET_ID_ARG
@click.pass_context
def concatenate(context: click.Context, ticket: str, dry_run: bool):
"""The fastq files in the folder generated using "cg deliver analysis"
will be concatenated into one forward and one reverse fastq file
def deliver_case(
context: CGConfig,
case_id: str,
delivery_type: FileDeliveryOption,
dry_run: bool,
):
"""
cg_context: CGConfig = context.obj
deliver_ticket_api = DeliverTicketAPI(config=cg_context)
deliver_ticket_api.concatenate_fastq_files(ticket=ticket, dry_run=dry_run)
Deliver all case files based on delivery type to the customer inbox on the HPC
"""
inbox: str = context.delivery_path
rsync_api: RsyncAPI = RsyncAPI(config=context)
service_builder = DeliveryServiceFactory(
store=context.status_db,
hk_api=context.housekeeper_api,
tb_service=context.trailblazer_api,
rsync_service=rsync_api,
)
case: Case = context.status_db.get_case_by_internal_id(internal_id=case_id)
if not case:
LOG.error(f"Could not find case with id {case_id}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else case.data_delivery,
workflow=case.workflow,
)
delivery_service.deliver_files_for_case(
case=case, delivery_base_path=Path(inbox), dry_run=dry_run
)


@deliver.command(name="ticket")
@click.pass_obj
@TICKET_ID_ARG
@DELIVERY_TYPE
@DRY_RUN
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.option(
"-t",
"--ticket",
type=str,
help="Deliver and rsync the files for ALL cases connected to a ticket",
required=True,
)
@click.pass_context
def deliver_ticket(
context: click.Context,
delivery_type: list[str],
dry_run: bool,
force_all: bool,
context: CGConfig,
ticket: str,
ignore_missing_bundles: bool,
delivery_type: FileDeliveryOption,
dry_run: bool,
):
"""Will first collect hard links in the customer inbox then
concatenate fastq files if needed and finally send the folder
from customer inbox hasta to the customer inbox on the delivery server
"""
cg_context: CGConfig = context.obj
deliver_ticket_api = DeliverTicketAPI(config=cg_context)
is_upload_needed = deliver_ticket_api.check_if_upload_is_needed(ticket=ticket)
if is_upload_needed or force_all:
LOG.info("Delivering files to customer inbox on the HPC")
context.invoke(
deliver_analysis,
delivery_type=delivery_type,
dry_run=dry_run,
force_all=force_all,
ticket=ticket,
ignore_missing_bundles=ignore_missing_bundles,
)
else:
LOG.info("Files already delivered to customer inbox on the HPC")
return
Deliver all case files based on delivery type to the customer inbox on the HPC for cases connected to a ticket.
"""
inbox: str = context.delivery_path
rsync_api: RsyncAPI = RsyncAPI(config=context)
service_builder = DeliveryServiceFactory(
store=context.status_db,
hk_api=context.housekeeper_api,
tb_service=context.trailblazer_api,
rsync_service=rsync_api,
)

deliver_ticket_api.report_missing_samples(ticket=ticket, dry_run=dry_run)
context.invoke(rsync, ticket=ticket, dry_run=dry_run)
cases: list[Case] = context.status_db.get_cases_by_ticket_id(ticket_id=ticket)
if not cases:
LOG.error(f"Could not find case connected to ticket {ticket}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else cases[0].data_delivery,
workflow=cases[0].workflow,
)
delivery_service.deliver_files_for_ticket(
ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
)
8 changes: 8 additions & 0 deletions cg/constants/delivery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Constants for delivery."""

from enum import StrEnum

from cg.constants.constants import Workflow
from cg.constants.housekeeper_tags import (
AlignmentFileTag,
Expand Down Expand Up @@ -206,3 +208,9 @@

INBOX_NAME: str = "inbox"
OUTBOX_NAME: str = "outbox"


class FileDeliveryOption(StrEnum):
FASTQ: str = "fastq"
ANALYSIS: str = "analysis"
FASTQ_ANALYSIS: str = "fastq-analysis"
1 change: 1 addition & 0 deletions cg/constants/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
EXIT_SUCCESS = 0
EXIT_WARNING = 8
EXIT_FAIL = 1
EXIT_PARSE_ERROR = 2
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from pathlib import Path

from cg.apps.tb import TrailblazerAPI
from cg.meta.rsync import RsyncAPI
from cg.services.file_delivery.fetch_file_service.fetch_delivery_files_service import (
FetchDeliveryFilesService,
)
Expand All @@ -9,6 +12,9 @@
from cg.services.file_delivery.move_files_service.move_delivery_files_service import (
MoveDeliveryFilesService,
)
from cg.store.exc import EntryNotFoundError
from cg.store.models import Case
from cg.store.store import Store


class DeliverFilesService:
Expand All @@ -24,15 +30,46 @@ def __init__(
delivery_file_manager_service: FetchDeliveryFilesService,
move_file_service: MoveDeliveryFilesService,
file_formatter_service: DeliveryFileFormattingService,
rsync_service: RsyncAPI,
tb_service: TrailblazerAPI,
status_db: Store,
):
self.file_manager = delivery_file_manager_service
self.file_mover = move_file_service
self.file_formatter = file_formatter_service
self.status_db = status_db
self.rsync_service = rsync_service
self.tb_service = tb_service

def deliver_files_for_case(self, case_id: str, delivery_base_path: Path) -> None:
"""Deliver the files to the customer folder."""
delivery_files: DeliveryFiles = self.file_manager.get_files_to_deliver(case_id)
def deliver_files_for_case(
self, case: Case, delivery_base_path: Path, dry_run: bool = False
) -> None:
"""Deliver the files for a case to the customer folder."""
delivery_files: DeliveryFiles = self.file_manager.get_files_to_deliver(
case_id=case.internal_id
)
moved_files: DeliveryFiles = self.file_mover.move_files(
delivery_files=delivery_files, delivery_base_path=delivery_base_path
)
self.file_formatter.format_files(moved_files)
slurm_id: int = self.rsync_service.run_rsync_on_slurm(
ticket=case.latest_ticket, dry_run=dry_run
)
self.rsync_service.add_to_trailblazer_api(
tb_api=self.tb_service,
slurm_job_id=slurm_id,
ticket=case.latest_ticket,
dry_run=dry_run,
)

def deliver_files_for_ticket(
self, ticket_id: str, delivery_base_path: Path, dry_run: bool = False
) -> None:
"""Deliver the files for all cases in a ticket to the customer folder."""
cases: list[Case] = self.status_db.get_cases_by_ticket_id(ticket_id)
if not cases:
raise EntryNotFoundError(f"No cases found for ticket {ticket_id}")
for case in cases:
self.deliver_files_for_case(
case=case, delivery_base_path=delivery_base_path, dry_run=dry_run
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Module for the factory of the deliver files service."""

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.apps.tb import TrailblazerAPI
from cg.constants import Workflow, DataDelivery
from cg.meta.rsync import RsyncAPI
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
)
Expand Down Expand Up @@ -51,9 +53,17 @@
class DeliveryServiceFactory:
"""Class to build the delivery services based on workflow and delivery type."""

def __init__(self, store: Store, hk_api: HousekeeperAPI):
def __init__(
self,
store: Store,
hk_api: HousekeeperAPI,
rsync_service: RsyncAPI,
tb_service: TrailblazerAPI,
):
self.store = store
self.hk_api = hk_api
self.rsync_service = rsync_service
self.tb_service = tb_service

@staticmethod
def _get_file_tag_fetcher(delivery_type: DataDelivery) -> FetchDeliveryFileTagsService:
Expand Down Expand Up @@ -103,4 +113,7 @@ def build_delivery_service(
delivery_file_manager_service=file_fetcher,
move_file_service=MoveDeliveryFilesService(),
file_formatter_service=file_formatter,
status_db=self.store,
rsync_service=self.rsync_service,
tb_service=self.tb_service,
)
Loading

0 comments on commit df06b43

Please sign in to comment.