🎉 anomalist: Improve anomalist CLI #3399

Merged 3 commits on Oct 11, 2024
203 changes: 117 additions & 86 deletions apps/anomalist/cli.py
@@ -1,4 +1,5 @@
-from typing import Literal, Optional, get_args
+import json
+from typing import Dict, Literal, Optional, Tuple, get_args
 
 import click
 import pandas as pd
@@ -8,14 +9,13 @@
 from sqlalchemy.engine import Engine
 from sqlalchemy.orm import Session
 
+from apps.anomalist.bard_anomaly import NaNAnomalyDetector
+from apps.anomalist.gp_anomaly import GPAnomalyDetector, SampleAnomalyDetector
 from etl import grapher_model as gm
 from etl.db import get_engine, read_sql
 from etl.grapher_io import variable_data_df_from_s3
 from etl.paths import CACHE_DIR
 
-from .bard_anomaly import NaNAnomalyDetector
-from .gp_anomaly import GPAnomalyDetector, SampleAnomalyDetector
-
 log = structlog.get_logger()
 
 memory = Memory(CACHE_DIR, verbose=0)
@@ -25,26 +25,31 @@
 
 @click.command(name="anomalist", cls=RichCommand, help=__doc__)
 @click.option(
-    "--type",
+    "--anomaly-types",
     type=click.Choice(get_args(ANOMALY_TYPE)),
     multiple=True,
-    help="Type of anomaly detection algorithm to use.",
+    default=None,
+    help="Type (or types) of anomaly detection algorithm to use.",
 )
 @click.option(
-    "--dataset-id",
+    "--dataset-ids",
     type=int,
-    help="Generate anomalies for a specific dataset ID.",
+    multiple=True,
+    default=None,
+    help="Generate anomalies for the variables of a specific dataset ID (or multiple dataset IDs).",
 )
 @click.option(
-    "--previous-dataset-id",
-    type=int,
-    help="Dataset ID of the previous version.",
+    "--variable-mapping",
+    type=str,
+    default=None,
+    help="Optional JSON dictionary mapping variable IDs from a previous to a new version (where at least some of the new variable IDs must belong to the datasets whose IDs were given).",
 )
 @click.option(
-    "--variable-id",
+    "--variable-ids",
     type=int,
     multiple=True,
-    help="Generate anomalies for a list of variable IDs.",
+    default=None,
+    help="Generate anomalies for a list of variable IDs (in addition to the ones from dataset ID, if any dataset was given).",
 )
 @click.option(
     "--dry-run/--no-dry-run",
@@ -59,10 +64,10 @@
     help="Drop anomalies table and recreate it. This is useful for development when the schema changes.",
 )
 def cli(
-    type: Optional[ANOMALY_TYPE],
-    dataset_id: Optional[int],
-    previous_dataset_id: Optional[int],
-    variable_id: Optional[list[int]],
+    anomaly_types: Optional[Tuple[str, ...]],
+    dataset_ids: Optional[list[int]],
+    variable_mapping: Optional[str],  # type: ignore
+    variable_ids: Optional[list[int]],
     dry_run: bool,
     reset_db: bool,
 ) -> None:
@@ -98,73 +103,92 @@ def cli(
         gm.Anomaly.__table__.create(engine)  # type: ignore
         return
 
-    assert type, "Anomaly type must be specified."
-
-    # load metadata
-    variables = _load_variables_meta(engine, dataset_id, variable_id)
-
-    # set dataset_id if we're using variables
-    if not dataset_id:
-        assert set(v.datasetId for v in variables) == {variables[0].datasetId}
-        dataset_id = variables[0].datasetId
+    # If no anomaly types are provided, default to all available types
+    if not anomaly_types:
+        anomaly_types = get_args(ANOMALY_TYPE)
+
+    # Parse the variable_mapping if any provided.
+    if variable_mapping:
+        try:
+            variable_mapping: Dict[int, int] = {
+                int(variable_old): int(variable_new)
+                for variable_old, variable_new in json.loads(variable_mapping).items()
+            }
+        except json.JSONDecodeError:
+            raise ValueError("Invalid JSON format for variable_mapping.")
+    else:
+        variable_mapping = dict()
+
+    # Load metadata for all variables in dataset_ids (if any given) and variable_ids, and new variables in variable_mapping.
+    variable_ids_all = (list(variable_mapping.values()) if variable_mapping else []) + (
+        list(variable_ids) if variable_ids else []
+    )
+    if dataset_ids is None:
+        dataset_ids = []
+    variables = _load_variables_meta(engine, dataset_ids, variable_ids_all)
+
+    # Create a dictionary of all variable_ids for each dataset_id.
+    dataset_variable_ids = {}
+    for variable in variables:
+        if variable.datasetId not in dataset_variable_ids:
+            dataset_variable_ids[variable.datasetId] = []
+        dataset_variable_ids[variable.datasetId].append(variable)
 
     log.info("Detecting anomalies")
     anomalies = []
 
-    for typ in type:
-        if typ == "gp":
-            detector = GPAnomalyDetector()
-        elif typ == "sample":
-            detector = SampleAnomalyDetector()
-        elif typ == "nan":
-            detector = NaNAnomalyDetector()
-        else:
-            raise ValueError(f"Unsupported anomaly type: {typ}")
-
-        # dataframe with (entityName, year) as index and variableId as columns
-        log.info("Loading data from S3")
-        df = load_data_for_variables(engine, variables)
-
-        # detect anomalies
-        log.info("Detecting anomalies")
-        # the output has the same shape as the input dataframe, but we should make
-        # it possible to return anomalies in a long format (for detectors that return
-        # just a few anomalies)
-        df_score = detector.get_score_df(df, variables)
-
-        # validate format of the output dataframe
-        # TODO
-
-        anomaly = gm.Anomaly(
-            datasetId=dataset_id,
-            anomalyType=detector.anomaly_type,
-        )
-        anomaly.dfScore = df_score
-
-        anomalies.append(anomaly)
-
-    if dry_run:
-        for anomaly in anomalies:
-            log.info(anomaly)
-    else:
-        with Session(engine) as session:
-            log.info("Deleting existing anomalies")
-            session.query(gm.Anomaly).filter(
-                gm.Anomaly.datasetId == dataset_id,
-                gm.Anomaly.anomalyType.in_([a.anomalyType for a in anomalies]),
-            ).delete(synchronize_session=False)
-            session.commit()
-
-            # Insert new anomalies
-            log.info("Writing anomalies to database")
-            session.add_all(anomalies)
-            session.commit()
+    for dataset_id, variables_in_dataset in dataset_variable_ids.items():
+        for anomaly_type in anomaly_types:
+            if anomaly_type == "gp":
+                detector = GPAnomalyDetector()
+            elif anomaly_type == "sample":
+                detector = SampleAnomalyDetector()
+            elif anomaly_type == "nan":
+                detector = NaNAnomalyDetector()
+            else:
+                raise ValueError(f"Unsupported anomaly type: {anomaly_type}")
+
+            # dataframe with (entityName, year) as index and variableId as columns
+            log.info("Loading data from S3")
+            df = load_data_for_variables(engine, variables_in_dataset)
+
+            # TODO: If any of the variables are in variable_mapping, load df_old as well.
+
+            # detect anomalies
+            log.info("Detecting anomalies")
+            # the output has the same shape as the input dataframe, but we should make
+            # it possible to return anomalies in a long format (for detectors that return
+            # just a few anomalies)
+            df_score = detector.get_score_df(df, variables_in_dataset)
+
+            # TODO: validate format of the output dataframe
+
+            anomaly = gm.Anomaly(
+                datasetId=dataset_id,
+                anomalyType=detector.anomaly_type,
+            )
+            anomaly.dfScore = df_score
+
+            if not dry_run:
+                with Session(engine) as session:
+                    # TODO: Is this right? I suppose it should also delete if already existing.
+                    log.info("Deleting existing anomalies")
+                    session.query(gm.Anomaly).filter(
+                        gm.Anomaly.datasetId == dataset_id,
+                        gm.Anomaly.anomalyType.in_([a.anomalyType for a in anomalies]),
+                    ).delete(synchronize_session=False)
+                    session.commit()
+
+                    # Insert new anomalies
+                    log.info("Writing anomalies to database")
+                    session.add_all(anomalies)
+                    session.commit()
 
 
 # @memory.cache
 def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.DataFrame:
     # TODO: cache this on disk & re-validate with etags
-    df_long = variable_data_df_from_s3(engine, [v.id for v in variables])
+    df_long = variable_data_df_from_s3(engine, [v.id for v in variables], workers=None)
 
     # pivot dataframe
     df = df_long.pivot(index=["entityName", "year"], columns="variableId", values="value")
@@ -184,32 +208,39 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.DataFrame:

 @memory.cache
 def _load_variables_meta(
-    engine: Engine, dataset_id: Optional[int], variable_ids: Optional[list[int]]
+    engine: Engine, dataset_ids: Optional[list[int]], variable_ids: Optional[list[int]]
 ) -> list[gm.Variable]:
-    if dataset_id and variable_ids:
-        raise ValueError("Cannot specify both dataset ID and variable IDs.")
-
-    if variable_ids:
+    if dataset_ids:
         q = """
         select id from variables
-        where id in %(variable_ids)s
+        where datasetId in %(dataset_ids)s
         """
-    elif dataset_id:
+        df_from_dataset_ids = read_sql(q, engine, params={"dataset_ids": dataset_ids})
+    else:
+        df_from_dataset_ids = pd.DataFrame()
+
+    if variable_ids:
         q = """
         select id from variables
-        where datasetId = %(dataset_id)s
+        where id in %(variable_ids)s
         """
-    # load all variables from a random dataset
+        df_from_variable_ids = read_sql(q, engine, params={"variable_ids": variable_ids})
     else:
+        df_from_variable_ids = pd.DataFrame()
+
+    # Combine both dataframes to get all possible variables required.
+    df = pd.concat([df_from_dataset_ids, df_from_variable_ids]).drop_duplicates()
+
+    # load all variables from a random dataset
+    if df.empty:
         q = """
         with t as (
             select id from datasets order by rand() limit 1
         )
         select id from variables
         where datasetId in (select id from t)
         """
-
-    df = read_sql(q, engine, params={"variable_ids": variable_ids, "dataset_id": dataset_id})
+        df = read_sql(q, engine)
 
     # select all variables using SQLAlchemy
     with Session(engine) as session:
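
A few sketches, not part of the PR itself, may help reviewers follow the new control flow. First, when --anomaly-types is omitted, the CLI now falls back to get_args(ANOMALY_TYPE). A minimal reproduction of that default, assuming ANOMALY_TYPE is the Literal of the three types handled by the detector dispatch above:

from typing import Literal, get_args

# Assumed definition, inferred from the gp/sample/nan dispatch in this diff.
ANOMALY_TYPE = Literal["gp", "sample", "nan"]

# What the CLI uses when --anomaly-types is not given.
print(get_args(ANOMALY_TYPE))  # ('gp', 'sample', 'nan')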
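
Second, the --variable-mapping parsing coerces keys and values with int() because JSON object keys are always strings. The same dict comprehension as in the diff, run standalone with made-up IDs:

import json

# JSON object keys arrive as strings, so both sides are cast to int.
variable_mapping = {
    int(variable_old): int(variable_new)
    for variable_old, variable_new in json.loads('{"111": 222, "112": 223}').items()
}
assert variable_mapping == {111: 222, 112: 223}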
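
Finally, a hypothetical end-to-end invocation through click's test runner; the flags are the ones introduced in this diff, while the IDs and the mapping are made-up values for illustration:

from click.testing import CliRunner

from apps.anomalist.cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "--anomaly-types", "gp",
        "--anomaly-types", "nan",  # multiple=True options are repeated per value
        "--dataset-ids", "6054",  # hypothetical dataset ID
        "--variable-mapping", '{"111": 222}',  # old variable ID -> new variable ID
        "--dry-run",
    ],
)
print(result.exit_code, result.output)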
2 changes: 1 addition & 1 deletion etl/grapher_io.py
@@ -264,7 +264,7 @@ def _ensure_variable_ids(
 def variable_data_df_from_s3(
     engine: Engine,
     variable_ids: List[int] = [],
-    workers: int = 1,
+    workers: Optional[int] = 1,
     value_as_str: bool = True,
 ) -> pd.DataFrame:
     """Fetch data from S3 and add entity code and name from DB."""
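
The etl/grapher_io.py change loosens workers to Optional[int], and cli.py now calls variable_data_df_from_s3 with workers=None, presumably so the helper can pick its own degree of parallelism. A small sketch of the reshaping that load_data_for_variables then applies; the toy frame below stands in for the long-format data returned from S3:

import pandas as pd

# Stand-in for the long-format frame returned by variable_data_df_from_s3:
# one row per (entityName, year, variableId) observation.
df_long = pd.DataFrame(
    {
        "entityName": ["Chad", "Chad", "France", "France"],
        "year": [2000, 2001, 2000, 2001],
        "variableId": [111, 111, 111, 111],
        "value": [1.0, 2.0, 3.0, 4.0],
    }
)

# Same pivot as load_data_for_variables: (entityName, year) index,
# one column per variableId.
df = df_long.pivot(index=["entityName", "year"], columns="variableId", values="value")
print(df)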