Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Lims sample model #3473

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 62.2.3
current_version = 62.1.0
commit = True
tag = True
tag_name = v{new_version}
Expand Down
2 changes: 1 addition & 1 deletion cg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__title__ = "cg"
__version__ = "62.2.3"
__version__ = "62.1.0"
17 changes: 4 additions & 13 deletions cg/apps/invoice/render.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import datetime as dt
from pathlib import Path

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Border, Font, PatternFill, Side

from cg.constants import FileExtensions
from cg.utils.files import get_project_root_dir
from pkg_resources import resource_filename


def render_xlsx(data: dict) -> Workbook:
Expand Down Expand Up @@ -37,17 +34,11 @@ def render_xlsx(data: dict) -> Workbook:
}]
}
"""
project_root_dir = get_project_root_dir()
pkg_dir = __name__.rpartition(".")[0]
sample_type = "pool" if data["pooled_samples"] else "sample"
costcenter = data["cost_center"]
template_path = Path(
project_root_dir,
"apps",
"invoice",
"templates",
f"{costcenter}_{sample_type}_invoice{FileExtensions.XLSX}",
)
workbook = load_workbook(template_path.as_posix())
template_path = resource_filename(pkg_dir, f"templates/{costcenter}_{sample_type}_invoice.xlsx")
workbook = load_workbook(template_path)
if data["pooled_samples"]:
worksheet = workbook["Bilaga Prover"]
worksheet["C1"] = costcenter.upper()
Expand Down
111 changes: 52 additions & 59 deletions cg/apps/lims/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
from datetime import date, datetime
from typing import Any

from dateutil.parser import parse as parse_date
from genologics.entities import Artifact, Process, Researcher, Sample
Expand All @@ -20,6 +19,7 @@
from cg.constants.priority import Priority
from cg.exc import LimsDataError

from ...models.lims.sample import LimsProject, LimsSample
from .order import OrderHandler

SEX_MAP = {"F": "female", "M": "male", "Unknown": "unknown", "unknown": "unknown"}
Expand Down Expand Up @@ -56,12 +56,12 @@ def __init__(self, config):
def user(self) -> Researcher:
return self.get_researchers(username=self.username)[0]

def sample(self, lims_id: str) -> dict[str, Any]:
def sample(self, lims_id: str) -> LimsSample:
"""Return sample by ID from the LIMS database."""
lims_sample = {}
try:
sample = Sample(self, id=lims_id)
lims_sample: dict[str, Any] = self._export_sample(sample)
lims_sample: LimsSample = self._export_sample(sample)
except HTTPError as error:
LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
return lims_sample
Expand All @@ -74,45 +74,45 @@ def get_source(self, lims_id: str) -> str | None:
"""Return the source from LIMS for a given sample ID.
Return 'None' if no source information is set or
if sample is not found or cannot be fetched from LIMS."""
lims_sample: dict[str, Any] = self.sample(lims_id=lims_id)
return lims_sample.get("source")
lims_sample: LimsSample = self.sample(lims_id=lims_id)
return lims_sample.source

@staticmethod
def _export_project(lims_project) -> dict:
def _export_project(lims_project) -> LimsProject:
"""Fetch relevant information from a lims project object"""
return {
"id": lims_project.id,
"name": lims_project.name,
"date": parse_date(lims_project.open_date) if lims_project.open_date else None,
}
return LimsProject(
id=lims_project.id,
name=lims_project.name,
date=parse_date(lims_project.open_date) if lims_project.open_date else None,
)

def _export_sample(self, lims_sample):
"""Get data from a LIMS sample."""
udfs = lims_sample.udf
return {
"id": lims_sample.id,
"name": lims_sample.name,
"project": self._export_project(lims_sample.project),
"family": udfs.get("familyID"),
"customer": udfs.get("customer"),
"sex": SEX_MAP.get(udfs.get("Gender"), None),
"father": udfs.get("fatherID"),
"mother": udfs.get("motherID"),
"source": udfs.get("Source"),
"status": udfs.get("Status"),
"panels": udfs.get("Gene List").split(";") if udfs.get("Gene List") else None,
"priority": udfs.get("priority"),
"received": self.get_received_date(lims_sample.id),
"application": udfs.get("Sequencing Analysis"),
"application_version": (
return LimsSample(
id=lims_sample.id,
name=lims_sample.name,
project=self._export_project(lims_sample.project),
case=udfs.get("familyID"),
customer=udfs.get("customer"),
sex=SEX_MAP.get(udfs.get("Gender")),
father=udfs.get("fatherID"),
mother=udfs.get("motherID"),
source=udfs.get("Source"),
status=udfs.get("Status"),
panels=udfs.get("Gene List").split(";") if udfs.get("Gene List") else None,
priority=udfs.get("priority"),
received=self.get_received_date(lims_sample.id),
application=udfs.get("Sequencing Analysis"),
application_version=(
int(udfs["Application Tag Version"])
if udfs.get("Application Tag Version")
else None
),
"comment": udfs.get("comment"),
"concentration_ng_ul": udfs.get("Concentration (ng/ul)"),
"passed_initial_qc": udfs.get("Passed Initial QC"),
}
comment=udfs.get("comment"),
concentration_ng_ul=udfs.get("Concentration (ng/ul)"),
passed_initial_qc=udfs.get("Passed Initial QC"),
)

def get_received_date(self, lims_id: str) -> date:
"""Get the date when a sample was received."""
Expand Down Expand Up @@ -182,21 +182,23 @@ def family(self, customer: str, family: str):
"""Fetch information about a family of samples."""
filters = {"customer": customer, "familyID": family}
lims_samples = self.get_samples(udf=filters)
samples_data = [self._export_sample(lims_sample) for lims_sample in lims_samples]
samples_data: list[LimsSample] = [
self._export_sample(lims_sample) for lims_sample in lims_samples
]
# get family level data
family_data = {"family": family, "customer": customer, "samples": []}
priorities = set()
panels = set()

for sample_data in samples_data:
priorities.add(sample_data["priority"])
if sample_data["panels"]:
panels.update(sample_data["panels"])
priorities.add(sample_data.priority)
if sample_data.panels:
panels.update(sample_data.panels)
family_data["samples"].append(sample_data)

if len(priorities) == 1:
family_data["priority"] = priorities.pop()
elif len(priorities) < 1:
elif not priorities:
raise LimsDataError(f"unable to determine family priority: {priorities}")
else:
for prio in [
Expand All @@ -223,8 +225,7 @@ def update_sample(
lims_sample = Sample(self, id=lims_id)

if sex:
lims_gender = REV_SEX_MAP.get(sex)
if lims_gender:
if lims_gender := REV_SEX_MAP.get(sex):
lims_sample.udf[PROP2UDF["sex"]] = lims_gender
if name:
lims_sample.name = name
Expand Down Expand Up @@ -309,9 +310,7 @@ def _get_methods(self, step_names_udfs: dict[str, dict], lims_id: str) -> str |
)
)

sorted_methods = self._sort_by_date_run(methods)

if sorted_methods:
if sorted_methods := self._sort_by_date_run(methods):
method = sorted_methods[METHOD_INDEX]

if (
Expand Down Expand Up @@ -404,18 +403,18 @@ def _find_twist_capture_kits(artifacts, udf_key):

def get_sample_comment(self, sample_id: str) -> str | None:
"""Return the comment of the sample."""
lims_sample: dict[str, Any] = self.sample(sample_id)
lims_sample: LimsSample = self.sample(sample_id)
comment = None
if lims_sample:
comment: str = lims_sample.get("comment")
comment: str | None = lims_sample.comment
return comment

def get_sample_project(self, sample_id: str) -> str | None:
"""Return the LIMS ID of the sample associated project if sample exists in LIMS."""
lims_sample: dict[str, Any] = self.sample(sample_id)
lims_sample: LimsSample = self.sample(sample_id)
project_id = None
if lims_sample:
project_id: str = lims_sample.get("project").get("id")
project_id: str = lims_sample.project.id
return project_id

def get_sample_rin(self, sample_id: str) -> float | None:
Expand All @@ -440,8 +439,8 @@ def get_sample_dv200(self, sample_id: str) -> float | None:

def has_sample_passed_initial_qc(self, sample_id: str) -> bool | None:
"""Return the outcome of the initial QC protocol of the given sample."""
lims_sample: dict[str, Any] = self.sample(sample_id)
initial_qc_udf: str | None = lims_sample.get("passed_initial_qc")
lims_sample: LimsSample = self.sample(sample_id)
initial_qc_udf: str | None = lims_sample.passed_initial_qc
initial_qc: bool | None = eval(initial_qc_udf) if initial_qc_udf else None
return initial_qc

Expand All @@ -450,21 +449,17 @@ def _get_rna_input_amounts(self, sample_id: str) -> list[tuple[datetime, float]]
step_names_udfs: dict[str] = MASTER_STEPS_UDFS["rna_prep_step"]
input_amounts: list[tuple[datetime, float]] = []
try:
for process_type in step_names_udfs:
for process_type, udf_key in step_names_udfs.items():
artifacts: list[Artifact] = self.get_artifacts(
samplelimsid=sample_id,
process_type=process_type,
type=LimsArtifactTypes.ANALYTE,
)

udf_key: str = step_names_udfs[process_type]
for artifact in artifacts:
input_amounts.append(
(
artifact.parent_process.date_run,
artifact.udf.get(udf_key),
)
)
input_amounts.extend(
(artifact.parent_process.date_run, artifact.udf.get(udf_key))
for artifact in artifacts
)
except HTTPError as error:
LOG.warning(f"Sample {sample_id} not found in LIMS: {error}")
return input_amounts
Expand All @@ -474,9 +469,7 @@ def _get_last_used_input_amount(
) -> float | None:
"""Return the latest used input amount."""
sorted_input_amounts: list[tuple[datetime, float]] = self._sort_by_date_run(input_amounts)
if not sorted_input_amounts:
return None
return sorted_input_amounts[0][1]
return sorted_input_amounts[0][1] if sorted_input_amounts else None

def get_latest_rna_input_amount(self, sample_id: str) -> float | None:
"""Return the input amount used in the latest preparation of an RNA sample."""
Expand Down
2 changes: 0 additions & 2 deletions cg/apps/tb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def add_pending_analysis(
workflow: Workflow = None,
ticket: str = None,
workflow_manager: str = WorkflowManager.Slurm,
tower_workflow_id: str | None = None,
) -> TrailblazerAnalysis:
request_body = {
"case_id": case_id,
Expand All @@ -131,7 +130,6 @@ def add_pending_analysis(
"workflow": workflow.upper(),
"ticket": ticket,
"workflow_manager": workflow_manager,
"tower_workflow_id": tower_workflow_id,
}
LOG.debug(f"Submitting job to Trailblazer: {request_body}")
if response := self.query_trailblazer(
Expand Down
6 changes: 1 addition & 5 deletions cg/cli/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,7 @@ def get_sequencing_run(context: click.Context, samples: bool, flow_cell_id: str)
sequencing_run.device.internal_id,
sequencing_run.sequencer_type,
sequencing_run.sequencer_name,
(
sequencing_run.sequencing_started_at.date()
if sequencing_run.sequencing_started_at
else "Not available"
),
sequencing_run.sequencing_started_at.date(),
sequencing_run.sequencing_completed_at.date(),
sequencing_run.archived_at.date() if sequencing_run.archived_at else "No",
sequencing_run.data_availability,
Expand Down
1 change: 1 addition & 0 deletions cg/cli/post_process/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ def post_process_sequencing_run(context: CGConfig, run_name: str, dry_run: bool)
post_processing_service.post_process(run_name=run_name, dry_run=dry_run)


post_process_group: click.Group
post_process_group.add_command(post_process_sequencing_run)
50 changes: 3 additions & 47 deletions cg/cli/workflow/mutant/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
link,
resolve_compression,
store,
store_available,
)
from cg.constants import EXIT_FAIL, EXIT_SUCCESS
from cg.constants.cli_options import DRY_RUN
from cg.exc import AnalysisNotReadyError, CgError
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.mutant import MutantAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.store.models import Case

LOG = logging.getLogger(__name__)

Expand All @@ -32,6 +32,7 @@ def mutant(context: click.Context) -> None:
mutant.add_command(resolve_compression)
mutant.add_command(link)
mutant.add_command(store)
mutant.add_command(store_available)


@mutant.command("config-case")
Expand Down Expand Up @@ -74,6 +75,7 @@ def start(context: click.Context, dry_run: bool, case_id: str, config_artic: str
context.invoke(link, case_id=case_id, dry_run=dry_run)
context.invoke(config_case, case_id=case_id, dry_run=dry_run)
context.invoke(run, case_id=case_id, dry_run=dry_run, config_artic=config_artic)
context.invoke(store, case_id=case_id, dry_run=dry_run)


@mutant.command("start-available")
Expand All @@ -98,49 +100,3 @@ def start_available(context: click.Context, dry_run: bool = False):
exit_code = EXIT_FAIL
if exit_code:
raise click.Abort


@mutant.command("store-available")
@DRY_RUN
@click.pass_context
def store_available(context: click.Context, dry_run: bool) -> None:
"""Run QC checks and store bundles for all finished analyses in Housekeeper."""

analysis_api: MutantAnalysisAPI = context.obj.meta_apis["analysis_api"]

exit_code: int = EXIT_SUCCESS

cases_ready_for_qc: list[Case] = analysis_api.get_cases_to_perform_qc_on()
LOG.info(f"Found {len(cases_ready_for_qc)} cases to perform QC on!")
for case in cases_ready_for_qc:
LOG.info(f"Performing QC on case {case.internal_id}.")
try:
analysis_api.run_qc_on_case(case=case, dry_run=dry_run)
except Exception:
exit_code = EXIT_FAIL

cases_to_store: list[Case] = analysis_api.get_cases_to_store()
LOG.info(f"Found {len(cases_to_store)} cases to store!")
for case in cases_to_store:
LOG.info(f"Storing deliverables for {case.internal_id}")
try:
context.invoke(store, case_id=case.internal_id, dry_run=dry_run)
except Exception as exception_object:
LOG.error(f"Error storingc {case.internal_id}: {exception_object}")
exit_code = EXIT_FAIL

if exit_code:
raise click.Abort


@mutant.command("run-qc")
@DRY_RUN
@ARGUMENT_CASE_ID
@click.pass_context
def run_qc(context: click.Context, case_id: str, dry_run: bool) -> None:
"""
Run QC on case and generate QC_report file.
"""
analysis_api: MutantAnalysisAPI = context.obj.meta_apis["analysis_api"]

analysis_api.run_qc(case_id=case_id, dry_run=dry_run)
Loading
Loading