From a461a4150325f08a128d5ccaf622dc304d9809c4 Mon Sep 17 00:00:00 2001 From: Marigold Date: Tue, 15 Oct 2024 19:28:51 +0200 Subject: [PATCH] :sparkles: Add GP outlier detector --- apps/anomalist/anomalist_api.py | 65 ++++++------------- apps/anomalist/bard_anomaly.py | 24 ------- apps/anomalist/cli.py | 13 +++- apps/anomalist/detectors.py | 43 +++++++++--- .../{gp_anomaly.py => gp_detector.py} | 58 +++++++++-------- 5 files changed, 94 insertions(+), 109 deletions(-) delete mode 100644 apps/anomalist/bard_anomaly.py rename apps/anomalist/{gp_anomaly.py => gp_detector.py} (71%) diff --git a/apps/anomalist/anomalist_api.py b/apps/anomalist/anomalist_api.py index b0fe7733c48..f544e1b97ef 100644 --- a/apps/anomalist/anomalist_api.py +++ b/apps/anomalist/anomalist_api.py @@ -16,6 +16,7 @@ AnomalyUpgradeMissing, get_long_format_score_df, ) +from apps.anomalist.gp_detector import AnomalyGaussianProcessOutlier from apps.wizard.utils.paths import WIZARD_ANOMALIES_RELATIVE from etl import grapher_model as gm from etl.config import OWID_ENV @@ -28,16 +29,20 @@ # Name of index columns for dataframe. INDEX_COLUMNS = ["entity_name", "year"] -# Define anomaly types. -ANOMALY_TYPE = Literal["upgrade_change", "time_change", "upgrade_missing"] - # Define mapping of available anomaly types to anomaly detectors. ANOMALY_DETECTORS = { - "time_change": AnomalyTimeChange, - "upgrade_change": AnomalyUpgradeChange, - "upgrade_missing": AnomalyUpgradeMissing, + detector.anomaly_type: detector + for detector in [ + AnomalyTimeChange, + AnomalyUpgradeChange, + AnomalyUpgradeMissing, + AnomalyGaussianProcessOutlier, + ] } +# Define anomaly types. +ANOMALY_TYPE = Literal[tuple(ANOMALY_DETECTORS.keys())] + ######################################################################################################################## # AGGREGATE ANOMALIES: @@ -181,7 +186,6 @@ def get_analytics_score(df_aggregated: pd.DataFrame, df_views: pd.DataFrame) -> def anomaly_detection( anomaly_types: Optional[Tuple[str, ...]] = None, - dataset_ids: Optional[list[int]] = None, variable_mapping: Optional[dict[int, int]] = None, variable_ids: Optional[list[int]] = None, dry_run: bool = False, @@ -212,12 +216,9 @@ def anomaly_detection( (set(variable_mapping.keys()) | set(variable_mapping.values())) if variable_mapping else set() ) variable_ids_all = list(variable_ids_mapping | set(variable_ids or [])) - if dataset_ids is None: - dataset_ids = [] # Dictionary variable_id: Variable object, for all variables (old and new). variables = { - variable.id: variable - for variable in _load_variables_meta(engine=engine, dataset_ids=dataset_ids, variable_ids=variable_ids_all) + variable.id: variable for variable in _load_variables_meta(engine=engine, variable_ids=variable_ids_all) } # Create a dictionary of all variable_ids for each dataset_id (only for new variables). @@ -267,7 +268,7 @@ def anomaly_detection( # TODO: validate format of the output dataframe anomaly = gm.Anomaly( datasetId=dataset_id, - anomalyType=detector.anomaly_type, + anomalyType=anomaly_type, ) anomaly.dfScore = df_score_long @@ -339,40 +340,12 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd. 
# @memory.cache -def _load_variables_meta( - engine: Engine, dataset_ids: Optional[list[int]], variable_ids: Optional[list[int]] -) -> list[gm.Variable]: - if dataset_ids: - q = """ - select id from variables - where datasetId in %(dataset_ids)s - """ - df_from_dataset_ids = read_sql(q, engine, params={"dataset_ids": dataset_ids}) - else: - df_from_dataset_ids = pd.DataFrame() - - if variable_ids: - q = """ - select id from variables - where id in %(variable_ids)s - """ - df_from_variable_ids = read_sql(q, engine, params={"variable_ids": variable_ids}) - else: - df_from_variable_ids = pd.DataFrame() - - # Combine both dataframes to get all possible variables required. - df = pd.concat([df_from_dataset_ids, df_from_variable_ids]).drop_duplicates() - - # load all variables from a random dataset - if df.empty: - q = """ - with t as ( - select id from datasets order by rand() limit 1 - ) - select id from variables - where datasetId in (select id from t) - """ - df = read_sql(q, engine) +def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Variable]: + q = """ + select id from variables + where id in %(variable_ids)s + """ + df = read_sql(q, engine, params={"variable_ids": variable_ids}) # select all variables using SQLAlchemy with Session(engine) as session: diff --git a/apps/anomalist/bard_anomaly.py b/apps/anomalist/bard_anomaly.py deleted file mode 100644 index 6e92dd0bcf7..00000000000 --- a/apps/anomalist/bard_anomaly.py +++ /dev/null @@ -1,24 +0,0 @@ -import pandas as pd - -from etl import grapher_model as gm - -from .gp_anomaly import AnomalyDetector - - -class NaNAnomalyDetector(AnomalyDetector): - anomaly_type = "nan" - - def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame: - return df.isnull().astype(float) - - -class LostAnomalyDetector(AnomalyDetector): - anomaly_type = "lost" - - -class VersionChangeAnomalyDetector(AnomalyDetector): - anomaly_type = "version_change" - - -class TimeChangeAnomalyDetector(AnomalyDetector): - anomaly_type = "time_change" diff --git a/apps/anomalist/cli.py b/apps/anomalist/cli.py index cc713d7cf68..5243c169eb6 100644 --- a/apps/anomalist/cli.py +++ b/apps/anomalist/cli.py @@ -7,6 +7,7 @@ from rich_click.rich_command import RichCommand from apps.anomalist.anomalist_api import ANOMALY_TYPE, anomaly_detection +from etl.db import get_engine, read_sql from etl.paths import CACHE_DIR log = structlog.get_logger() @@ -96,11 +97,19 @@ def cli( else: variable_mapping_dict = {} + # Load all variables from given datasets + if dataset_ids: + assert not variable_ids, "Cannot specify both dataset IDs and variable IDs." + q = """ + select id from variables + where datasetId in %(dataset_ids)s + """ + variable_ids = list(read_sql(q, get_engine(), params={"dataset_ids": dataset_ids})["id"]) + anomaly_detection( anomaly_types=anomaly_types, - dataset_ids=dataset_ids, variable_mapping=variable_mapping_dict, - variable_ids=variable_ids, + variable_ids=list(variable_ids) if variable_ids else None, dry_run=dry_run, reset_db=reset_db, ) diff --git a/apps/anomalist/detectors.py b/apps/anomalist/detectors.py index aa6c27372c4..99460e79fed 100644 --- a/apps/anomalist/detectors.py +++ b/apps/anomalist/detectors.py @@ -33,22 +33,46 @@ def estimate_bard_epsilon(series: pd.Series) -> float: def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame: # Create a reduced score dataframe. 
- df_score_long = df_score.melt( - id_vars=["entity_name", "year"], var_name="variable_id", value_name="anomaly_score" - ).fillna(0) - # For now, keep only the latest year affected for each country-indicator. + df_score_long = df_score.melt(id_vars=["entity_name", "year"], var_name="variable_id", value_name="anomaly_score") + + # Drop NaN anomalies. + df_score_long = df_score_long.dropna(subset=["anomaly_score"]) + + # For now, keep only the highest anomaly score for each country-indicator. df_score_long = ( df_score_long.sort_values("anomaly_score", ascending=False) .drop_duplicates(subset=["variable_id", "entity_name"], keep="first") .reset_index(drop=True) ) + # Save memory by converting to categoricals. + df_score_long = df_score_long.astype({"entity_name": "category", "year": "category", "variable_id": "category"}) + return df_score_long class AnomalyDetector: anomaly_type: str + def get_text(self, entity: str, year: int) -> str: + return f"Anomaly happened in {entity} in {year}!" + + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: + raise NotImplementedError + + def get_zeros_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame: + # Create a dataframe of zeros. + df_zeros = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids] + df_zeros[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy() + return df_zeros + + def get_nans_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame: + # Create a dataframe of nans. + df_nans = pd.DataFrame(np.empty_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids] + df_nans[variable_ids] = np.nan + df_nans[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy() + return df_nans + # Visually inspect the most significant anomalies on a certain scores dataframe. def inspect_anomalies( self, @@ -109,14 +133,15 @@ class AnomalyUpgradeMissing(AnomalyDetector): def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros. - df_lost = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids] - df_lost[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy() + df_lost = self.get_zeros_df(df, variable_ids) # Make 1 all cells that used to have data in the old version, but are missing in the new version. for variable_id_old, variable_id_new in variable_mapping.items(): affected_rows = df[(df[variable_id_old].notnull()) & (df[variable_id_new].isnull())].index df_lost.loc[affected_rows, variable_id_new] = 1 return df_lost @@ -127,8 +152,7 @@ class AnomalyUpgradeChange(AnomalyDetector): def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros. - df_version_change = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids] - df_version_change[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy() + df_version_change = self.get_zeros_df(df, variable_ids) for variable_id_old, variable_id_new in variable_mapping.items(): # Calculate the BARD epsilon for each variable. @@ -148,8 +172,7 @@ class AnomalyTimeChange(AnomalyDetector): def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros.
- df_time_change = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids] - df_time_change[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy() + df_time_change = self.get_zeros_df(df, variable_ids) # Sanity check. error = "The function that detects abrupt time changes assumes the data is sorted by entity_name and year. But this is not the case. Either ensure the data is sorted, or fix the function." diff --git a/apps/anomalist/gp_anomaly.py b/apps/anomalist/gp_detector.py similarity index 71% rename from apps/anomalist/gp_anomaly.py rename to apps/anomalist/gp_detector.py index 45515c3cdf7..da983bd4bd4 100644 --- a/apps/anomalist/gp_anomaly.py +++ b/apps/anomalist/gp_detector.py @@ -1,7 +1,7 @@ import random import time import warnings -from typing import Optional, cast +from typing import Dict, List, Optional import matplotlib.pyplot as plt import numpy as np @@ -13,52 +13,54 @@ from etl import grapher_model as gm -log = structlog.get_logger() - - -class AnomalyDetector: - anomaly_type: str +from .detectors import AnomalyDetector +log = structlog.get_logger() -class SampleAnomalyDetector(AnomalyDetector): - anomaly_type = "sample" - def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame: - score_df = cast(pd.DataFrame, df.sample(1)) - score_df.iloc[0, :] = [random.normalvariate(0, 1) for _ in range(df.shape[1])] - return score_df +class AnomalyGaussianProcessOutlier(AnomalyDetector): + anomaly_type = "gp_outlier" + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: + # Create a dataframe of NaNs. + df_score = self.get_nans_df(df, variable_ids) -class GPAnomalyDetector(AnomalyDetector): - anomaly_type = "gp" + # Filter to only a few countries + # TODO: make this fast enough or process only the most populous countries + df = df.loc[df.entity_name.isin(["France", "Japan", "Spain"]), :] - def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame: - score_df = df * np.nan - for variable_id in df.columns: - for _, group in df[variable_id].groupby("entityName", observed=True): - group = group.dropna() + for entity_name, group_variables in df.groupby("entity_name", observed=True): + for variable_id in variable_ids: + group = group_variables[["year", variable_id]].dropna() + if group.empty: + continue - country = group.index.get_level_values("entityName")[0] - series = pd.Series(group.values, index=group.index.get_level_values("year")) + series = pd.Series(group[variable_id].values, index=group["year"]) if len(series) <= 1: - log.warning(f"Insufficient data for {country}") continue X, y = self.get_Xy(series) + if y.std() == 0: + continue + t = time.time() mean_pred, std_pred = self.fit_predict(X, y) - log.info("Fitted GP", country=country, n_samples=len(X), elapsed=round(time.time() - t, 2)) + log.info( + "Fitted GP", + variable_id=variable_id, + entity_name=entity_name, + n_samples=len(X), + elapsed=round(time.time() - t, 2), + ) # Calculate Z-score for all points z = (y - mean_pred) / std_pred - score_df.loc[group.index, variable_id] = z + df_score.loc[group.index, variable_id] = np.abs(z) - score_df = cast(pd.DataFrame, score_df) - - return score_df.abs() # type: ignore + return df_score def get_Xy(self, series: pd.Series) -> tuple[np.ndarray, np.ndarray]: X = series.index.values.reshape(-1, 1) @@ -70,6 +72,8 @@ def fit_predict(self, X, y): X_mean = np.mean(X) y_mean = np.mean(y) y_std = np.std(y) + assert y_std > 0, "Standard deviation of y is
zero" + X_normalized = X - X_mean y_normalized = (y - y_mean) / y_std