Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

feat(cli): add command to import KPIs from a JSON #557

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chaos_genius/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,4 @@ def register_commands(app):
app.cli.add_command(commands.reinstall_db)
app.cli.add_command(commands.insert_demo_data)
app.cli.add_command(commands.run_anomaly_rca_scheduler)
app.cli.add_command(commands.kpi_import)
62 changes: 62 additions & 0 deletions chaos_genius/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Click commands."""
import json
import os
from datetime import datetime
from glob import glob
Expand All @@ -8,6 +9,7 @@
import click
from flask.cli import with_appcontext

from chaos_genius.controllers.kpi_controller import add_kpi

HERE = os.path.abspath(os.path.dirname(__file__))
PROJECT_ROOT = os.path.join(HERE, os.pardir)
Expand Down Expand Up @@ -145,6 +147,66 @@ def run_anomaly_rca_scheduler():
click.echo("Completed running scheduler. Tasks should be running in the worker.")


@click.command()
@with_appcontext
@click.argument("file_name")
def kpi_import(file_name: str):
"""Adds KPIs defined in given JSON file.

The JSON must be in the following format:

\b
```
[
{
"name": "",
"is_certified": false,
"data_source": 0,
"kpi_type": "",
"kpi_query": "",
"schema_name": null,
"table_name": "",
"metric": "",
"aggregation": "",
"datetime_column": "",
"filters": [],
"dimensions": [],
"run_anomaly": true,
"anomaly_params": {
"frequency": "D",
"anomaly_period": 90,
"seasonality": [],
"model_name": "ProphetModel",
"sensitivity": "High"
}
}
]
```
"""
with open(file_name) as f:
kpis = json.load(f)

for data in kpis:
data: dict

try:
kpi, err, _ = add_kpi(data, validate=True, run_analytics=True)

if err != "":
click.echo(click.style(
f"Error in KPI ({kpi.name}): {err}",
fg="red",
bold=True
))

except Exception as e:
click.echo(click.style(
f"Could not set up KPI with name: {data['name']}, skipping. Error: {e}",
fg="red",
bold=True
))


@click.command()
@with_appcontext
def reinstall_db():
Expand Down
123 changes: 116 additions & 7 deletions chaos_genius/controllers/kpi_controller.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,132 @@
import logging
import typing
from datetime import date, datetime, timedelta
from typing import Optional, Union, Iterator
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

from flask import current_app # noqa: F401
from flask import current_app

from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper # noqa: F401
from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success
from chaos_genius.core.anomaly.controller import AnomalyDetectionController
from chaos_genius.core.rca.rca_controller import RootCauseAnalysisController
from chaos_genius.core.utils.data_loader import DataLoader
from chaos_genius.core.utils.kpi_validation import validate_kpi
from chaos_genius.databases.models.kpi_model import Kpi
from chaos_genius.settings import DAYS_OFFSET_FOR_ANALTYICS, MAX_DEEPDRILLS_SLACK_DAYS

from chaos_genius.settings import (
MAX_DEEPDRILLS_SLACK_DAYS,
DAYS_OFFSET_FOR_ANALTYICS,
)
logger = logging.getLogger(__name__)


logger = logging.getLogger(__name__)
def add_kpi(
data: Dict[str, Any],
validate=True,
run_analytics=True
) -> Tuple[Kpi, str, bool]:
"""Adds a new KPI.

Also handles adding the KPI to the default dashboard, running analytics and
validations.

If "anomaly_params" is present in data, it also performs anomaly params validation
and runs anomaly.

Arguments:
data: the KPI data. Field names are same as the KPI model.
validate: whether to perform KPI validation. (Default: True).
run_rca: whether to start RCA task after adding. (Default: True).

Returns:
A tuple of (newly added KPI, error message - empty if success, boolean
indicating whether error was critical)
"""
data["dimensions"] = data.get("dimensions", [])

data["kpi_query"] = _kpi_query_strip_trailing_semicolon(
(data.get("kpi_query", "") or "")
)

has_anomaly_setup = "anomaly_params" in data
new_anomaly_params = {}

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type") or data.get("kpi_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
run_anomaly=data.get("run_anomaly", True),
)

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import validate_partial_anomaly_params

# validate anomaly params
err, new_anomaly_params = validate_partial_anomaly_params(
data["anomaly_params"]
)
if err != "":
return (
new_kpi,
f"Error in validating anomaly params for KPI {data['name']}: {err}",
True,
)

if validate:
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return new_kpi, message, True

new_kpi.save(commit=True)

# Add KPI to dashboard 0 and all required dashboards
_add_kpi_to_dashboards(new_kpi.id, data.get("dashboard", []) + [0])

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import update_anomaly_params

# update anomaly params
err, new_kpi = update_anomaly_params(
new_kpi, new_anomaly_params, check_editable=False
)

if err != "":
return (
new_kpi,
f"Error updating anomaly params for KPI {new_kpi.name}: {err}",
True
)

if run_analytics:
# we ensure analytics tasks are run as soon as analytics is configured
from chaos_genius.jobs.anomaly_tasks import queue_kpi_analytics
queue_kpi_analytics(new_kpi.id, has_anomaly_setup)

return new_kpi, "", False


def _kpi_query_strip_trailing_semicolon(query: str) -> str:
if not query:
return ""

query = query.strip()

# remove trailing semicolon
if query[-1] == ";":
query = query[:-1]
return query


def _add_kpi_to_dashboards(kpi_id: int, dashboards: List[int]):
dashboards = list(set(dashboards))
create_dashboard_kpi_mapper(dashboards, [kpi_id])


def _is_data_present_for_end_date(
Expand Down
27 changes: 27 additions & 0 deletions chaos_genius/jobs/anomaly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,30 @@ def anomaly_scheduler():
g = group(task_group)
res = g.apply_async()
return res


def queue_kpi_analytics(kpi_id: int, run_anomaly=True):
"""Adds analytics tasks to the queue for given KPI."""
anomaly_task = None
if run_anomaly:
anomaly_task = ready_anomaly_task(kpi_id)

# run rca as soon as new KPI is added
rca_task = ready_rca_task(kpi_id)
if rca_task is None:
logger.error(
"Could not run RCA task since newly added KPI was not found: %s",
kpi_id,
)
else:
rca_task.apply_async()

if run_anomaly:
if anomaly_task is None:
logger.error(
"Not running anomaly since it is not configured or KPI "
"(%d) was not found.",
kpi_id,
)
else:
anomaly_task.apply_async()
66 changes: 14 additions & 52 deletions chaos_genius/views/kpi_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import traceback # noqa: F401
from datetime import date, datetime, timedelta
from typing import cast

from flask import ( # noqa: F401
Blueprint,
Expand All @@ -24,7 +25,7 @@
from chaos_genius.databases.models.rca_data_model import RcaData
from chaos_genius.extensions import cache, db
from chaos_genius.databases.db_utils import chech_editable_field
from chaos_genius.controllers.kpi_controller import get_kpi_data_from_id
from chaos_genius.controllers.kpi_controller import add_kpi, get_kpi_data_from_id
from chaos_genius.controllers.dashboard_controller import (
create_dashboard_kpi_mapper,
get_mapper_obj_by_dashboard_ids,
Expand All @@ -46,67 +47,28 @@


@blueprint.route("", methods=["GET", "POST"])
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
def kpi():
"""kpi list view."""
"""KPI add and list view."""
# Handle logging in
if request.method == "POST":
if not request.is_json:
return jsonify({"error": "The request payload is not in JSON format"})

data = request.get_json()
data["dimensions"] = [] if data["dimensions"] is None else data["dimensions"]

if data.get("kpi_query", "").strip():
data["kpi_query"] = data["kpi_query"].strip()
# remove trailing semicolon
if data["kpi_query"][-1] == ";":
data["kpi_query"] = data["kpi_query"][:-1]

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
)
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return jsonify(
{"error": message, "status": "failure", "is_critical": "true"}
)

new_kpi.save(commit=True)

# Add the dashboard id 0 to the kpi
dashboard_list = data.get("dashboard", []) + [0]
dashboard_list = list(set(dashboard_list))
mapper_obj_list = create_dashboard_kpi_mapper(dashboard_list, [new_kpi.id])
if data is None:
return jsonify({"error": "The request payload is not in JSON format"})

# TODO: Fix circular import error
from chaos_genius.jobs.anomaly_tasks import ready_rca_task
kpi, err_msg, critical = add_kpi(data)

# run rca as soon as new KPI is added
rca_task = ready_rca_task(new_kpi.id)
if rca_task is None:
print(
f"Could not run RCA task since newly added KPI was not found: {new_kpi.id}"
)
else:
rca_task.apply_async()
if err_msg != "":
ret = {"error": err_msg, "status": "failure"}
if critical:
ret["is_critical"] = "true"
return jsonify(ret)

return jsonify(
{
"data": {"kpi_id": new_kpi.id},
"message": f"KPI {new_kpi.name} has been created successfully.",
"data": {"kpi_id": kpi.id},
"message": f"KPI {kpi.name} has been created successfully.",
"status": "success",
}
)
Expand Down