diff --git a/python/varspark/stats/lfdr.py b/python/varspark/stats/lfdr.py index 4458ec2e..98979868 100644 --- a/python/varspark/stats/lfdr.py +++ b/python/varspark/stats/lfdr.py @@ -32,7 +32,7 @@ def initial_list(cls, z, a=1, loc=1, scale=2): return [ cls.default(a, loc, scale), cls.from_mean(z, a, scale), - cls.from_data(z) + cls.from_data(z), ] @@ -51,6 +51,7 @@ class LocalFdr: p0 - the proportion of the null observations local_fdr - FDR array for each position """ + bins: np.int z: np.array x: np.array @@ -81,9 +82,12 @@ def _fit_density(self, x, y, df=10): :param df: Number of degrees of freedom for the splines :return: returns the smoothed data for the density distribution """ - transformed_x = patsy.dmatrix(f"cr(x, df={df})", {"x": x}, return_type='dataframe') + transformed_x = patsy.dmatrix( + f"cr(x, df={df})", {"x": x}, return_type="dataframe" + ) transformed_x = sm.add_constant( - transformed_x) # Makes no difference but added for consistency + transformed_x + ) # Makes no difference but added for consistency model = sm.GLM(y, transformed_x, family=sm.families.Poisson()).fit() return model.predict(transformed_x) @@ -111,8 +115,9 @@ def _local_fdr(self, f, f0, p0=1): f_normalised = (np.sum(f0) * f) / np.sum(f) return np.minimum((p0 * f0) / f_normalised, 1) - def _estimate_skewnorm_params(self, x, y, initial_params_list=SkewnormParams.default(), - max_nfev=400): + def _estimate_skewnorm_params( + self, x, y, initial_params_list=SkewnormParams.default(), max_nfev=400 + ): """ Estimate the best parameters for the observed function :param x: x-axis values @@ -128,7 +133,8 @@ def _fit_skew_normal(initial_params): # skewnorm.pdf residuals x0=np.array(initial_params), args=[x, y], - method='lm', max_nfev=max_nfev + method="lm", + max_nfev=max_nfev, ) def _has_converged(optimisation_result): @@ -137,13 +143,16 @@ def _has_converged(optimisation_result): if isinstance(initial_params_list, SkewnormParams): initial_params_list = [initial_params_list] - converged_results = filter(_has_converged, map(_fit_skew_normal, initial_params_list)) + converged_results = filter( + _has_converged, map(_fit_skew_normal, initial_params_list) + ) if converged_results: - best_result = ft.reduce(lambda r1, r2: r2 if r2.cost < r1.cost else r1, - converged_results) + best_result = ft.reduce( + lambda r1, r2: r2 if r2.cost < r1.cost else r1, converged_results + ) return SkewnormParams._make(best_result.x) else: - raise ValueError('All fittings failed') + raise ValueError("All fittings failed") def fit(self, z, bins): """ @@ -159,12 +168,14 @@ def fit(self, z, bins): # Estimate the tentative of the null distribution (skew-normal) # from the normalised histogram data. # - initial_f0_params = self._estimate_skewnorm_params(self.x, self.f_observed_y, - SkewnormParams.initial_list(z)) + initial_f0_params = self._estimate_skewnorm_params( + self.x, self.f_observed_y, SkewnormParams.initial_list(z) + ) self.C = skewnorm.ppf(0.95, **initial_f0_params._asdict()) - self.f0_params = self._estimate_skewnorm_params(self.x[self.x < self.C], - self.f_observed_y[self.x < self.C]) + self.f0_params = self._estimate_skewnorm_params( + self.x[self.x < self.C], self.f_observed_y[self.x < self.C] + ) self.f0_y = skewnorm.pdf(self.x, **self.f0_params._asdict()) self.p0 = self._estimate_p0(skewnorm.cdf(self.z, **self.f0_params._asdict())) @@ -187,14 +198,16 @@ def get_fdr(self, local_fdr_cutoff=0.05): start_x = scipy.stats.skewnorm.ppf(0.5, **self.f0_params._asdict()) start_x_index = np.where(self.x > start_x)[0][0] cutoff_index = start_x_index + np.argmin( - np.abs(self.local_fdr.iloc[start_x_index:self.bins] - local_fdr_cutoff)) + np.abs(self.local_fdr.iloc[start_x_index : self.bins] - local_fdr_cutoff) + ) cutoff_x = self.x[cutoff_index] cutoff_pvalue = 1 - skewnorm.cdf(cutoff_x, **self.f0_params._asdict()) significant_mask = (self.z > cutoff_x).to_numpy() # the proportion of false positives expected from null distribution # to the identified positives FDR = cutoff_pvalue * len(self.z) / sum(significant_mask) - return FDR, significant_mask + print(cutoff_pvalue) + return FDR, cutoff_pvalue, significant_mask def plot(self, ax): """ @@ -202,20 +215,30 @@ def plot(self, ax): :param ax: Matplot axis :return: """ - sns.histplot(self.z, ax=ax, stat='density', bins=self.bins, color='purple', label="Binned " - "importances") - ax.plot(self.x, skewnorm.pdf(self.x, **self.f0_params._asdict()), color='red', - label='fitted curve') - - ax.axvline(x=self.C, color='orange', label="C") + sns.histplot( + self.z, + ax=ax, + stat="density", + bins=self.bins, + color="purple", + label="Binned " "importances", + ) + ax.plot( + self.x, + skewnorm.pdf(self.x, **self.f0_params._asdict()), + color="red", + label="fitted curve", + ) + + ax.axvline(x=self.C, color="orange", label="C") ax.set_xlabel("Importances", fontsize=14) ax.set_ylabel("Density", fontsize=14) - ax.plot(np.nan, np.nan, color='blue', label='local fdr') # Adding to the legend - ax.axhline(y=np.nan, color='black', label="local fdr cutoff = 0.05") + ax.plot(np.nan, np.nan, color="blue", label="local fdr") # Adding to the legend + ax.axhline(y=np.nan, color="black", label="local fdr cutoff = 0.05") ax2 = ax.twinx() ax2.set_ylabel("Local FDR", fontsize=14) - ax2.axhline(y=0.05, color='black', label="local fdr cutoff = 0.05") - ax2.plot(self.x, self.local_fdr, color='blue') + ax2.axhline(y=0.05, color="black", label="local fdr cutoff = 0.05") + ax2.plot(self.x, self.local_fdr, color="blue") ax.legend(loc="upper right")