Skip to content

Commit

Permalink
refactoring+cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rizac committed Jul 20, 2023
1 parent 76b85f2 commit f739534
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 131 deletions.
16 changes: 8 additions & 8 deletions mecompute/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ def cli(d_config, start, end, time_window, force_overwrite, p_config, h_template
me-compute -s 2016-01-02 -t 2 OUT_DIR
"""
ret = False
if start is None and end is None and time_window is None:
click.BadParameter('No time bounds specified. Please provide '
'-s, -e or -t').show()
else:
start, end = _get_timebounds(start, end, time_window)
print(f'Computing Me for events within: [{start}, {end}]', file=sys.stderr)
dest_dir = output_dir.replace("%S%", start).replace("%E%", end)
ret = compute_me(d_config, start, end, dest_dir, seg_sel=segments_selection,
force_overwrite=force_overwrite, p_config=p_config,
html_template=h_template)
sys.exit(1)

start, end = _get_timebounds(start, end, time_window)
print(f'Computing Me for events within: [{start}, {end}]', file=sys.stderr)
dest_dir = output_dir.replace("%S%", start).replace("%E%", end)
ret = compute_me(d_config, start, end, dest_dir, seg_sel=segments_selection,
force_overwrite=force_overwrite, p_config=p_config,
html_template=h_template)
if ret:
sys.exit(0)
print('WARNING: the program did not complete successfully, '
Expand Down
202 changes: 79 additions & 123 deletions mecompute/event_me.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,18 @@

logger = logging.getLogger('me-compute.event_me')

pct = (5, 95) # percentiles
score_th = 0.75 # anomaly score threshold
ROUND = 2 # round to be used in mean and stddev. Set to None for no rounding


class Stats(Enum):

# legacy code for docstrings (not used anymore. Also note we compute only Me_p now):
Me = 'Me stats are computed on all waveforms'
Me_p = 'Me stats computed on the waveforms in the {0:d}-{1:d} ' \
'percentiles'.format(*pct)
Me_t = 'Me stats computed on the waveforms with anomaly score < %f' % score_th
Me_w = 'Me stats are computed on all waveforms, weighting values with ' \
'the inverse of the anomaly score (mapping linearly score and weights)'
Me_w2 = 'Me stats are computed on all waveforms, weighting values with ' \
'the inverse of the anomaly score (mapping non-linearly score and weights)'

def compute(self, values, *options):
values = np.asarray(values)
weights = None

if self is Stats.Me:
pass
if self is Stats.Me_p:
p_low, p_high = np.nanpercentile(values, pct)
values = values[(values >= p_low) & (values <= p_high)]
elif self is Stats.Me_t:
anomalyscores = np.asarray(options[0])
values = values[anomalyscores < score_th]
elif self is Stats.Me_w or self is Stats.Me_w2:
anomalyscores = np.asarray(options[0])
weights = (LinearScore2Weight if self is Stats.Me_w
else ParabolicScore2Weight).convert(anomalyscores)
else:
# should never happen right? for safety:
raise ValueError('%s is not a Stats enumeration item' % str(self))

return avg_std_count(values, weights, round=ROUND)


def compute_events_me(hdf_path_or_df, dburl):

def compute_events_me(station_me: pd.DataFrame, dburl):
"""Yield Events in form of dicts from the given station_me and dburl"""
session = get_session(dburl)
try:
yield from _compute_events_me(hdf_path_or_df, session)
yield from _compute_events_me(station_me, session)
finally:
session.close()


def _compute_events_me(station_me: pd.DataFrame, db_session):
"""Yield a series of dicts denoting a row of the report"""
"""Yield Events in form of dicts from the given station_me and db session"""
# see process.py:main for a list of columns:
dfr = station_me
for ev_db_id, evt_df in dfr.groupby('event_db_id'):
Expand All @@ -69,7 +31,7 @@ def _compute_events_me(station_me: pd.DataFrame, db_session):
if not waveforms_count:
logger.warning(f'Event {ev_db_id} skipped: no finite Me value found')
continue
me, me_std, num_waveforms = Stats.Me_p.compute(me_values)
me, me_std, num_waveforms = avg_std_count_within_percentiles(me_values)
with pd.option_context('mode.use_inf_as_na', True):
if pd.isna(me):
logger.warning(f'Event {ev_db_id} skipped: Me is NaN '
Expand Down Expand Up @@ -130,85 +92,9 @@ def get_html_report_rows(station_me: pd.DataFrame, events: dict):
yield events[ev_db_id], stas


class Score2Weight:
# these are the min score and max score of the currently selected model
# in sdaas:
maxscore = 0.86059521298447561
minscore = 0.41729107098205132

@classmethod
def convert(cls, scores):
"""converts scores to weights, returning a numpy array of values in [0, 1]
"""
scores = np.asarray(scores)
if not np.isfinite(scores).any():
return None # no weight
weights = cls._to_weights(scores, cls.minscore, cls.maxscore)
return np.clip(weights, 0., 1.)

@classmethod
def _to_weights(cls, scores, minscore, maxscore):
"""converts scores to weights
:param scores: numpy array of floats (usually but not necessarily in [0, 1])
:param minscore: the min possible score (not necessarily `min(scores)`)
:param maxscore: the max possible score (not necessarily `max(scores)`)
:return: a numpy array of floats the same size of `scores`. Values
outside [0, 1] will be clipped (limited) within that interval
"""
raise NotImplementedError('Not implemented')


class LinearScore2Weight(Score2Weight):

@classmethod
def _to_weights(cls, scores, minscore, maxscore):
"""converts scores to weights
:param scores: numpy array of floats (usually but not necessarily in [0, 1])
:param minscore: the min possible score (not necessarily `min(scores)`)
:param maxscore: the max possible score (not necessarily `max(scores)`)
:return: a numpy array of floats the same size of `scores`. Values
outside [0, 1] will be clipped (limited) within that interval
"""
# linear weight normalized between 0 and 1 and inverted
# (lower anomaly scores -> higher weight)
return 1 - ((scores-minscore) / (maxscore-minscore))


class ParabolicScore2Weight(Score2Weight):

@classmethod
def _to_weights(cls, scores, minscore, maxscore):
"""converts scores to weights
:param scores: numpy array of floats (usually but not necessarily in [0, 1])
:param minscore: the min possible score (not necessarily `min(scores)`)
:param maxscore: the max possible score (not necessarily `max(scores)`)
:return: a numpy array of floats the same size of `scores`. Values
outside [0, 1] will be clipped (limited) within that interval
"""
# use a parabola fitting with vertex in (0.5, 1): all scores
# <=0.5 are converted to weight 1, all scores >0.5 have weights
# decreasing parabolically. The fitting parabola has vertex in (0.5, 1)
# and passes through (maxscore, 0)
# the coefficients (calculated manually) are:
b = 1. / (maxscore**2 - maxscore + 0.25)
a = -b
c = 1. - b/4.0
weights = a * scores**2 + b * scores + c
weights[scores <= 0.5] = 1.
return weights


def avg_std_count(values, weights=None, na_repr=None, round=None):
"""Return the weighted average, standard deviation and count of values
as dict with keys 'mean', 'stddev', 'count'
values, weights -- Numpy ndarrays with the same shape.
"""Return the weighted average, standard deviation and count of finite values
from the arguments
"""
# we use np.average because it supports weights, but contrarily to np.mean it
# does not have a 'nanaverage' counterpart. So:
Expand All @@ -221,10 +107,80 @@ def avg_std_count(values, weights=None, na_repr=None, round=None):
mean, std, count = na_repr, na_repr, len(values)
if count > 0:
average = np.average(values, weights=weights)
# stdev:
# stddev:
variance = np.average((values - average) ** 2, weights=weights)
mean, std = float(average), math.sqrt(variance)
if round is not None:
mean = np.round(mean, round)
std = np.round(std, round)
return [mean, std, count]


def avg_std_count_within_percentiles(values, plow=5, phight=95, round=2):
"""Return the average, stddev and count of the given values within the
given percentiles (5-95 by default)
See avg_std_count for details
"""
p_low, p_high = np.nanpercentile(values, [plow, phight])
values = values[(values >= p_low) & (values <= p_high)]
return avg_std_count(values, None, round=round)


# legacy code used in the test phase (ignore):

def avg_std_count_anomalyscore_th(values, anomalyscores, score_th=.75, round=2):
"""Return the average, stddev and count of the given values, discarding
values whose anomaly score is higher than the specified threshold (.75)
See avg_std_count for details
"""

values = values[anomalyscores < score_th]
return avg_std_count(values, None, round=round)


def avg_std_count_anomalyscore_weight(values, anomalyscores, round=2):
"""Return the average, stddev and count of the given values, with weights
mapped linearly from the inverse of the given anomaly scores
See avg_std_count for details
"""
maxscore = 0.86059521298447561
minscore = 0.41729107098205132
weights = 1 - ((anomalyscores-minscore) / (maxscore-minscore))
# |
# 1 + ooo
# weight | o
# 0 + ooo
# +----+-----+-----
# .4 .8
# anomalyscore
return avg_std_count(values, np.clip(weights, 0., 1.), None, round=round)


def avg_std_count_anomalyscore_weight2(values, anomalyscores, round=2):
"""Return the average, stddev and count of the given values, with weights
mapped non-linearly from the inverse of the given anomaly scores
See avg_std_count for details
"""
maxscore = 0.86059521298447561
minscore = 0.41729107098205132
# use a parabola fitting with vertex in (0.5, 1): all scores
# <=0.5 are converted to weight 1, all scores >0.5 have weights
# decreasing parabolically:
# |
# 1 + ooo
# | o
# weight | o
# 0 + ooo0
# +----+----+-----
# .5 .8
# anomalyscore
#
# The fitting parabola has vertex in (0.5, 1)
# and passes through (maxscore, 0)
# the coefficients (calculated manually) are:
b = 1. / (maxscore ** 2 - maxscore + 0.25)
a = -b
c = 1. - b / 4.0
weights = a * anomalyscores ** 2 + b * anomalyscores + c
weights[anomalyscores <= 0.5] = 1.
return avg_std_count(values, np.clip(weights, 0., 1.), None, round=round)

0 comments on commit f739534

Please sign in to comment.