Commit
✨ Add GP outlier detector
Marigold committed Oct 15, 2024
1 parent 09184fc commit a461a41
Showing 5 changed files with 94 additions and 109 deletions.
65 changes: 19 additions & 46 deletions apps/anomalist/anomalist_api.py
@@ -16,6 +16,7 @@
AnomalyUpgradeMissing,
get_long_format_score_df,
)
from apps.anomalist.gp_detector import AnomalyGaussianProcessOutlier
from apps.wizard.utils.paths import WIZARD_ANOMALIES_RELATIVE
from etl import grapher_model as gm
from etl.config import OWID_ENV
@@ -28,16 +29,20 @@
# Name of index columns for dataframe.
INDEX_COLUMNS = ["entity_name", "year"]

# Define anomaly types.
ANOMALY_TYPE = Literal["upgrade_change", "time_change", "upgrade_missing"]

# Define mapping of available anomaly types to anomaly detectors.
ANOMALY_DETECTORS = {
"time_change": AnomalyTimeChange,
"upgrade_change": AnomalyUpgradeChange,
"upgrade_missing": AnomalyUpgradeMissing,
detector.anomaly_type: detector
for detector in [
AnomalyTimeChange,
AnomalyUpgradeChange,
AnomalyUpgradeMissing,
AnomalyGaussianProcessOutlier,
]
}

# Define anomaly types.
ANOMALY_TYPE = Literal[tuple(ANOMALY_DETECTORS.keys())]

########################################################################################################################

# AGGREGATE ANOMALIES:
@@ -181,7 +186,6 @@ def get_analytics_score(df_aggregated: pd.DataFrame, df_views: pd.DataFrame) ->

def anomaly_detection(
anomaly_types: Optional[Tuple[str, ...]] = None,
dataset_ids: Optional[list[int]] = None,
variable_mapping: Optional[dict[int, int]] = None,
variable_ids: Optional[list[int]] = None,
dry_run: bool = False,
@@ -212,12 +216,9 @@ def anomaly_detection(
(set(variable_mapping.keys()) | set(variable_mapping.values())) if variable_mapping else set()
)
variable_ids_all = list(variable_ids_mapping | set(variable_ids or []))
if dataset_ids is None:
dataset_ids = []
# Dictionary variable_id: Variable object, for all variables (old and new).
variables = {
variable.id: variable
for variable in _load_variables_meta(engine=engine, dataset_ids=dataset_ids, variable_ids=variable_ids_all)
variable.id: variable for variable in _load_variables_meta(engine=engine, variable_ids=variable_ids_all)
}

# Create a dictionary of all variable_ids for each dataset_id (only for new variables).
@@ -267,7 +268,7 @@
# TODO: validate format of the output dataframe
anomaly = gm.Anomaly(
datasetId=dataset_id,
anomalyType=detector.anomaly_type,
anomalyType=anomaly_type,
)
anomaly.dfScore = df_score_long

@@ -339,40 +340,12 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.


# @memory.cache
def _load_variables_meta(
engine: Engine, dataset_ids: Optional[list[int]], variable_ids: Optional[list[int]]
) -> list[gm.Variable]:
if dataset_ids:
q = """
select id from variables
where datasetId in %(dataset_ids)s
"""
df_from_dataset_ids = read_sql(q, engine, params={"dataset_ids": dataset_ids})
else:
df_from_dataset_ids = pd.DataFrame()

if variable_ids:
q = """
select id from variables
where id in %(variable_ids)s
"""
df_from_variable_ids = read_sql(q, engine, params={"variable_ids": variable_ids})
else:
df_from_variable_ids = pd.DataFrame()

# Combine both dataframes to get all possible variables required.
df = pd.concat([df_from_dataset_ids, df_from_variable_ids]).drop_duplicates()

# load all variables from a random dataset
if df.empty:
q = """
with t as (
select id from datasets order by rand() limit 1
)
select id from variables
where datasetId in (select id from t)
"""
df = read_sql(q, engine)
def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Variable]:
q = """
select id from variables
where id in %(variable_ids)s
"""
df = read_sql(q, engine, params={"variable_ids": variable_ids})

# select all variables using SQLAlchemy
with Session(engine) as session:
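As a rough illustration (not taken from the repository's tests), the registry defined above can be exercised like this, assuming the `get_score_df(df, variable_ids, variable_mapping)` signature shown in `detectors.py`; the variable ids and data are made up:

```python
# Sketch only: dispatch one detector from the registry on made-up data.
# The variable ids (901 old, 1001 new) and the values are hypothetical.
import pandas as pd

from apps.anomalist.anomalist_api import ANOMALY_DETECTORS
from apps.anomalist.detectors import get_long_format_score_df

df = pd.DataFrame(
    {
        "entity_name": ["France", "France", "Japan"],
        "year": [2020, 2021, 2021],
        901: [1.0, 2.0, 3.0],    # old variable (hypothetical id)
        1001: [1.0, None, 3.0],  # new variable (hypothetical id)
    }
)

detector = ANOMALY_DETECTORS["upgrade_missing"]()
df_score = detector.get_score_df(df=df, variable_ids=[1001], variable_mapping={901: 1001})
df_score_long = get_long_format_score_df(df_score)
print(df_score_long)  # France/2021 should score 1: data present in 901 but missing in 1001
```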
24 changes: 0 additions & 24 deletions apps/anomalist/bard_anomaly.py

This file was deleted.

13 changes: 11 additions & 2 deletions apps/anomalist/cli.py
@@ -7,6 +7,7 @@
from rich_click.rich_command import RichCommand

from apps.anomalist.anomalist_api import ANOMALY_TYPE, anomaly_detection
from etl.db import get_engine, read_sql
from etl.paths import CACHE_DIR

log = structlog.get_logger()
@@ -96,11 +97,19 @@ def cli(
else:
variable_mapping_dict = {}

# Load all variables from given datasets
if dataset_ids:
assert not variable_ids, "Cannot specify both dataset IDs and variable IDs."
q = """
select id from variables
where datasetId in %(dataset_ids)s
"""
variable_ids = list(read_sql(q, get_engine(), params={"dataset_ids": dataset_ids})["id"])

anomaly_detection(
anomaly_types=anomaly_types,
dataset_ids=dataset_ids,
variable_mapping=variable_mapping_dict,
variable_ids=variable_ids,
variable_ids=list(variable_ids) if variable_ids else None,
dry_run=dry_run,
reset_db=reset_db,
)
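For reference, a hedged sketch of the equivalent programmatic call after this change, using the same dataset-id → variable-id resolution the CLI now performs; the dataset id is hypothetical and the keyword arguments assume the `anomaly_detection` signature shown above:

```python
# Sketch: resolve a dataset id to its variable ids, then run detection in dry-run mode.
from apps.anomalist.anomalist_api import anomaly_detection
from etl.db import get_engine, read_sql

dataset_ids = [6001]  # hypothetical dataset id
q = """
select id from variables
where datasetId in %(dataset_ids)s
"""
variable_ids = list(read_sql(q, get_engine(), params={"dataset_ids": dataset_ids})["id"])

anomaly_detection(
    anomaly_types=("gp_outlier",),
    variable_mapping=None,
    variable_ids=variable_ids,
    dry_run=True,
    reset_db=False,
)
```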
43 changes: 33 additions & 10 deletions apps/anomalist/detectors.py
@@ -33,22 +33,46 @@ def estimate_bard_epsilon(series: pd.Series) -> float:

def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame:
# Create a reduced score dataframe.
df_score_long = df_score.melt(
id_vars=["entity_name", "year"], var_name="variable_id", value_name="anomaly_score"
).fillna(0)
# For now, keep only the latest year affected for each country-indicator.
df_score_long = df_score.melt(id_vars=["entity_name", "year"], var_name="variable_id", value_name="anomaly_score")

# Drop NaN anomalies.
df_score_long = df_score_long.dropna(subset=["anomaly_score"])

# For now, keep only the highest anomaly score for each country-indicator.
df_score_long = (
df_score_long.sort_values("anomaly_score", ascending=False)
.drop_duplicates(subset=["variable_id", "entity_name"], keep="first")
.reset_index(drop=True)
)

# Save memory by converting to categoricals.
df_score_long = df_score_long.astype({"entity_name": "category", "year": "category", "variable_id": "category"})

return df_score_long


class AnomalyDetector:
anomaly_type: str

def get_text(self, entity: str, year: int) -> str:
return f"Anomaly happened in {entity} in {year}!"

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
raise NotImplementedError

def get_zeros_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_zeros = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_zeros[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
return df_zeros

def get_nans_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
# Create a dataframe of nans.
df_nans = pd.DataFrame(np.empty_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_nans[variable_ids] = np.nan
df_nans[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
return df_nans

# Visually inspect the most significant anomalies on a certain scores dataframe.
def inspect_anomalies(
self,
@@ -109,14 +133,15 @@ class AnomalyUpgradeMissing(AnomalyDetector):

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_lost = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_lost[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
df_lost = self.get_zeros_df(df, variable_ids)

# Make 1 all cells that used to have data in the old version, but are missing in the new version.
for variable_id_old, variable_id_new in variable_mapping.items():
affected_rows = df[(df[variable_id_old].notnull()) & (df[variable_id_new].isnull())].index
df_lost.loc[affected_rows, variable_id_new] = 1

__import__("ipdb").set_trace()

return df_lost


@@ -127,8 +152,7 @@ class AnomalyUpgradeChange(AnomalyDetector):

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_version_change = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_version_change[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
df_version_change = self.get_zeros_df(df, variable_ids)

for variable_id_old, variable_id_new in variable_mapping.items():
# Calculate the BARD epsilon for each variable.
@@ -148,8 +172,7 @@ class AnomalyTimeChange(AnomalyDetector):

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_time_change = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_time_change[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
df_time_change = self.get_zeros_df(df, variable_ids)

# Sanity check.
error = "The function that detects abrupt time changes assumes the data is sorted by entity_name and year. But this is not the case. Either ensure the data is sorted, or fix the function."
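A small worked example of the reduction that `get_long_format_score_df` now performs: melt to long format, drop NaN scores, and keep only the single highest-scoring year per (entity, variable). The variable id and scores are made up:

```python
# Sketch of get_long_format_score_df on a tiny, hypothetical score dataframe.
import pandas as pd

from apps.anomalist.detectors import get_long_format_score_df

df_score = pd.DataFrame(
    {
        "entity_name": ["France", "France", "Japan"],
        "year": [2020, 2021, 2021],
        1001: [0.2, 0.9, None],  # hypothetical variable id; the NaN row is dropped
    }
)
df_long = get_long_format_score_df(df_score)
print(df_long)  # one row: France, year 2021, anomaly_score 0.9 (Japan's NaN is dropped)
```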
58 changes: 31 additions & 27 deletions apps/anomalist/gp_anomaly.py → apps/anomalist/gp_detector.py
@@ -1,7 +1,7 @@
import random
import time
import warnings
from typing import Optional, cast
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np
@@ -13,52 +13,54 @@

from etl import grapher_model as gm

log = structlog.get_logger()


class AnomalyDetector:
anomaly_type: str
from .detectors import AnomalyDetector

log = structlog.get_logger()

class SampleAnomalyDetector(AnomalyDetector):
anomaly_type = "sample"

def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame:
score_df = cast(pd.DataFrame, df.sample(1))
score_df.iloc[0, :] = [random.normalvariate(0, 1) for _ in range(df.shape[1])]
return score_df
class AnomalyGaussianProcessOutlier(AnomalyDetector):
anomaly_type = "gp_outlier"

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_score = self.get_nans_df(df, variable_ids)

class GPAnomalyDetector(AnomalyDetector):
anomaly_type = "gp"
# Filter to only a couple of countries
# TODO: make this fast enough or process only the most populous countries
df = df.loc[df.entity_name.isin(["France", "Japan", "Spain"]), :]

def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame:
score_df = df * np.nan
for variable_id in df.columns:
for _, group in df[variable_id].groupby("entityName", observed=True):
group = group.dropna()
for entity_name, group_variables in df.groupby("entity_name", observed=True):
for variable_id in variable_ids:
group = group_variables[["year", variable_id]].dropna()
if group.empty:
continue

country = group.index.get_level_values("entityName")[0]
series = pd.Series(group.values, index=group.index.get_level_values("year"))
series = pd.Series(group[variable_id].values, index=group["year"])

if len(series) <= 1:
log.warning(f"Insufficient data for {country}")
continue

X, y = self.get_Xy(series)

if y.std() == 0:
continue

t = time.time()
mean_pred, std_pred = self.fit_predict(X, y)
log.info("Fitted GP", country=country, n_samples=len(X), elapsed=round(time.time() - t, 2))
log.info(
"Fitted GP",
variable_id=variable_id,
entity_name=entity_name,
n_samples=len(X),
elapsed=round(time.time() - t, 2),
)

# Calculate Z-score for all points
z = (y - mean_pred) / std_pred

score_df.loc[group.index, variable_id] = z
df_score.loc[group.index, variable_id] = np.abs(z)

score_df = cast(pd.DataFrame, score_df)

return score_df.abs() # type: ignore
return df_score

def get_Xy(self, series: pd.Series) -> tuple[np.ndarray, np.ndarray]:
X = series.index.values.reshape(-1, 1)
@@ -70,6 +72,8 @@ def fit_predict(self, X, y):
X_mean = np.mean(X)
y_mean = np.mean(y)
y_std = np.std(y)
assert y_std > 0, "Standard deviation of y is zero"

X_normalized = X - X_mean
y_normalized = (y - y_mean) / y_std

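The diff shows the normalization and z-score steps of `AnomalyGaussianProcessOutlier`, but the kernel used inside `fit_predict` is not visible here. Below is a self-contained sketch of the same idea using scikit-learn; the RBF + white-noise kernel and its hyperparameters are assumptions for illustration, not the commit's actual configuration:

```python
# Sketch: score each observation by its absolute z-score under a fitted GP.
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel


def gp_outlier_scores(years: np.ndarray, values: np.ndarray) -> np.ndarray:
    X = years.reshape(-1, 1).astype(float)
    y = values.astype(float)
    y_std = y.std()
    assert y_std > 0, "Standard deviation of y is zero"

    # Normalize inputs and outputs, as in fit_predict above.
    X_norm = X - X.mean()
    y_norm = (y - y.mean()) / y_std

    # Assumed kernel: smooth trend plus observation noise.
    kernel = RBF(length_scale=10.0) + WhiteKernel(noise_level=0.1)
    gp = GaussianProcessRegressor(kernel=kernel)
    gp.fit(X_norm, y_norm)
    mean_pred, std_pred = gp.predict(X_norm, return_std=True)

    # Absolute z-score of each observation under the fitted GP.
    return np.abs((y_norm - mean_pred) / std_pred)


scores = gp_outlier_scores(np.arange(2000, 2020), np.r_[np.linspace(0, 10, 19), 50.0])
print(scores.round(2))  # the injected jump in the final year should stand out
```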
