From f66f4450c7ed3f3e3c51b659759360a4d547039b Mon Sep 17 00:00:00 2001
From: NickEdwards7502
Date: Wed, 2 Oct 2024 14:51:25 +1000
Subject: [PATCH] CHORE: Remove VSHail python wrapper files (#237)

---
 python/varspark/hail/__init__.py |  16 ---
 python/varspark/hail/context.py  |  31 ------
 python/varspark/hail/hail.py     | 111 ---------------------
 python/varspark/hail/lfdrvs.py   | 161 ------------------------------
 python/varspark/hail/methods.py  |  69 -------------
 python/varspark/hail/plot.py     |  84 ----------------
 python/varspark/hail/rf.py       | 117 ----------------------
 7 files changed, 589 deletions(-)
 delete mode 100644 python/varspark/hail/__init__.py
 delete mode 100644 python/varspark/hail/context.py
 delete mode 100644 python/varspark/hail/hail.py
 delete mode 100644 python/varspark/hail/lfdrvs.py
 delete mode 100644 python/varspark/hail/methods.py
 delete mode 100644 python/varspark/hail/plot.py
 delete mode 100644 python/varspark/hail/rf.py

diff --git a/python/varspark/hail/__init__.py b/python/varspark/hail/__init__.py
deleted file mode 100644
index 484869a4..00000000
--- a/python/varspark/hail/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from hail.backend.spark_backend import SparkBackend
-
-from .context import init, version_info, version
-from .hail import SparkBackend__init__
-from .methods import *
-
-#
-# HACK: Replace the SparkBackend.__init__ with our implementation
-# until we can get the required changes to hail
-#
-SparkBackend.__init__ = SparkBackend__init__
-
-__all__ = ['init',
-           'version',
-           'version_info',
-           'random_forest_model']
diff --git a/python/varspark/hail/context.py b/python/varspark/hail/context.py
deleted file mode 100644
index d792b779..00000000
--- a/python/varspark/hail/context.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import os
-import sys
-
-import hail as hl
-from hail.utils.java import Env
-
-import varspark as vs
-
-
-def init(sc=None, idempotent=False, quiet=False, spark_conf=None, **kwargs):
-    """ Initialises hail context with variant-spark support.
-
-    :param kwargs: same as for hail.init()
-    """
-
-    jars = [p.strip() for p in spark_conf.get("spark.jars", "").split(",")] if spark_conf else []
-    vs_jar_path = vs.find_jar()
-    assert os.path.exists(vs_jar_path), "%s does not exist" % vs_jar_path
-    if not quiet:
-        sys.stderr.write("using variant-spark jar at '%s'\n" % vs_jar_path)
-    if not vs_jar_path in jars:
-        jars.append(vs_jar_path)
-    hl.init(sc=sc, idempotent=idempotent, quiet=quiet, spark_conf={'spark.jars': ",".join(jars)}, **kwargs)
-
-
-def version():
-    return Env.jvm().au.csiro.variantspark.python.Metadata.version()
-
-
-def version_info():
-    return Env.jvm().au.csiro.variantspark.python.Metadata.gitProperties()
diff --git a/python/varspark/hail/hail.py b/python/varspark/hail/hail.py
deleted file mode 100644
index 3f464a89..00000000
--- a/python/varspark/hail/hail.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import os
-import sys
-
-import pkg_resources
-import pyspark
-from hail.backend.spark_backend import install_exception_handler, connect_logger, \
-    __name__ as __hail_name__
-from hail.utils.java import scala_package_object
-
-
-#
-# This is a customised version of the constructor honoring spark.jars passed in
-# spark_conf
-#
-# pylint: disable=C0415
-
-def SparkBackend__init__(self, idempotent, sc, spark_conf, app_name, master,
-                         local, log, quiet, append, min_block_size,
-                         branching_factor, tmpdir, local_tmpdir, skip_logging_configuration,
-                         optimizer_iterations):
-    if pkg_resources.resource_exists(__hail_name__, "hail-all-spark.jar"):
-        hail_jar_path = pkg_resources.resource_filename(__hail_name__, "hail-all-spark.jar")
-        assert os.path.exists(hail_jar_path), f'{hail_jar_path} does not exist'
-        conf = pyspark.SparkConf()
-
-        base_conf = spark_conf or {}
-        for k, v in base_conf.items():
-            conf.set(k, v)
-
-        # Initialize jars from the current configuration
-        jars = [p.strip() for p in conf.get("spark.jars", "").split(",")]
-        if not hail_jar_path in jars:
-            jars.insert(0, hail_jar_path)
-
-        if os.environ.get('HAIL_SPARK_MONITOR'):
-            import sparkmonitor
-            jars.append(os.path.join(os.path.dirname(sparkmonitor.__file__), 'listener.jar'))
-            conf.set("spark.extraListeners", "sparkmonitor.listener.JupyterSparkMonitorListener")
-
-        conf.set('spark.jars', ','.join(jars))
-        conf.set('spark.driver.extraClassPath', ','.join(jars))
-        conf.set('spark.executor.extraClassPath', './hail-all-spark.jar')
-        if sc is None:
-            pyspark.SparkContext._ensure_initialized(conf=conf)
-        elif not quiet:
-            sys.stderr.write(
-                'pip-installed Hail requires additional configuration options in Spark referring\n'
-                '  to the path to the Hail Python module directory HAIL_DIR,\n'
-                '  e.g. /path/to/python/site-packages/hail:\n'
-                '  spark.jars=HAIL_DIR/hail-all-spark.jar\n'
-                '  spark.driver.extraClassPath=HAIL_DIR/hail-all-spark.jar\n'
-                '  spark.executor.extraClassPath=./hail-all-spark.jar')
-    else:
-        pyspark.SparkContext._ensure_initialized()
-
-    self._gateway = pyspark.SparkContext._gateway
-    self._jvm = pyspark.SparkContext._jvm
-
-    hail_package = getattr(self._jvm, 'is').hail
-
-    self._hail_package = hail_package
-    self._utils_package_object = scala_package_object(hail_package.utils)
-
-    jsc = sc._jsc.sc() if sc else None
-
-    if idempotent:
-        self._jbackend = hail_package.backend.spark.SparkBackend.getOrCreate(
-            jsc, app_name, master, local, True, min_block_size, tmpdir, local_tmpdir)
-        self._jhc = hail_package.HailContext.getOrCreate(
-            self._jbackend, log, True, append, branching_factor, skip_logging_configuration,
-            optimizer_iterations)
-    else:
-        self._jbackend = hail_package.backend.spark.SparkBackend.apply(
-            jsc, app_name, master, local, True, min_block_size, tmpdir, local_tmpdir)
-        self._jhc = hail_package.HailContext.apply(
-            self._jbackend, log, True, append, branching_factor, skip_logging_configuration,
-            optimizer_iterations)
-
-    self._jsc = self._jbackend.sc()
-    if sc:
-        self.sc = sc
-    else:
-        self.sc = pyspark.SparkContext(gateway=self._gateway,
-                                       jsc=self._jvm.JavaSparkContext(self._jsc))
-    self._jspark_session = self._jbackend.sparkSession()
-    self._spark_session = pyspark.sql.SparkSession(self.sc, self._jspark_session)
-
-    # This has to go after creating the SparkSession. Unclear why.
-    # Maybe it does its own patch?
-    install_exception_handler()
-
-    from hail.context import version
-
-    py_version = version()
-    jar_version = self._jhc.version()
-    if jar_version != py_version:
-        raise RuntimeError(f"Hail version mismatch between JAR and Python library\n"
-                           f"  JAR:    {jar_version}\n"
-                           f"  Python: {py_version}")
-
-    self._fs = None
-    self._logger = None
-
-    if not quiet:
-        sys.stderr.write('Running on Apache Spark version {}\n'.format(self.sc.version))
-        if self._jsc.uiWebUrl().isDefined():
-            sys.stderr.write('SparkUI available at {}\n'.format(self._jsc.uiWebUrl().get()))
-
-    connect_logger(self._utils_package_object, 'localhost', 12888)
-
-    self._jbackend.startProgressBar()
diff --git a/python/varspark/hail/lfdrvs.py b/python/varspark/hail/lfdrvs.py
deleted file mode 100644
index a56e5a76..00000000
--- a/python/varspark/hail/lfdrvs.py
+++ /dev/null
@@ -1,161 +0,0 @@
-from varspark.stats.lfdr import *
-
-
-class LocalFdrVs:
-    local_fdr: object
-    df_: object
-
-    def __init__(self, df):
-        """
-        Constructor class
-        :param df: Takes a pandas dataframe as argument with three columns: variant_id,
-        logImportance and splitCount.
-        """
-        self.df_ = df.sort_values('logImportance', ascending=True)
-
-    @classmethod
-    def from_imp_df(cls, df):
-        """
-        Alternative class instantiation from a pandas dataframe
-        :param cls: LocalFdrVs class
-        :param df: Pandas dataframe with columns locus, alleles, importance, and splitCount.
-        :return: Initialized class instance.
-        """
-        df = df.assign(logImportance=np.log(df.importance))
-        df['variant_id'] = df.apply(
-            lambda row: str(row['locus'][0]) + '_' + str(row['locus'][1]) + '_' + \
-                        str('_'.join(row['alleles'])), axis=1)
-        return cls(df[['variant_id', 'logImportance', 'splitCount']])
-
-    @classmethod
-    def from_imp_table(cls, impTable):
-        """
-        Alternative class instantiation from a Hail Table (VariantSpark users).
-        :param cls: LocalFdrVs class
-        :param impTable: Hail table with locus, alleles, importance, and splitCount.
-        :return: Initialized class instance.
-        """
-        impTable = impTable.filter(impTable.splitCount >= 1)
-        return LocalFdrVs.from_imp_df(impTable.to_spark(flatten=False).toPandas())
-
-    def find_split_count_th(self, cutoff_list=[1, 2, 3, 4, 5, 10, 15, 20], quantile=0.75, bins=120):
-        """
-        Finds the ideal threshold for the splitCount, ideal being the lowest difference between
-        the fitted skewed normal distribution and the real data
-        :param cutoff_list: List of all values to be tried for the cutoff in the splitCount
-        :param quantile: Quantile to evaluate the distribution
-        :param bins: Number of bins for the distribution
-        :return: best splitCount threshold
-        """
-        best_split = [1, np.inf]
-
-        for split in cutoff_list:
-            # impDfWithLog = self.df_[self.df_.splitCount >= split]
-            impDfWithLog = self.df_[self.df_.splitCount >= split]  # temp
-            impDfWithLog = impDfWithLog[['variant_id', 'logImportance']].set_index(
-                'variant_id').squeeze()
-
-            local_fdr = LocalFdr()
-            local_fdr.bins = bins
-
-            impDfWithLog = impDfWithLog + sys.float_info.epsilon
-            x, f_observed_y = local_fdr._observed_density(impDfWithLog)
-            f_y = local_fdr._fit_density(x, f_observed_y)
-
-            C = np.quantile(impDfWithLog, q=quantile)
-
-            initial_f0_params = local_fdr._estimate_skewnorm_params(x[x < C], f_observed_y[x < C],
-                                                                    SkewnormParams.initial_list(impDfWithLog))
-
-            res = skewnorm.pdf(x, a=initial_f0_params.a, loc=initial_f0_params.loc,
-                               scale=initial_f0_params.scale) - f_observed_y
-            res = sum(res[x < C] ** 2)
-            if best_split[1] > res:
-                best_split[0] = split
-                best_split[1] = res
-
-        return best_split[0]
-
-    def plot_log_densities(self, ax, cutoff_list=[1, 2, 3, 4, 5, 10, 15, 20], palette='Set1',
-                           find_automatic_best=False, xLabel='log(importance)', yLabel='density'):
-        """
-        Plotting the log densities to visually identify the unimodal distributions.
-        :param ax: Matplotlib axis as a canvas for this plot.
-        :param cutoff_list: list of potential splitCount thresholds
-        :param find_automatic_best: If True, the potential best option is highlighted automatically.
-        :param palette: Matplotlib color palette used for the plotting.
-        :param xLabel: Label on the x-axis of the plot.
-        :param yLabel: Label on the y-axis of the plot.
-        """
-        assert type(palette) == str, 'palette should be a string'
-        assert type(xLabel) == str, 'xLabel should be a string'
-        assert type(yLabel) == str, 'yLabel should be a string'
-
-        n_lines = len(cutoff_list)
-        colors = sns.mpl_palette(palette, n_lines)
-        df = self.df_
-        for i, c in zip(cutoff_list, colors):
-            sns.kdeplot(df.logImportance[df.splitCount >= i],
-                        ax=ax, c=c, bw_adjust=0.5)  # low bw shows sharper distributions
-
-        if find_automatic_best:
-            potential_best = self.find_split_count_th(cutoff_list=cutoff_list)
-            sns.kdeplot(df.logImportance[df.splitCount >= potential_best],
-                        ax=ax, c=colors[potential_best - 1], bw_adjust=0.5, lw=8, linestyle=':')
-            best_split = [str(x) if x != potential_best else str(x) + '*' for x in cutoff_list]
-        else:
-            best_split = cutoff_list
-
-        ax.legend(title='Minimum split counts in distribution')
-        ax.legend(labels=best_split, bbox_to_anchor=(1, 1))
-        ax.set_xlabel(xLabel)
-        ax.set_ylabel(yLabel)
-
-    def plot_log_hist(self, ax, split_count, bins=120, xLabel='log(importance)', yLabel='count'):
-        """
-        Plotting the log histogram for the chosen split_count
-        :param ax: Matplotlib axis as a canvas for this plot.
-        :param split_count: Minimum split count threshold for the plot.
-        :param bins: Number of bins in the histogram
-        :param xLabel: Label on the x-axis of the plot.
-        :param yLabel: Label on the y-axis of the plot.
-        """
-
-        assert bins > 0, 'bins should be bigger than 0'
-        assert split_count > 0, 'split_count should be bigger than 0'
-        assert type(xLabel) == str, 'xLabel should be a string'
-        assert type(yLabel) == str, 'yLabel should be a string'
-
-        df = self.df_
-        sns.histplot(df.logImportance[df.splitCount >= split_count], ax=ax, bins=bins)
-        ax.set_xlabel(xLabel)
-        ax.set_ylabel(yLabel)
-
-    def plot(self, ax):
-        self.local_fdr.plot(ax)
-
-    def compute_fdr(self, countThreshold=2, local_fdr_cutoff=0.05, bins=120):
-        """
-        Compute the FDR and p-values of the SNPs.
-        :param countThreshold: The split count threshold for the SNPs to be considered.
-        :param local_fdr_cutoff: Threshold of false positives over the total of genes
-        :param bins: Number of bins to which the log importances will be aggregated
-        :return: A tuple with a dataframe containing the SNPs and their p-values,
-        and the expected FDR for the significant genes.
-        """
-
-        assert countThreshold > 0, 'countThreshold should be bigger than 0'
-        assert 0 < local_fdr_cutoff < 1, 'local_fdr_cutoff threshold should be between 0 and 1'
-
-        impDfWithLog = self.df_[self.df_.splitCount >= countThreshold]
-        impDfWithLog = impDfWithLog[['variant_id', 'logImportance']].set_index(
-            'variant_id').squeeze()
-
-        self.local_fdr = LocalFdr()
-        self.local_fdr.fit(impDfWithLog, bins)
-        pvals = self.local_fdr.get_pvalues()
-        fdr, mask = self.local_fdr.get_fdr(local_fdr_cutoff)
-        return (
-            impDfWithLog.reset_index().assign(pvalue=pvals, is_significant=mask),
-            fdr
-        )
diff --git a/python/varspark/hail/methods.py b/python/varspark/hail/methods.py
deleted file mode 100644
index 3980b100..00000000
--- a/python/varspark/hail/methods.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from typing import Dict
-from hail.utils import wrap_to_list
-from hail.utils.java import Env
-
-
-from hail.expr.expressions import *
-from hail.typecheck import *
-from hail.methods.statgen import _get_regression_row_fields
-
-from . import rf
-
-
-@typecheck(
-    y=expr_float64,
-    x=expr_int32,
-    covariates=dictof(str, expr_float64),
-    oob=bool,
-    mtry_fraction=nullable(float),
-    min_node_size=nullable(int),
-    max_depth=nullable(int),
-    seed=nullable(int),
-    imputation_type=nullable(str)
-)
-def random_forest_model(y, x, covariates={}, oob=True, mtry_fraction=None,
-                        min_node_size=None, max_depth=None, seed=None, imputation_type=None):
-    """Creates an empty random forest classifier with specified parameters.
-
-    :param Float64Expression y: Column-indexed response expressions.
-    :param Float64Expression x: Entry-indexed expression for input variable.
-    :param covariates: Dictionary of covariate names and their respective expression.
-    :param bool oob: Whether the OOB error should be calculated.
-    :param float mtry_fraction: The fraction of variables to try at each split.
-    :param int min_node_size: The minimum number of samples in a node to be considered for splitting.
-    :param int max_depth: The maximum depth of the trees to build.
-    :param int seed: Random seed to use.
-    :param string imputation_type: Imputation type to use. Currently only "mode" is supported, which performs
-        basic replacement of missing values with the mode of non-missing values. If not provided and the input contains
-        missing data, an error is reported.
-
-    :return: An empty random forest classifier.
-    :rtype: :py:class:`RandomForestModel`
-    """
-
-    mt = matrix_table_source('random_forest_model/x', x)
-    check_entry_indexed('random_forest_model/x', x)
-
-    for key, e in covariates.items():
-        analyze('random_forest_model/covariates', e, mt._col_indices)
-
-    '''potential multi-predictions
-    y = wrap_to_list(y)
-    y_field = ['y']
-    y_dict = dict(zip(y_field, y))'''
-    y_dict = {'y':y}
-
-    mts = mt._select_all(col_exprs=dict(**y_dict,
-                                        **{'cov__'+k:v for k,v in covariates.items()}),
-                         row_exprs={},
-                         col_key=[],
-                         entry_exprs=dict(e=x))
-
-
-    return rf.RandomForestModel(mts._mir,
-                                oob=oob,
-                                mtry_fraction=mtry_fraction,
-                                min_node_size=min_node_size,
-                                max_depth=max_depth,
-                                seed=seed,
-                                imputation_type=imputation_type)
diff --git a/python/varspark/hail/plot.py b/python/varspark/hail/plot.py
deleted file mode 100644
index 17885a10..00000000
--- a/python/varspark/hail/plot.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from hail.plot.plots import *
-from hail.plot.plots import _collect_scatter_plot_data, _get_scatter_plot_elements
-
-
-@typecheck(pvals=expr_float64, locus=nullable(expr_locus()), title=nullable(str),
-           size=int, hover_fields=nullable(dictof(str, expr_any)), collect_all=bool,
-           n_divisions=int, significance_line=nullable(numeric))
-def manhattan_imp(pvals, locus=None, title=None, size=4, hover_fields=None, collect_all=False,
-                  n_divisions=500, significance_line=5e-8):
-    """Create a Manhattan plot. (https://en.wikipedia.org/wiki/Manhattan_plot)
-
-    Parameters
-    ----------
-    pvals : :class:`.Float64Expression`
-        P-values to be plotted.
-    locus : :class:`.LocusExpression`
-        Locus values to be plotted.
-    title : str
-        Title of the plot.
-    size : int
-        Size of markers in screen space units.
-    hover_fields : Dict[str, :class:`.Expression`]
-        Dictionary of field names and values to be shown in the HoverTool of the plot.
-    collect_all : bool
-        Whether to collect all values or downsample before plotting.
-    n_divisions : int
-        Factor by which to downsample (default value = 500). A lower input results in fewer output datapoints.
-    significance_line : float, optional
-        p-value at which to add a horizontal, dotted red line indicating
-        genome-wide significance. If ``None``, no line is added.
-
-    Returns
-    -------
-    :class:`bokeh.plotting.figure.Figure`
-    """
-    if locus is None:
-        locus = pvals._indices.source.locus
-
-    ref = locus.dtype.reference_genome
-
-    if hover_fields is None:
-        hover_fields = {}
-
-    hover_fields['locus'] = hail.str(locus)
-
-    # pvals = -hail.log10(pvals)
-
-    source_pd = _collect_scatter_plot_data(
-        ('_global_locus', locus.global_position()),
-        ('_pval', pvals),
-        fields=hover_fields,
-        n_divisions=None if collect_all else n_divisions
-    )
-    source_pd['p_value'] = list(source_pd['_pval'])
-    source_pd['_contig'] = [locus.split(":")[0] for locus in source_pd['locus']]
-
-    observed_contigs = set(source_pd['_contig'])
-    observed_contigs = [contig for contig in ref.contigs.copy() if contig in observed_contigs]
-    contig_ticks = hail.eval(
-        [hail.locus(contig, int(ref.lengths[contig] / 2)).global_position() for contig in
-         observed_contigs])
-    color_mapper = CategoricalColorMapper(factors=ref.contigs,
-                                          palette=palette[:2] * int((len(ref.contigs) + 1) / 2))
-
-    p = figure(title=title, x_axis_label='Chromosome', y_axis_label='Importance (gini)', width=1000)
-    p, _, legend, _, _, _ = _get_scatter_plot_elements(
-        p, source_pd, x_col='_global_locus', y_col='_pval',
-        label_cols=['_contig'], colors={'_contig': color_mapper},
-        size=size
-    )
-    legend.visible = False
-    p.xaxis.ticker = contig_ticks
-    p.xaxis.major_label_overrides = dict(zip(contig_ticks, observed_contigs))
-    p.select_one(HoverTool).tooltips = [t for t in p.select_one(HoverTool).tooltips if
-                                        not t[0].startswith('_')]
-
-    if significance_line is not None:
-        p.renderers.append(Span(location=-math.log10(significance_line),
-                                dimension='width',
-                                line_color='red',
-                                line_dash='dashed',
-                                line_width=1.5))
-
-    return p
diff --git a/python/varspark/hail/rf.py b/python/varspark/hail/rf.py
deleted file mode 100644
index cf868331..00000000
--- a/python/varspark/hail/rf.py
+++ /dev/null
@@ -1,117 +0,0 @@
-from hail.ir import *
-from hail.table import Table
-from hail.typecheck import *
-from hail.utils.java import Env
-from varspark.hail.lfdrvs import LocalFdrVs
-
-#from varspark.pvalues_calculation import *
-
-class RandomForestModel(object):
-    """ Represents a random forest model object. Do not construct it directly but rather
-    use the `varspark.hail.methods.random_forest_model` function.
-    """
-
-    @typecheck_method(
-        _mir=MatrixIR,
-        oob=bool,
-        mtry_fraction=nullable(float),
-        min_node_size=nullable(int),
-        max_depth=nullable(int),
-        seed=nullable(int),
-        imputation_type=nullable(str)
-    )
-    def __init__(self, _mir, oob=True, mtry_fraction=None, min_node_size=None,
-                 max_depth=None, seed=None, imputation_type=None):
-        self._mir = _mir
-        self._jrf_model = Env.backend().jvm().au.csiro.variantspark.hail.methods.RFModel.pyApply(
-            Env.backend()._jbackend,
-            Env.spark_backend('rf')._to_java_matrix_ir(self._mir),
-            mtry_fraction, oob, min_node_size,
-            max_depth, seed, imputation_type)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *exc_details):
-        self.release()
-
-    @typecheck_method(
-        n_trees=int,
-        batch_size=int
-    )
-    def fit_trees(self, n_trees=500, batch_size=100):
-        """ Fits the random forest model.
-
-        :param int n_trees: The number of trees to build in the forest.
-        :param int batch_size: The number of trees to build in one batch.
-        """
-
-        self._jrf_model.fitTrees(n_trees, batch_size)
-
-    def oob_error(self):
-        """ Returns the Out of Bag (OOB) error for this model. Only available if the model was created with the
-        `oob` option.
-
-        :rtype: float
-        """
-        return self._jrf_model.oobError()
-
-    def variable_importance(self):
-        """ Returns the variable importance for this model in a hail `Table` with the following row fields:
-
-        'locus': locus
-        'alleles': array
-        'importance': float64
-
-        and indexed with: ['locus', 'alleles'].
-
-        The `importance` column contains gini importance for each of the variants.
-
-        :rtype: :py:class:`hail.Table`
-        """
-        return Table._from_java(self._jrf_model.variableImportance())
-
-
-    def get_lfdr(self):
-        """ Returns the class with the information preloaded to compute the local FDR
-        :return: class LocalFdrVs with the importances loaded
-        """
-        return LocalFdrVs.from_imp_table(self.variable_importance())
-
-
-    def covariate_importance(self):
-        """ Returns the covariate importance for this model in a hail `Table` with the following
-        row fields:
-
-        'covariate': str
-        'importance': float64
-        'splitCount': float64
-
-        and indexed with: ['covariate'].
-
-        The `importance` column contains gini importance for each of the covariates.
-
-        :rtype: :py:class:`hail.Table`
-        """
-        return Table._from_java(self._jrf_model.covariatesImportance())
-
-
-    @typecheck_method(
-        filename=str,
-        resolve_names=bool
-    )
-    def to_json(self, filename, resolve_names=True):
-        """ Saves the model JSON representation to a file. If `resolve_names` is set,
-        the output includes the variable names as well as indexes. This does, however,
-        incur a performance penalty for creating the in-memory variable index.
-
-        :param str filename: The file to save the model to.
-        :param bool resolve_names: Resolve variable names in the saved JSON.
-        """
-        self._jrf_model.toJson(filename, resolve_names)
-
-    def release(self):
-        """ Release all in-memory resources associated with this model.
-        This may include cached datasets, etc.
-        """
-        self._jrf_model.release()
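
Note for reviewers (trailing annotation, not part of the applied diff): the wrapper removed above exposed a small public API (init, random_forest_model, RandomForestModel, LocalFdrVs). The sketch below is reconstructed only from the deleted signatures; the VCF and label file paths and the 'label' field are illustrative placeholders, not taken from the original files.

import hail as hl
import varspark.hail as vshl

# init() (removed context.py) adds the variant-spark jar to spark.jars before hl.init().
vshl.init()

# Entry-indexed genotypes and a column-indexed float64 response, as expected
# by the removed random_forest_model() (y=expr_float64, x=expr_int32).
mt = hl.import_vcf('data.vcf')                                        # illustrative path
labels = hl.import_table('labels.csv', impute=True, key='sample')     # illustrative path
mt = mt.annotate_cols(label=labels[mt.s])

# RandomForestModel supported the context-manager protocol (__enter__/__exit__ -> release()).
with vshl.random_forest_model(y=hl.float64(mt.label),
                              x=mt.GT.n_alt_alleles(),
                              seed=13, mtry_fraction=0.05,
                              min_node_size=5, max_depth=10) as rf_model:
    rf_model.fit_trees(n_trees=500, batch_size=100)
    print('OOB error: %s' % rf_model.oob_error())
    imp_table = rf_model.variable_importance()   # hail Table keyed by locus, alleles
    imp_table.show(10)
    rf_model.to_json('rf_model.json')            # resolve_names=True by default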