diff --git a/src/gentropy/assets/schemas/amino_acid_variants.json b/src/gentropy/assets/schemas/amino_acid_variants.json
index b2fce522b..7aa876401 100644
--- a/src/gentropy/assets/schemas/amino_acid_variants.json
+++ b/src/gentropy/assets/schemas/amino_acid_variants.json
@@ -14,7 +14,7 @@
   },
   {
     "metadata": {},
-    "name": "inSilicoPredictors",
+    "name": "variantEffect",
     "nullable": true,
     "type": {
       "containsNull": true,
diff --git a/src/gentropy/assets/schemas/variant_index.json b/src/gentropy/assets/schemas/variant_index.json
index 1f1ef787c..6f5f5869e 100644
--- a/src/gentropy/assets/schemas/variant_index.json
+++ b/src/gentropy/assets/schemas/variant_index.json
@@ -32,7 +32,7 @@
   },
   {
     "metadata": {},
-    "name": "inSilicoPredictors",
+    "name": "variantEffect",
     "nullable": true,
     "type": {
       "containsNull": true,
diff --git a/src/gentropy/config.py b/src/gentropy/config.py
index c5b6783fb..9bac9dbf0 100644
--- a/src/gentropy/config.py
+++ b/src/gentropy/config.py
@@ -436,6 +436,16 @@ class PanUKBBConfig(StepConfig):
     _target_: str = "gentropy.pan_ukb_ingestion.PanUKBBVariantIndexStep"
 
 
+@dataclass
+class LOFIngestionConfig(StepConfig):
+    """Step configuration for the ingestion of Loss-of-Function variant data generated by OTAR2075."""
+
+    lof_curation_dataset_path: str = MISSING
+    lof_curation_variant_annotations_path: str = MISSING
+
+    _target_: str = "gentropy.lof_curation_ingestion.LOFIngestionStep"
+
+
 @dataclass
 class VariantIndexConfig(StepConfig):
     """Variant index step configuration."""
@@ -454,7 +464,7 @@ class _ConsequenceToPathogenicityScoreMap(TypedDict):
     )
     vep_output_json_path: str = MISSING
     variant_index_path: str = MISSING
-    gnomad_variant_annotations_path: str | None = None
+    variant_annotations_path: list[str] | None = None
     hash_threshold: int = 300
     consequence_to_pathogenicity_score: ClassVar[
         list[_ConsequenceToPathogenicityScoreMap]
@@ -739,6 +749,7 @@ def register_config() -> None:
     cs.store(group="step", name="pics", node=PICSConfig)
     cs.store(group="step", name="gnomad_variants", node=GnomadVariantConfig)
     cs.store(group="step", name="ukb_ppp_eur_sumstat_preprocess", node=UkbPppEurConfig)
+    cs.store(group="step", name="lof_curation_ingestion", node=LOFIngestionConfig)
     cs.store(group="step", name="variant_index", node=VariantIndexConfig)
    cs.store(group="step", name="variant_to_vcf", node=ConvertToVcfStepConfig)
    cs.store(
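With the step registered under the name "lof_curation_ingestion", it becomes addressable from gentropy's Hydra-based command line. A minimal sketch of an invocation, assuming the standard gentropy entry point; the bucket paths are invented for illustration, and both parameters default to MISSING, so Hydra fails fast if either is omitted:

    gentropy step=lof_curation_ingestion \
        step.lof_curation_dataset_path=gs://example-bucket/otar2075/lof_curation.csv \
        step.lof_curation_variant_annotations_path=gs://example-bucket/annotations/lof
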
diff --git a/src/gentropy/dataset/variant_index.py b/src/gentropy/dataset/variant_index.py
index 6622c822c..0b0bb59d2 100644
--- a/src/gentropy/dataset/variant_index.py
+++ b/src/gentropy/dataset/variant_index.py
@@ -130,7 +130,7 @@ def add_annotation(
         """Import annotation from another variant index dataset.
 
         At this point the annotation can be extended with extra cross-references,
-        in-silico predictions and allele frequencies.
+        variant effects, allele frequencies, and variant descriptions.
 
         Args:
             annotation_source (VariantIndex): Annotation to add to the dataset
@@ -168,7 +168,12 @@ def add_annotation(
                         f.col(column), f.col(f"{prefix}{column}"), fields_order
                     ).alias(column)
                 )
-            # Non-array columns are coalesced:
+            # variantDescription columns are concatenated:
+            elif column == "variantDescription":
+                select_expressions.append(
+                    f.concat_ws(" ", f.col(column), f.col(f"{prefix}{column}")).alias(column)
+                )
+            # All other non-array columns are coalesced:
             else:
                 select_expressions.append(
                     f.coalesce(f.col(column), f.col(f"{prefix}{column}")).alias(
@@ -279,7 +284,7 @@ def get_distance_to_gene(
     def annotate_with_amino_acid_consequences(
         self: VariantIndex, annotation: AminoAcidVariants
     ) -> VariantIndex:
-        """Enriching in silico predictors with amino-acid derived predicted consequences.
+        """Enriching variant effect assessments with amino-acid derived predicted consequences.
 
         Args:
             annotation (AminoAcidVariants): amino-acid level variant consequences.
 
         Returns:
             VariantIndex: where amino-acid causing variants are enriched with extra annotation
         """
-        w = Window.partitionBy("variantId").orderBy(f.size("inSilicoPredictors").desc())
+        w = Window.partitionBy("variantId").orderBy(f.size("variantEffect").desc())
 
         return VariantIndex(
             _df=self.df
@@ -308,17 +313,17 @@ def annotate_with_amino_acid_consequences(
             )
             # Joining with amino-acid predictions:
             .join(
-                annotation.df.withColumnRenamed("inSilicoPredictors", "annotations"),
+                annotation.df.withColumnRenamed("variantEffect", "annotations"),
                 on=["uniprotAccession", "aminoAcidChange"],
                 how="left",
             )
             # Merge predictors:
             .withColumn(
-                "inSilicoPredictors",
+                "variantEffect",
                 f.when(
                     f.col("annotations").isNotNull(),
-                    f.array_union("inSilicoPredictors", "annotations"),
-                ).otherwise(f.col("inSilicoPredictors")),
+                    f.array_union("variantEffect", "annotations"),
+                ).otherwise(f.col("variantEffect")),
             )
             # Dropping unused columns:
             .drop("uniprotAccession", "aminoAcidChange", "annotations")
@@ -356,33 +361,33 @@ def get_loftee(self: VariantIndex) -> DataFrame:
     )
 
 
-class InSilicoPredictorNormaliser:
-    """Class to normalise in silico predictor assessments.
+class VariantEffectNormaliser:
+    """Class to normalise variant effect assessments.
 
     Essentially based on the raw scores, it normalises the scores to a range between -1 and 1, and appends the normalised
-    value to the in silico predictor struct.
+    value to the variant effect struct.
 
     The higher negative values indicate increasingly confident prediction to be a benign variant,
     while the higher positive values indicate increasingly deleterious predicted effect.
-    The point of these operations to make the scores comparable across different in silico predictors.
+    The point of these operations is to make the scores comparable across different variant effect assessments.
     """
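Before the method-level changes below, a concrete picture of the renamed entry point helps. This is a minimal, hypothetical sketch (not part of the diff) of applying the normaliser to a hand-built variantEffect column; the struct fields mirror the variant_index schema (method, assessment, score, assessmentFlag, targetId), and the sample variant is invented:

    import pyspark.sql.functions as f
    from pyspark.sql import SparkSession

    from gentropy.dataset.variant_index import VariantEffectNormaliser

    spark = SparkSession.builder.getOrCreate()
    demo = (
        spark.createDataFrame([("1_12345_A_T",)], ["variantId"])
        .withColumn(
            "variantEffect",
            f.array(
                f.struct(
                    f.lit("LossOfFunctionCuration").alias("method"),
                    f.lit("likely_lof").alias("assessment"),
                    f.lit(None).cast("double").alias("score"),
                    f.lit(None).cast("string").alias("assessmentFlag"),
                    f.lit(None).cast("string").alias("targetId"),
                )
            ),
        )
        # normalise_variant_effect appends a normalisedScore field to each struct;
        # a "likely_lof" assessment maps to 0.5 (see _normalise_lof below).
        .withColumn(
            "variantEffect",
            VariantEffectNormaliser.normalise_variant_effect(f.col("variantEffect")),
        )
    )
    demo.show(truncate=False)
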
 
     @classmethod
-    def normalise_in_silico_predictors(
-        cls: type[InSilicoPredictorNormaliser],
-        in_silico_predictors: Column,
+    def normalise_variant_effect(
+        cls: type[VariantEffectNormaliser],
+        variant_effect: Column,
     ) -> Column:
-        """Normalise in silico predictors. Appends a normalised score to the in silico predictor struct.
+        """Normalise variant effect assessments. Appends a normalised score to the variant effect struct.
 
         Args:
-            in_silico_predictors (Column): Column containing in silico predictors (list of structs).
+            variant_effect (Column): Column containing variant effect assessments (list of structs).
 
         Returns:
-            Column: Normalised in silico predictors.
+            Column: Normalised variant effect assessments.
         """
         return f.transform(
-            in_silico_predictors,
+            variant_effect,
             lambda predictor: f.struct(
                 # Extracting all existing columns:
                 predictor.method.alias("method"),
@@ -399,20 +404,20 @@
 
     @classmethod
     def resolve_predictor_methods(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
         method: Column,
         assessment: Column,
     ) -> Column:
-        """It takes a score, a method, and an assessment, and returns a normalized score for the in silico predictor.
+        """It takes a score, a method, and an assessment, and returns a normalised score for the variant effect.
 
         Args:
-            score (Column): The raw score from the in silico predictor.
+            score (Column): The raw score from the variant effect method.
             method (Column): The method used to generate the score.
             assessment (Column): The assessment of the score.
 
         Returns:
-            Column: Normalised score for the in silico predictor.
+            Column: Normalised score for the variant effect.
         """
         return (
             f.when(method == "LOFTEE", cls._normalise_loftee(assessment))
@@ -421,6 +426,7 @@ def resolve_predictor_methods(
             .when(method == "AlphaMissense", cls._normalise_alpha_missense(score))
             .when(method == "CADD", cls._normalise_cadd(score))
             .when(method == "Pangolin", cls._normalise_pangolin(score))
+            .when(method == "LossOfFunctionCuration", cls._normalise_lof(assessment))
             # The following predictors are not normalised:
             .when(method == "SpliceAI", score)
             .when(method == "VEP", score)
@@ -454,7 +460,7 @@ def _rescaleColumnValue(
 
     @classmethod
     def _normalise_foldx(
-        cls: type[InSilicoPredictorNormaliser], score: Column
+        cls: type[VariantEffectNormaliser], score: Column
     ) -> Column:
         """Normalise FoldX ddG energies.
@@ -477,7 +483,7 @@ def _normalise_foldx(
 
     @classmethod
     def _normalise_cadd(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
     ) -> Column:
         """Normalise CADD scores.
@@ -503,7 +509,7 @@ def _normalise_cadd(
 
     @classmethod
     def _normalise_gerp(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
     ) -> Column:
         """Normalise GERP scores.
@@ -533,9 +539,38 @@ def _normalise_gerp(
             .when(score < -3, f.lit(-1.0))
         )
 
+    @classmethod
+    def _normalise_lof(
+        cls: type[VariantEffectNormaliser],
+        assessment: Column,
+    ) -> Column:
+        """Normalise loss-of-function verdicts.
+
+        There are five ordinal verdicts.
+        The normalised score is determined by the verdict:
+            - lof: 1
+            - likely_lof: 0.5
+            - uncertain: 0
+            - likely_not_lof: -0.5
+            - not_lof: -1
+
+        Args:
+            assessment (Column): Loss-of-function assessment.
+
+        Returns:
+            Column: Normalised loss-of-function score.
+        """
+        return (
+            f.when(assessment == "lof", f.lit(1.0))
+            .when(assessment == "likely_lof", f.lit(0.5))
+            .when(assessment == "uncertain", f.lit(0.0))
+            .when(assessment == "likely_not_lof", f.lit(-0.5))
+            .when(assessment == "not_lof", f.lit(-1.0))
+        )
+
     @classmethod
     def _normalise_loftee(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         assessment: Column,
     ) -> Column:
         """Normalise LOFTEE scores.
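Since the five verdicts are ordinal, the mapping above can be spot-checked in isolation. A hypothetical test-style sketch, not part of the diff; it calls the private helper directly, which is fine for a quick check:

    import pyspark.sql.functions as f
    from pyspark.sql import SparkSession

    from gentropy.dataset.variant_index import VariantEffectNormaliser

    spark = SparkSession.builder.getOrCreate()
    verdicts = spark.createDataFrame(
        [("lof",), ("likely_lof",), ("uncertain",), ("likely_not_lof",), ("not_lof",)],
        ["verdict"],
    )
    scored = verdicts.withColumn(
        "normalisedScore", VariantEffectNormaliser._normalise_lof(f.col("verdict"))
    )
    # Expected: lof -> 1.0, likely_lof -> 0.5, uncertain -> 0.0,
    # likely_not_lof -> -0.5, not_lof -> -1.0
    assert [
        r.normalisedScore for r in scored.orderBy(f.desc("normalisedScore")).collect()
    ] == [1.0, 0.5, 0.0, -0.5, -1.0]

Any verdict outside the five recognised values falls through the when-chain and yields a null normalised score.
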
@@ -557,7 +592,7 @@ def _normalise_loftee(
 
     @classmethod
     def _normalise_sift(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
         assessment: Column,
     ) -> Column:
@@ -601,7 +636,7 @@ def _normalise_sift(
 
     @classmethod
     def _normalise_polyphen(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         assessment: Column,
         score: Column,
     ) -> Column:
@@ -632,7 +667,7 @@ def _normalise_polyphen(
 
     @classmethod
     def _normalise_alpha_missense(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
     ) -> Column:
         """Normalise AlphaMissense scores.
@@ -656,7 +691,7 @@ def _normalise_alpha_missense(
 
     @classmethod
     def _normalise_pangolin(
-        cls: type[InSilicoPredictorNormaliser],
+        cls: type[VariantEffectNormaliser],
         score: Column,
     ) -> Column:
         """Normalise Pangolin scores.
diff --git a/src/gentropy/datasource/ensembl/vep_parser.py b/src/gentropy/datasource/ensembl/vep_parser.py
index c5c985f3f..98b015cda 100644
--- a/src/gentropy/datasource/ensembl/vep_parser.py
+++ b/src/gentropy/datasource/ensembl/vep_parser.py
@@ -16,7 +16,7 @@
     order_array_of_structs_by_field,
     order_array_of_structs_by_two_fields,
 )
-from gentropy.dataset.variant_index import InSilicoPredictorNormaliser, VariantIndex
+from gentropy.dataset.variant_index import VariantEffectNormaliser, VariantIndex
 
 if TYPE_CHECKING:
     from pyspark.sql import Column, DataFrame
@@ -33,9 +33,9 @@ class VariantEffectPredictorParser:
 
     DBXREF_SCHEMA = VariantIndex.get_schema()["dbXrefs"].dataType
 
-    # Schema description of the in silico predictor object:
-    IN_SILICO_PREDICTOR_SCHEMA = get_nested_struct_schema(
-        VariantIndex.get_schema()["inSilicoPredictors"]
+    # Schema description of the variant effect object:
+    VARIANT_EFFECT_SCHEMA = get_nested_struct_schema(
+        VariantIndex.get_schema()["variantEffect"]
     )
 
     # Schema for the allele frequency column:
@@ -341,7 +341,7 @@ def _get_most_severe_transcript(
         )[0]
 
     @classmethod
-    @enforce_schema(IN_SILICO_PREDICTOR_SCHEMA)
+    @enforce_schema(VARIANT_EFFECT_SCHEMA)
     def _get_vep_prediction(cls, most_severe_consequence: Column) -> Column:
         return f.struct(
             f.lit("VEP").alias("method"),
@@ -352,7 +352,7 @@ def _get_vep_prediction(
         )
 
     @staticmethod
-    @enforce_schema(IN_SILICO_PREDICTOR_SCHEMA)
+    @enforce_schema(VARIANT_EFFECT_SCHEMA)
     def _get_max_alpha_missense(transcripts: Column) -> Column:
         """Return the most severe alpha missense prediction from all transcripts.
@@ -410,8 +410,8 @@
         )
 
     @classmethod
-    @enforce_schema(IN_SILICO_PREDICTOR_SCHEMA)
-    def _vep_in_silico_prediction_extractor(
+    @enforce_schema(VARIANT_EFFECT_SCHEMA)
+    def _vep_variant_effect_extractor(
         cls: type[VariantEffectPredictorParser],
         transcript_column_name: str,
         method_name: str,
@@ -419,17 +419,17 @@ def _vep_variant_effect_extractor(
         score_column_name: str | None = None,
         assessment_column_name: str | None = None,
         assessment_flag_column_name: str | None = None,
     ) -> Column:
-        """Extract in silico prediction from VEP output.
+        """Extract variant effect from VEP output.
 
         Args:
             transcript_column_name (str): Name of the column containing the list of transcripts.
-            method_name (str): Name of the in silico predictor.
+            method_name (str): Name of the variant effect method.
             score_column_name (str | None): Name of the column containing the score.
             assessment_column_name (str | None): Name of the column containing the assessment.
            assessment_flag_column_name (str | None): Name of the column containing the assessment flag.
 
         Returns:
-            Column: In silico predictor.
+            Column: Variant effect.
         """
         # Get transcript with the highest score:
         most_severe_transcript: Column = (
@@ -634,34 +634,34 @@ def process_vep_output(
                 cls._extract_clinvar_xrefs(f.col("colocated_variants")).alias(
                     "clinvar_xrefs"
                 ),
-                # Extracting in silico predictors
+                # Extracting variant effect assessments
                 f.when(
-                    # The following in-silico predictors are only available for variants with transcript consequences:
+                    # The following variant effect assessments are only available for variants with transcript consequences:
                     f.col("transcript_consequences").isNotNull(),
                     f.filter(
                         f.array(
                             # Extract CADD scores:
-                            cls._vep_in_silico_prediction_extractor(
+                            cls._vep_variant_effect_extractor(
                                 transcript_column_name="transcript_consequences",
                                 method_name="CADD",
                                 score_column_name="cadd_phred",
                             ),
                             # Extract polyphen scores:
-                            cls._vep_in_silico_prediction_extractor(
+                            cls._vep_variant_effect_extractor(
                                 transcript_column_name="transcript_consequences",
                                 method_name="PolyPhen",
                                 score_column_name="polyphen_score",
                                 assessment_column_name="polyphen_prediction",
                             ),
                             # Extract sift scores:
-                            cls._vep_in_silico_prediction_extractor(
+                            cls._vep_variant_effect_extractor(
                                 transcript_column_name="transcript_consequences",
                                 method_name="SIFT",
                                 score_column_name="sift_score",
                                 assessment_column_name="sift_prediction",
                             ),
                             # Extract loftee scores:
-                            cls._vep_in_silico_prediction_extractor(
+                            cls._vep_variant_effect_extractor(
                                 method_name="LOFTEE",
                                 transcript_column_name="transcript_consequences",
                                 score_column_name="lof",
@@ -669,7 +669,7 @@ def process_vep_output(
                                 assessment_flag_column_name="lof_filter",
                             ),
                             # Extract GERP conservation score:
-                            cls._vep_in_silico_prediction_extractor(
+                            cls._vep_variant_effect_extractor(
                                 method_name="GERP",
                                 transcript_column_name="transcript_consequences",
                                 score_column_name="conservation",
@@ -687,13 +687,13 @@ def process_vep_output(
                 .otherwise(
                     # Extract CADD scores from intergenic object:
                     f.array(
-                        cls._vep_in_silico_prediction_extractor(
+                        cls._vep_variant_effect_extractor(
                             transcript_column_name="intergenic_consequences",
                             method_name="CADD",
                             score_column_name="cadd_phred",
                         ),
                         # Extract GERP conservation score:
-                        cls._vep_in_silico_prediction_extractor(
+                        cls._vep_variant_effect_extractor(
                             method_name="GERP",
                             transcript_column_name="intergenic_consequences",
                             score_column_name="conservation",
@@ -702,7 +702,7 @@ def process_vep_output(
                     cls._get_vep_prediction(f.col("most_severe_consequence")),
                 )
             )
-            .alias("inSilicoPredictors"),
+            .alias("variantEffect"),
             # Convert consequence to SO:
             map_column_by_dictionary(
                 f.col("most_severe_consequence"), cls.SEQUENCE_ONTOLOGY_MAP
@@ -882,11 +882,11 @@ def process_vep_output(
                 )[f.size("proteinCodingTranscripts") - 1],
             ),
         )
-        # Normalising in silico predictor assessments:
+        # Normalising variant effect assessments:
         .withColumn(
-            "inSilicoPredictors",
-            InSilicoPredictorNormaliser.normalise_in_silico_predictors(
-                f.col("inSilicoPredictors")
+            "variantEffect",
+            VariantEffectNormaliser.normalise_variant_effect(
+                f.col("variantEffect")
             ),
         )
         # Dropping intermediate xref columns:
diff --git a/src/gentropy/datasource/open_targets/foldex_integration.py b/src/gentropy/datasource/open_targets/foldex_integration.py
index 5354fc7b6..60d2c9f70 100644
--- a/src/gentropy/datasource/open_targets/foldex_integration.py
+++ b/src/gentropy/datasource/open_targets/foldex_integration.py
@@ -8,26 +8,26 @@
 from gentropy.common.spark_helpers import enforce_schema
 from gentropy.dataset.amino_acid_variants import AminoAcidVariants
-from gentropy.dataset.variant_index import InSilicoPredictorNormaliser
+from gentropy.dataset.variant_index import VariantEffectNormaliser
 
 
 class OpenTargetsFoldX:
     """Class to parse the FoldX dataset generated by the OTAR2081 project."""
 
-    INSILICO_SCHEMA = AminoAcidVariants.get_schema()[
-        "inSilicoPredictors"
+    VARIANT_EFFECT_SCHEMA = AminoAcidVariants.get_schema()[
+        "variantEffect"
     ].dataType.elementType
 
     @staticmethod
-    @enforce_schema(INSILICO_SCHEMA)
+    @enforce_schema(VARIANT_EFFECT_SCHEMA)
     def get_foldx_prediction(score_column: Column) -> Column:
-        """Generate inSilicoPredictor object from ddG column.
+        """Generate variantEffect object from ddG column.
 
         Args:
             score_column (Column): ddG column from the FoldX dataset.
 
         Returns:
-            Column: struct with the right shape of the in silico predictors.
+            Column: struct with the right shape of the variantEffect field.
         """
         return f.struct(
             f.lit("FoldX").alias("method"),
@@ -58,21 +58,21 @@ def ingest_foldx_data(
                         f.col("wild_type"), f.col("position"), f.col("mutated_type")
                     ).alias("aminoAcidChange"),
                     cls.get_foldx_prediction(f.col("foldx_ddg")).alias(
-                        "inSilicoPredictor"
+                        "foldx_prediction"
                     ),
                 )
                 # Collapse all predictors for a single array object to avoid variant explosions:
                 .groupBy("uniprotAccession", "aminoAcidChange")
                 .agg(
-                    f.collect_set(f.col("inSilicoPredictor")).alias(
-                        "inSilicoPredictors"
+                    f.collect_set(f.col("foldx_prediction")).alias(
+                        "variantEffect"
                     )
                 )
                 # Normalise FoldX free energy changes:
                 .withColumn(
-                    "inSilicoPredictors",
-                    InSilicoPredictorNormaliser.normalise_in_silico_predictors(
-                        f.col("inSilicoPredictors")
+                    "variantEffect",
+                    VariantEffectNormaliser.normalise_variant_effect(
+                        f.col("variantEffect")
                     ),
                 )
             ),
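A quick, hypothetical illustration of the renamed FoldX helper; the input values are invented, and the enforce_schema decorator is expected to pad the struct out to the full variantEffect element schema:

    import pyspark.sql.functions as f
    from pyspark.sql import SparkSession

    from gentropy.datasource.open_targets.foldex_integration import OpenTargetsFoldX

    spark = SparkSession.builder.getOrCreate()
    ddg = spark.createDataFrame([(1.7,), (-0.2,)], ["foldx_ddg"])
    # Each row gains a FoldX variantEffect struct built from its ddG value:
    ddg.select(
        OpenTargetsFoldX.get_foldx_prediction(f.col("foldx_ddg")).alias("foldx_prediction")
    ).show(truncate=False)

Note the aggregation bug fixed above: the struct is aliased "foldx_prediction", so the collect_set call must reference that same name, not "fold_prediction".
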
+ """ + lof_description = ( + f.when(verdict == "lof", f.lit("Assessed to cause LoF")) + .when(verdict == "likely_lof", f.lit("Suspected to cause LoF")) + .when(verdict == "uncertain", f.lit("Uncertain LoF assessment")) + .when(verdict == "likely_not_lof", f.lit("Suspected not to cause LoF")) + .when(verdict == "not_lof", f.lit("Assessed not to cause LoF")) + ) + + return f.concat(lof_description, f.lit(" by OTAR2075 variant curation effort.")) + + @classmethod + def as_variant_index( + cls: type[OpenTargetsLOF], + lof_dataset: DataFrame + ) -> VariantIndex: + """Ingest Loss-of-Function information as a VariantIndex object. + + Args: + lof_dataset (DataFrame): curated input dataset from OTAR2075. + + Returns: + VariantIndex: variant annotations with loss-of-function assessments. + """ + return VariantIndex( + _df=( + lof_dataset + .select( + f.from_csv(f.col("Variant ID GRCh37"), "chr string, pos string, ref string, alt string", {"sep": "-"}).alias("h37"), + f.from_csv(f.col("Variant ID GRCh38"), "chr string, pos string, ref string, alt string", {"sep": "-"}).alias("h38"), + "Verdict" + ) + .select( + # As some GRCh37 variants do not correctly lift over to the correct GRCh38 variant, + # chr_pos is taken from the GRCh38 variant id, and ref_alt from the GRCh37 variant id + f.concat_ws("_", f.col("h38.chr"), f.col("h38.pos"), f.col("h37.ref"), f.col("h37.alt")).alias("variantId"), + # Mandatory fields for VariantIndex: + f.col("h38.chr").alias("chromosome"), + f.col("h38.pos").cast(t.IntegerType()).alias("position"), + f.col("h37.ref").alias("referenceAllele"), + f.col("h37.alt").alias("alternateAllele"), + # Populate variantEffect and variantDescription fields: + f.array(cls._get_lof_assessment(f.col("Verdict"))).alias("variantEffect"), + cls._compose_lof_description(f.col("Verdict")).alias("variantDescription"), + ) + # Convert assessments to normalised scores: + .withColumn("variantEffect", VariantEffectNormaliser.normalise_variant_effect(f.col("variantEffect"))) + ), + _schema=VariantIndex.get_schema(), + ) diff --git a/src/gentropy/lof_curation_ingestion.py b/src/gentropy/lof_curation_ingestion.py new file mode 100644 index 000000000..a2d66bc3c --- /dev/null +++ b/src/gentropy/lof_curation_ingestion.py @@ -0,0 +1,38 @@ +"""Ingest Loss-of-Function variant data generated by OTAR2075.""" + +from gentropy.common.session import Session +from gentropy.datasource.open_targets.lof_curation import OpenTargetsLOF + + +class LOFIngestionStep: + """Step to ingest the Loss-of-Function dataset generated by OTAR2075.""" + + def __init__( + self, + session: Session, + lof_curation_dataset_path: str, + lof_curation_variant_annotations_path: str, + ) -> None: + """Initialize step. + + Args: + session (Session): Session object. + lof_curation_dataset_path (str): path of the curated LOF dataset. + lof_curation_variant_annotations_path (str): path of the resulting variant annotations. 
+ """ + # Read in data: + lof_dataset = session.spark.read.csv( + lof_curation_dataset_path, + sep=",", + header=True, + multiLine=True + ) + # Extract relevant information to a VariantIndex + lof_variant_annotations = OpenTargetsLOF.as_variant_index(lof_dataset) + # Write to file: + ( + lof_variant_annotations.df + .coalesce(session.output_partitions) + .write.mode(session.write_mode) + .parquet(lof_curation_variant_annotations_path) + ) diff --git a/src/gentropy/variant_index.py b/src/gentropy/variant_index.py index 595546f17..773075c70 100644 --- a/src/gentropy/variant_index.py +++ b/src/gentropy/variant_index.py @@ -27,7 +27,7 @@ def __init__( vep_output_json_path: str, variant_index_path: str, hash_threshold: int, - gnomad_variant_annotations_path: str | None = None, + variant_annotations_path: list[str] | None = None, amino_acid_change_annotations: list[str] | None = None, ) -> None: """Run VariantIndex step. @@ -37,7 +37,7 @@ def __init__( vep_output_json_path (str): Variant effect predictor output path (in json format). variant_index_path (str): Variant index dataset path to save resulting data. hash_threshold (int): Hash threshold for variant identifier length. - gnomad_variant_annotations_path (str | None): Path to extra variant annotation dataset. + variant_annotations_path (list[str] | None): List of paths to extra variant annotation datasets. amino_acid_change_annotations (list[str] | None): list of paths to amino-acid based variant annotations. """ # Extract variant annotations from VEP output: @@ -46,19 +46,20 @@ def __init__( ) # Process variant annotations if provided: - if gnomad_variant_annotations_path: - # Read variant annotations from parquet: - annotations = VariantIndex.from_parquet( - session=session, - path=gnomad_variant_annotations_path, - recursiveFileLookup=True, - id_threshold=hash_threshold, - ) + if variant_annotations_path: + for annotation_path in variant_annotations_path: + # Read variant annotations from parquet: + annotations = VariantIndex.from_parquet( + session=session, + path=annotation_path, + recursiveFileLookup=True, + id_threshold=hash_threshold, + ) - # Update index with extra annotations: - variant_index = variant_index.add_annotation(annotations) + # Update index with extra annotations: + variant_index = variant_index.add_annotation(annotations) - # If provided read amion-acid based annotation and enrich variant index: + # If provided read amino-acid based annotation and enrich variant index: if amino_acid_change_annotations: for annotation_path in amino_acid_change_annotations: annotation_data = AminoAcidVariants.from_parquet( diff --git a/tests/gentropy/conftest.py b/tests/gentropy/conftest.py index 185c0e177..9d436817d 100644 --- a/tests/gentropy/conftest.py +++ b/tests/gentropy/conftest.py @@ -306,7 +306,7 @@ def mock_variant_index(spark: SparkSession) -> VariantIndex: # https://github.com/databrickslabs/dbldatagen/issues/135 # It's a workaround for nested column handling in dbldatagen. .withColumnSpec( - "inSilicoPredictors", + "variantEffect", expr=""" array( named_struct( diff --git a/tests/gentropy/datasource/ensembl/test_vep_variants.py b/tests/gentropy/datasource/ensembl/test_vep_variants.py index f0127b9b2..b2dded70d 100644 --- a/tests/gentropy/datasource/ensembl/test_vep_variants.py +++ b/tests/gentropy/datasource/ensembl/test_vep_variants.py @@ -15,8 +15,8 @@ from pyspark.sql import SparkSession -class TestVEPParserInSilicoExtractor: - """Testing the _vep_in_silico_prediction_extractor method of the VEP parser class. 
diff --git a/tests/gentropy/conftest.py b/tests/gentropy/conftest.py
index 185c0e177..9d436817d 100644
--- a/tests/gentropy/conftest.py
+++ b/tests/gentropy/conftest.py
@@ -306,7 +306,7 @@ def mock_variant_index(spark: SparkSession) -> VariantIndex:
         # https://github.com/databrickslabs/dbldatagen/issues/135
         # It's a workaround for nested column handling in dbldatagen.
         .withColumnSpec(
-            "inSilicoPredictors",
+            "variantEffect",
             expr="""
                 array(
                     named_struct(
diff --git a/tests/gentropy/datasource/ensembl/test_vep_variants.py b/tests/gentropy/datasource/ensembl/test_vep_variants.py
index f0127b9b2..b2dded70d 100644
--- a/tests/gentropy/datasource/ensembl/test_vep_variants.py
+++ b/tests/gentropy/datasource/ensembl/test_vep_variants.py
@@ -15,8 +15,8 @@
 from pyspark.sql import SparkSession
 
 
-class TestVEPParserInSilicoExtractor:
-    """Testing the _vep_in_silico_prediction_extractor method of the VEP parser class.
+class TestVEPParserVariantEffectExtractor:
+    """Testing the _vep_variant_effect_extractor method of the VEP parser class.
 
     These tests assume that the _get_most_severe_transcript() method works correctly, as it's not tested.
@@ -42,7 +42,7 @@ class TestVEPParserInSilicoExtractor:
     SAMPLE_COLUMNS = ["variantId", "assessment", "score", "gene_id", "flag"]
 
     @pytest.fixture(autouse=True)
-    def _setup(self: TestVEPParserInSilicoExtractor, spark: SparkSession) -> None:
+    def _setup(self: TestVEPParserVariantEffectExtractor, spark: SparkSession) -> None:
         """Setup fixture."""
         parsed_df = (
             spark.createDataFrame(self.SAMPLE_DATA, self.SAMPLE_COLUMNS)
@@ -59,18 +59,18 @@ def _setup(
             )
             .select(
                 "variantId",
-                VariantEffectPredictorParser._vep_in_silico_prediction_extractor(
+                VariantEffectPredictorParser._vep_variant_effect_extractor(
                     "transcripts", "method_name", "score", "assessment", "flag"
-                ).alias("in_silico_predictions"),
+                ).alias("variant_effect"),
             )
         ).persist()
 
         self.df = parsed_df
 
-    def test_in_silico_output_missing_value(
-        self: TestVEPParserInSilicoExtractor,
+    def test_variant_effect_missing_value(
+        self: TestVEPParserVariantEffectExtractor,
     ) -> None:
-        """Test if the in silico output count is correct."""
+        """Test that variants with a missing score produce a null variant effect."""
         variant_with_missing_score = [
             x[0] for x in filter(lambda x: x[2] is None, self.SAMPLE_DATA)
         ]
@@ -78,12 +78,10 @@ def test_variant_effect_missing_value(
         assert (
             [
                 x["variantId"]
-                for x in self.df.filter(
-                    f.col("in_silico_predictions").isNull()
-                ).collect()
+                for x in self.df.filter(f.col("variant_effect").isNull()).collect()
             ]
             == variant_with_missing_score
-        ), "Not the right variants got nullified in-silico predictor object."
+        ), "The wrong variants were nullified in the variant effect object."
 
 
 class TestVEPParser:
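For completeness, the new ingestion step can also be driven programmatically, mirroring the style of the fixtures above. A hypothetical sketch; the paths and session settings are invented, and the step runs entirely in its constructor:

    from gentropy.common.session import Session
    from gentropy.lof_curation_ingestion import LOFIngestionStep

    LOFIngestionStep(
        session=Session(),  # assumes a default local Spark configuration
        lof_curation_dataset_path="data/lof_curation_sample.csv",
        lof_curation_variant_annotations_path="/tmp/lof_variant_annotations",
    )
    # The annotations land as parquet and can then be passed to the
    # variant_index step through its variant_annotations_path parameter.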