🎉 anomalist: Add population and analytics scores (#3412)
* ✨ anomalist: stop using mock

* style

* ✨ anomalist: stop using mock data

* re-order mock data

* replace mock data with real data

* 🎉 anomalist: Add population and analytics scores

* Store scores with all years and combine them on app

* Add anomaly and population score, as well as weighted score

* Move get_scores to utils

---------

Co-authored-by: lucasrodes <[email protected]>
pabloarosado and lucasrodes authored Oct 16, 2024
1 parent fe3027b commit 000d0c5
Showing 3 changed files with 105 additions and 121 deletions.
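In short, the commit replaces the old aggregate_anomalies / parse_anomalies_to_df path with add_population_score, add_analytics_score and a cached get_scores helper, and combines the anomaly, population and analytics scores into a single weighted mean. A minimal sketch of that combination, using equal weights as in the committed code (the helper name weighed_score is made up for illustration):

import pandas as pd


def weighed_score(df: pd.DataFrame, w_score: float = 1, w_pop: float = 1, w_views: float = 1) -> pd.Series:
    # Weighted mean of the anomaly, population and analytics scores,
    # mirroring the "score_weighed" column computed in get_scores below.
    return (
        w_score * df["score"]
        + w_pop * df["score_population"]
        + w_views * df["score_analytics"]
    ) / (w_score + w_pop + w_views)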
118 changes: 62 additions & 56 deletions apps/anomalist/anomalist_api.py
@@ -1,12 +1,11 @@
import tempfile
from pathlib import Path
from typing import Dict, List, Literal, Optional, Tuple, get_args
from typing import List, Literal, Optional, Tuple, cast, get_args

import numpy as np
import pandas as pd
import structlog
from owid.catalog import find
from owid.datautils.dataframes import map_series
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

@@ -98,88 +97,84 @@ def get_variables_views_in_charts(
# Handle potential duplicated rows
df = df.drop_duplicates().reset_index(drop=True)

return df


def aggregate_anomalies(
df_scores: pd.DataFrame,
df_population: pd.DataFrame,
df_views: pd.DataFrame,
metadata: Dict[int, gm.Variable],
) -> pd.DataFrame:
# Create a dataframe of zeros.
df_aggregated = df_scores.copy()

df_aggregated["variable"] = map_series(
df_aggregated["variable_id"],
{variable_id: metadata[variable_id]["shortName"] for variable_id in metadata}, # type: ignore
warn_on_missing_mappings=True,
warn_on_unused_mappings=False,
)

# Add population score to the aggregated scores dataframe.
df_aggregated = df_aggregated.merge(
get_population_score(df_aggregated, df_population), on=["entity_name", "year"], how="left"
)

# Add analytics score to the main scores dataframe.
# TODO: Improve this, currently df_aggregated is not used.
df_score_analytics = get_analytics_score(df_aggregated, df_views)
df_aggregated = df_aggregated.merge(df_score_analytics, on=["variable_id"], how="left")
if len(df) == 0:
df = pd.DataFrame(columns=["variable_id", "chart_id", "chart_slug", "views_7d", "views_14d", "views_365d"])

# NOTE: Variables that do not have charts will have an analytics score of nan.
# Fill them with zeros.
df_aggregated["analytics_score"] = df_score_analytics["analytics_score"].fillna(0)
return df

return df_aggregated

def add_population_score(df_reduced: pd.DataFrame) -> pd.DataFrame:
# To normalize the population score to the range 0, 1, divide by an absolute maximum number of people.
# NOTE: This should be a safe assumption before ~2060.
absolute_maximum_population = 1e10
# Value to use to fill missing values in the population score (e.g. for regions like "Middle East" that are not included in our population dataset).
fillna_value = 0.5

def get_population_score(df_aggregated: pd.DataFrame, df_population: pd.DataFrame) -> pd.DataFrame:
# NOTE: This is a special type of score that is added afterwards to help rank anomalies.
# It would not make sense to calculate a population score at the beginning and select the largest anomalies based on it (which would trivially pick the most populated countries).
# Load the latest population data.
df_population = load_latest_population()
error = f"Expected a maximum population below {absolute_maximum_population}."
assert df_population[df_population["year"] < 2040]["population"].max() < absolute_maximum_population, error

# First, get the unique combinations of country-years in the scores dataframe, and add population to it.
df_score_population = (
df_aggregated[["entity_name", "year"]] # type: ignore
df_reduced[["entity_name", "year"]] # type: ignore
.drop_duplicates()
.merge(df_population, on=["entity_name", "year"], how="left")
)
# To normalize the population score to the range 0, 1, divide by an absolute maximum population of 10 billion.

# To normalize the population score to the range 0, 1, divide by an absolute maximum population.
# To have more convenient numbers, take the natural logarithm of the population.
df_score_population["population_score"] = np.log(df_score_population["population"]) / np.log(10e9)
# It's unclear what to do with entities that do not have a population (e.g. "Middle East").
# For now, add a score of 0.5 to them.
df_score_population["population_score"] = df_score_population["population_score"].fillna(0.5)
df_score_population["score_population"] = np.log(df_score_population["population"]) / np.log(
absolute_maximum_population
)

df_score_population = df_score_population[["entity_name", "year", "population_score"]]
# Add the population score to the scores dataframe.
df_reduced = df_reduced.merge(df_score_population, on=["entity_name", "year"], how="left").drop(
columns="population", errors="raise"
)

return df_score_population
# Variables that do not have population data will have a population score nan. Fill them with a low value.
df_reduced["score_population"] = df_reduced["score_population"].fillna(fillna_value)

return df_reduced

def get_analytics_score(df_aggregated: pd.DataFrame, df_views: pd.DataFrame) -> pd.DataFrame:

def add_analytics_score(df_reduced: pd.DataFrame) -> pd.DataFrame:
# Focus on the following specific analytics column.
analytics_column = "views_14d"
# To normalize the analytics score to the range 0, 1, divide by an absolute maximum number of views.
absolute_maximum_views = 1e6
# Value to use to fill missing values in the analytics score (e.g. for variables that are not used in charts).
fillna_value = 0.1

# Get number of views in charts for each variable id.
df_views = get_variables_views_in_charts(list(df_reduced["variable_id"].unique()))
# Sanity check.
if not df_views.empty:
error = f"Expected a maximum number of views below {absolute_maximum_views}. Change this limit."
assert df_views[analytics_column].max() < absolute_maximum_views, error

# Get the sum of the number of views in charts for each variable id in the last 14 days.
# So, if an indicator is used in multiple charts, their views are summed.
# This rewards indicators that are used multiple times, and on popular charts.
# NOTE: The analytics table often contains nans. Not sure why this happens, e.g. to coal-electricity-per-capita: https://admin.owid.io/admin/charts/4451/edit
# For now, for convenience, fill them with 1.1 views (to avoid zeros when calculating the log).
# NOTE: The analytics table often contains nans. For now, for convenience, fill them with 1.1 views (to avoid zeros when calculating the log).
df_score_analytics = (
df_views.fillna(1.1)
.groupby("variable_id")
df_views.groupby("variable_id")
.agg({analytics_column: "sum"})
.reset_index()
.rename(columns={analytics_column: "views"})
)
# To normalize the analytics score to the range 0, 1, divide by an absolute maximum number of views.
absolute_maximum_views = 1e6
error = f"Expected a maximum number of views below {absolute_maximum_views}. Change this limit."
assert df_views[analytics_column].max() < absolute_maximum_views, error
# To have more convenient numbers, take the natural logarithm of the views.
df_score_analytics["analytics_score"] = np.log(df_score_analytics["views"]) / np.log(absolute_maximum_views)
df_score_analytics["score_analytics"] = np.log(df_score_analytics["views"]) / np.log(absolute_maximum_views)

return df_score_analytics
# Add the analytics score to the scores dataframe.
df_reduced = df_reduced.merge(df_score_analytics, on=["variable_id"], how="left")

# Variables that do not have charts will have an analytics score nan.
# Fill them with a low value (e.g. 0.1) to avoid zeros when calculating the final score.
df_reduced["score_analytics"] = df_reduced["score_analytics"].fillna(fillna_value)

return df_reduced


########################################################################################################################
@@ -376,3 +371,14 @@ def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Var
# select all variables using SQLAlchemy
with Session(engine) as session:
return gm.Variable.load_variables(session, list(df["id"]))


def combine_and_reduce_scores_df(anomalies: List[gm.Anomaly]) -> pd.DataFrame:
"""Get the combined dataframe with scores for all anomalies, and reduce it to include only the largest anomaly for each contry-indicator."""
# Combine the reduced dataframes for all anomalies into a single dataframe.
dfs = [cast(pd.DataFrame, anomaly.dfReduced).assign(**{"type": anomaly.anomalyType}) for anomaly in anomalies]
df_reduced = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))
# Dtypes
# df = df.astype({"year": int})

return df_reduced
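Both new scores are normalized to roughly the 0–1 range by dividing the log of the value by the log of an absolute maximum (10 billion people, 1 million views). A quick sketch of how that scaling behaves, using made-up populations and view counts (not taken from the commit):

import numpy as np

absolute_maximum_population = 1e10  # as in add_population_score above
absolute_maximum_views = 1e6  # as in add_analytics_score above

# Illustrative values only.
for population in [1e6, 1e8, 8e9]:
    score = np.log(population) / np.log(absolute_maximum_population)
    print(f"population {population:>13,.0f} -> score_population {score:.2f}")  # 0.60, 0.80, 0.99

for views in [100, 10_000, 500_000]:
    score = np.log(views) / np.log(absolute_maximum_views)
    print(f"views {views:>7,} -> score_analytics {score:.2f}")  # 0.33, 0.67, 0.95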
79 changes: 15 additions & 64 deletions apps/wizard/app_pages/anomalist/app.py
@@ -19,13 +19,18 @@
"""

from typing import List, Tuple, cast
from typing import List, Tuple

import pandas as pd
import streamlit as st

from apps.anomalist.anomalist_api import anomaly_detection
from apps.wizard.app_pages.anomalist.utils import AnomalyTypeEnum, create_tables, get_datasets_and_mapping_inputs
from apps.wizard.app_pages.anomalist.utils import (
AnomalyTypeEnum,
create_tables,
get_datasets_and_mapping_inputs,
get_scores,
)
from apps.wizard.utils import cached, set_states
from apps.wizard.utils.chart_config import bake_chart_config
from apps.wizard.utils.components import Pagination, grapher_chart, st_horizontal, tag_in_md
@@ -118,67 +123,6 @@ def get_variable_mapping(variable_ids):
return mapping


def parse_anomalies_to_df() -> pd.DataFrame | None:
"""Given a list of anomalies, parse them into a dataframe.
This function takes the anomalies stored in st.session_state.anomalist_anomalies and parses them into a single dataframe.
- It loads dfScore from each anomaly.
- It keeps only one row per entity and anomaly type, based on the highest anomaly score.
- Concatenates all dataframes.
- Renames columns to appropriate names.
- Adjusts dtypes.
- Adds population and analytics scores.
- Calculates weighed combined score
"""
# Only return dataframe if there are anomalies!
if len(st.session_state.anomalist_anomalies) > 0:
dfs = []
for anomaly in st.session_state.anomalist_anomalies:
# Load
df = anomaly.dfReduced
if isinstance(df, pd.DataFrame):
# Assign anomaly type in df
df["type"] = anomaly.anomalyType
# Add to list
dfs.append(df)
else:
raise ValueError(f"Anomaly {anomaly} has no dfScore attribute.")

# Concatenate all dfs
df = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))

# Rename columns
df = df.rename(
columns={
"variable_id": "indicator_id",
"anomaly_score": "score",
}
)

# Dtypes
df = df.astype(
{
"year": int,
}
)

# Add population and analytics score:
df["score_population"] = 1
df["score_analytics"] = 1

# Weighed combined score
w_score = 1
w_pop = 1
w_views = 1
df["score_weighed"] = (
w_score * df["score"] + w_pop * df["score_population"] + w_views * df["score_analytics"]
) / (w_score + w_pop + w_views)

return df


def ask_llm_for_summary(df: pd.DataFrame):
pass

@@ -459,7 +403,14 @@ def _change_chart_selection(df, key_table, key_selection):
st.session_state.anomalist_anomalies_out_of_date = False

# 3.4/ Parse obtained anomalist into dataframe
st.session_state.anomalist_df = parse_anomalies_to_df()
if len(st.session_state.anomalist_anomalies) > 0:
# Combine scores from all anomalies, reduce them (to get the maximum anomaly score for each entity-indicator),
# and add population and analytics scores.
df = get_scores(anomalies=st.session_state.anomalist_anomalies)

st.session_state.anomalist_df = df
else:
st.session_state.anomalist_df = None

# 4/ SHOW ANOMALIES (only if any are found)
if st.session_state.anomalist_df is not None:
29 changes: 28 additions & 1 deletion apps/wizard/app_pages/anomalist/utils.py
@@ -1,11 +1,13 @@
"""Utils for chart revision tool."""
from enum import Enum
from typing import Dict, Tuple
from typing import Dict, List, Tuple

import pandas as pd
import streamlit as st
from structlog import get_logger

import etl.grapher_model as gm
from apps.anomalist.anomalist_api import add_analytics_score, add_population_score, combine_and_reduce_scores_df
from apps.wizard.utils.db import WizardDB
from apps.wizard.utils.io import get_steps_df
from etl.config import OWID_ENV, OWIDEnv
@@ -80,3 +82,28 @@ def create_tables(_owid_env: OWIDEnv = OWID_ENV):
If exist, nothing is created.
"""
gm.Anomaly.create_table(_owid_env.engine)


@st.cache_data(show_spinner=False)
def get_scores(anomalies: List[gm.Anomaly]) -> pd.DataFrame:
"""Combine and reduce scores dataframe."""
df = combine_and_reduce_scores_df(anomalies)

# Add a population score.
df = add_population_score(df_reduced=df)

# Add an analytics score.
df = add_analytics_score(df_reduced=df)

# Rename columns for convenience.
df = df.rename(columns={"variable_id": "indicator_id", "anomaly_score": "score"}, errors="raise")

# Create a weighed combined score.
w_score = 1
w_pop = 1
w_views = 1
df["score_weighed"] = (w_score * df["score"] + w_pop * df["score_population"] + w_views * df["score_analytics"]) / (
w_score + w_pop + w_views
)

return df
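For context, a hypothetical call from the Streamlit app, mirroring the change in app.py above (`anomalies` stands for the list kept in st.session_state.anomalist_anomalies and is assumed to be loaded elsewhere):

from apps.wizard.app_pages.anomalist.utils import get_scores

# `anomalies` is assumed to be a list of gm.Anomaly objects loaded elsewhere.
df = get_scores(anomalies=anomalies)

# Rank rows by the combined score; column names follow the rename in get_scores.
top = df.sort_values("score_weighed", ascending=False).head(20)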
