DEV: Update fdr estimation to return p-value cutoff (#237)
NickEdwards7502 committed Sep 19, 2024
1 parent e9a23cf commit 4379b0a
Showing 1 changed file with 49 additions and 26 deletions.
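
The functional change in this commit is that get_fdr() now returns the fitted-null p-value at the selected importance cutoff, in addition to the FDR estimate and the significance mask. A minimal caller-side sketch of consuming the new three-value return; the import path is inferred from the file name and the no-argument constructor and pandas Series input are assumptions, not shown in this diff:

import numpy as np
import pandas as pd
from varspark.stats.lfdr import LocalFdr  # module path inferred from the file name

# Hypothetical importance scores; a pandas Series is assumed because the
# class uses .iloc / .to_numpy() internally.
importances = pd.Series(np.random.default_rng(0).gamma(2.0, size=1000))
lfdr = LocalFdr()                          # assumes a no-argument constructor
lfdr.fit(importances, bins=120)
fdr, cutoff_pvalue, significant_mask = lfdr.get_fdr(local_fdr_cutoff=0.05)
print(f"FDR={fdr:.3f}, p-value cutoff={cutoff_pvalue:.4g}, selected={int(significant_mask.sum())}")
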
75 changes: 49 additions & 26 deletions python/varspark/stats/lfdr.py
@@ -32,7 +32,7 @@ def initial_list(cls, z, a=1, loc=1, scale=2):
return [
cls.default(a, loc, scale),
cls.from_mean(z, a, scale),
cls.from_data(z)
cls.from_data(z),
]


@@ -51,6 +51,7 @@ class LocalFdr:
p0 - the proportion of the null observations
local_fdr - FDR array for each position
"""

bins: np.int
z: np.array
x: np.array
@@ -81,9 +82,12 @@ def _fit_density(self, x, y, df=10):
:param df: Number of degrees of freedom for the splines
:return: returns the smoothed data for the density distribution
"""
transformed_x = patsy.dmatrix(f"cr(x, df={df})", {"x": x}, return_type='dataframe')
transformed_x = patsy.dmatrix(
f"cr(x, df={df})", {"x": x}, return_type="dataframe"
)
transformed_x = sm.add_constant(
transformed_x) # Makes no difference but added for consistency
transformed_x
) # Makes no difference but added for consistency
model = sm.GLM(y, transformed_x, family=sm.families.Poisson()).fit()
return model.predict(transformed_x)
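
The hunk above only reflows _fit_density, but the technique it uses — smoothing binned counts with a Poisson GLM over a cubic-regression-spline basis — can be reproduced in isolation. A self-contained sketch with made-up data (not the commit's code):

import numpy as np
import patsy
import statsmodels.api as sm

rng = np.random.default_rng(0)
scores = rng.normal(size=2000)                  # hypothetical importance scores
y, edges = np.histogram(scores, bins=50)        # binned counts
x = (edges[:-1] + edges[1:]) / 2                # bin midpoints

# Cubic regression spline basis with df=10, mirroring _fit_density
X = patsy.dmatrix("cr(x, df=10)", {"x": x}, return_type="dataframe")
X = sm.add_constant(X)                          # no-op if an intercept column already exists
model = sm.GLM(y, X, family=sm.families.Poisson()).fit()
smoothed = model.predict(X)                     # smoothed bin heights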

@@ -111,8 +115,9 @@ def _local_fdr(self, f, f0, p0=1):
f_normalised = (np.sum(f0) * f) / np.sum(f)
return np.minimum((p0 * f0) / f_normalised, 1)
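
For reference, _local_fdr rescales the observed density f to the same total mass as the null density f0 and caps the ratio at one. A tiny numerical sketch with invented densities:

import numpy as np

f0 = np.array([0.05, 0.20, 0.40, 0.20, 0.05])   # hypothetical null density per bin
f = np.array([0.04, 0.18, 0.35, 0.25, 0.18])    # hypothetical observed density per bin
p0 = 1.0                                        # assumed proportion of nulls
f_normalised = np.sum(f0) * f / np.sum(f)       # rescale f to the mass of f0
local_fdr = np.minimum(p0 * f0 / f_normalised, 1)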

def _estimate_skewnorm_params(self, x, y, initial_params_list=SkewnormParams.default(),
max_nfev=400):
def _estimate_skewnorm_params(
self, x, y, initial_params_list=SkewnormParams.default(), max_nfev=400
):
"""
Estimate the best parameters for the observed function
:param x: x-axis values
@@ -128,7 +133,8 @@ def _fit_skew_normal(initial_params):
# skewnorm.pdf residuals
x0=np.array(initial_params),
args=[x, y],
method='lm', max_nfev=max_nfev
method="lm",
max_nfev=max_nfev,
)

def _has_converged(optimisation_result):
@@ -137,13 +143,16 @@ def _has_converged(optimisation_result):
if isinstance(initial_params_list, SkewnormParams):
initial_params_list = [initial_params_list]

converged_results = filter(_has_converged, map(_fit_skew_normal, initial_params_list))
converged_results = filter(
_has_converged, map(_fit_skew_normal, initial_params_list)
)
if converged_results:
best_result = ft.reduce(lambda r1, r2: r2 if r2.cost < r1.cost else r1,
converged_results)
best_result = ft.reduce(
lambda r1, r2: r2 if r2.cost < r1.cost else r1, converged_results
)
return SkewnormParams._make(best_result.x)
else:
raise ValueError('All fittings failed')
raise ValueError("All fittings failed")
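
The estimator above fits a skew-normal from several starting points, keeps the converged fits, and returns the one with the lowest cost. A self-contained sketch of a single such fit; the residual function and the data here are assumptions, since _fit_skew_normal's body is only partly shown in this diff:

import numpy as np
from scipy.optimize import least_squares
from scipy.stats import skewnorm

def residuals(params, x, y):
    a, loc, scale = params
    return skewnorm.pdf(x, a, loc=loc, scale=scale) - y

x = np.linspace(-2, 6, 200)                     # hypothetical bin midpoints
y = skewnorm.pdf(x, 3, loc=0.5, scale=1.5)      # hypothetical observed density
result = least_squares(residuals, x0=np.array([1.0, 1.0, 2.0]),
                       args=(x, y), method="lm", max_nfev=400)
a, loc, scale = result.x                        # best-fit skew-normal parameters
converged = result.success                      # scipy's own convergence flag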

def fit(self, z, bins):
"""
@@ -159,12 +168,14 @@ def fit(self, z, bins):
# Estimate a tentative null distribution (skew-normal)
# from the normalised histogram data.
#
initial_f0_params = self._estimate_skewnorm_params(self.x, self.f_observed_y,
SkewnormParams.initial_list(z))
initial_f0_params = self._estimate_skewnorm_params(
self.x, self.f_observed_y, SkewnormParams.initial_list(z)
)

self.C = skewnorm.ppf(0.95, **initial_f0_params._asdict())
self.f0_params = self._estimate_skewnorm_params(self.x[self.x < self.C],
self.f_observed_y[self.x < self.C])
self.f0_params = self._estimate_skewnorm_params(
self.x[self.x < self.C], self.f_observed_y[self.x < self.C]
)

self.f0_y = skewnorm.pdf(self.x, **self.f0_params._asdict())
self.p0 = self._estimate_p0(skewnorm.cdf(self.z, **self.f0_params._asdict()))
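
The fit is two-staged: an initial skew-normal is estimated from all bins, C is set at its 95th percentile, and the null parameters are then re-estimated from the bins below C only, so the non-null right tail does not distort the null fit. A small sketch of the truncation step with invented parameters:

import numpy as np
from scipy.stats import skewnorm

x = np.linspace(0, 10, 120)                     # hypothetical bin midpoints
y = skewnorm.pdf(x, 4, loc=1, scale=2)          # hypothetical observed density
initial = {"a": 4.2, "loc": 0.9, "scale": 2.1}  # stand-in for the first-stage fit
C = skewnorm.ppf(0.95, **initial)               # truncation point
below_C = x < C
x_null, y_null = x[below_C], y[below_C]         # data for the second-stage null fit
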
@@ -187,35 +198,47 @@ def get_fdr(self, local_fdr_cutoff=0.05):
start_x = scipy.stats.skewnorm.ppf(0.5, **self.f0_params._asdict())
start_x_index = np.where(self.x > start_x)[0][0]
cutoff_index = start_x_index + np.argmin(
np.abs(self.local_fdr.iloc[start_x_index:self.bins] - local_fdr_cutoff))
np.abs(self.local_fdr.iloc[start_x_index : self.bins] - local_fdr_cutoff)
)
cutoff_x = self.x[cutoff_index]
cutoff_pvalue = 1 - skewnorm.cdf(cutoff_x, **self.f0_params._asdict())
significant_mask = (self.z > cutoff_x).to_numpy()
# the proportion of false positives expected from null distribution
# to the identified positives
FDR = cutoff_pvalue * len(self.z) / sum(significant_mask)
return FDR, significant_mask
print(cutoff_pvalue)
return FDR, cutoff_pvalue, significant_mask
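
The returned cutoff_pvalue is the fitted-null tail probability at the chosen importance cutoff; multiplying it by the number of tests gives the expected count of nulls above the cutoff, and dividing by the observed count of positives gives the FDR. The arithmetic with invented numbers:

n_tests = 10_000               # len(self.z): number of variants tested
cutoff_pvalue = 0.001          # 1 - F0(cutoff_x) under the fitted null
n_significant = 50             # variants whose importance exceeds cutoff_x
expected_false = cutoff_pvalue * n_tests        # expected nulls above the cutoff
FDR = expected_false / n_significant            # 0.2, i.e. about 20% false discoveries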

def plot(self, ax):
"""
Returns the built canvas for a sanity check.
:param ax: Matplot axis
:return:
"""
sns.histplot(self.z, ax=ax, stat='density', bins=self.bins, color='purple', label="Binned "
"importances")
ax.plot(self.x, skewnorm.pdf(self.x, **self.f0_params._asdict()), color='red',
label='fitted curve')

ax.axvline(x=self.C, color='orange', label="C")
sns.histplot(
self.z,
ax=ax,
stat="density",
bins=self.bins,
color="purple",
label="Binned " "importances",
)
ax.plot(
self.x,
skewnorm.pdf(self.x, **self.f0_params._asdict()),
color="red",
label="fitted curve",
)

ax.axvline(x=self.C, color="orange", label="C")
ax.set_xlabel("Importances", fontsize=14)
ax.set_ylabel("Density", fontsize=14)

ax.plot(np.nan, np.nan, color='blue', label='local fdr') # Adding to the legend
ax.axhline(y=np.nan, color='black', label="local fdr cutoff = 0.05")
ax.plot(np.nan, np.nan, color="blue", label="local fdr") # Adding to the legend
ax.axhline(y=np.nan, color="black", label="local fdr cutoff = 0.05")
ax2 = ax.twinx()
ax2.set_ylabel("Local FDR", fontsize=14)
ax2.axhline(y=0.05, color='black', label="local fdr cutoff = 0.05")
ax2.plot(self.x, self.local_fdr, color='blue')
ax2.axhline(y=0.05, color="black", label="local fdr cutoff = 0.05")
ax2.plot(self.x, self.local_fdr, color="blue")

ax.legend(loc="upper right")
