Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

feat(cli): add command to import KPIs from a JSON #557

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chaos_genius/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,4 @@ def register_commands(app):
app.cli.add_command(commands.reinstall_db)
app.cli.add_command(commands.insert_demo_data)
app.cli.add_command(commands.run_anomaly_rca_scheduler)
app.cli.add_command(commands.kpi_import)
69 changes: 69 additions & 0 deletions chaos_genius/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Click commands."""
import json
import os
from datetime import datetime
from glob import glob
Expand All @@ -8,6 +9,14 @@
import click
from flask.cli import with_appcontext

from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper
from chaos_genius.controllers.kpi_controller import add_kpi
from chaos_genius.core.utils.kpi_validation import validate_kpi
from chaos_genius.databases.models.kpi_model import Kpi
from chaos_genius.views.anomaly_data_view import (
update_anomaly_params,
validate_partial_anomaly_params,
)

HERE = os.path.abspath(os.path.dirname(__file__))
PROJECT_ROOT = os.path.join(HERE, os.pardir)
Expand Down Expand Up @@ -145,6 +154,66 @@ def run_anomaly_rca_scheduler():
click.echo("Completed running scheduler. Tasks should be running in the worker.")


@click.command()
@with_appcontext
@click.argument("file_name")
def kpi_import(file_name: str):
"""Adds KPIs defined in given JSON file.

The JSON must be in the following format:

\b
```
[
{
"name": "",
"is_certified": false,
"data_source": 0,
"kpi_type": "",
"kpi_query": "",
"schema_name": null,
"table_name": "",
"metric": "",
"aggregation": "",
"datetime_column": "",
"filters": [],
"dimensions": [],
"run_anomaly": true,
"anomaly_params": {
"frequency": "D",
"anomaly_period": 90,
"seasonality": [],
"model_name": "ProphetModel",
"sensitivity": "High"
}
}
]
```
"""
with open(file_name) as f:
kpis = json.load(f)

for data in kpis:
data: dict

try:
kpi, err, _ = add_kpi(data, validate=True, run_analytics=True)

if err != "":
click.echo(click.style(
f"Error in KPI ({kpi.name}): {err}",
fg="red",
bold=True
))

except Exception as e:
click.echo(click.style(
f"Could not set up KPI with name: {data['name']}, skipping. Error: {e}",
fg="red",
bold=True
))


@click.command()
@with_appcontext
def reinstall_db():
Expand Down
123 changes: 116 additions & 7 deletions chaos_genius/controllers/kpi_controller.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,132 @@
import logging
import typing
from datetime import date, datetime, timedelta
from typing import Optional, Union, Iterator
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

from flask import current_app # noqa: F401
from flask import current_app

from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper # noqa: F401
from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success
from chaos_genius.core.anomaly.controller import AnomalyDetectionController
from chaos_genius.core.rca.rca_controller import RootCauseAnalysisController
from chaos_genius.core.utils.data_loader import DataLoader
from chaos_genius.core.utils.kpi_validation import validate_kpi
from chaos_genius.databases.models.kpi_model import Kpi
from chaos_genius.settings import DAYS_OFFSET_FOR_ANALTYICS, MAX_DEEPDRILLS_SLACK_DAYS

from chaos_genius.settings import (
MAX_DEEPDRILLS_SLACK_DAYS,
DAYS_OFFSET_FOR_ANALTYICS,
)
logger = logging.getLogger(__name__)


logger = logging.getLogger(__name__)
def add_kpi(
data: Dict[str, Any],
validate=True,
run_analytics=True
) -> Tuple[Kpi, str, bool]:
"""Adds a new KPI.

Also handles adding the KPI to the default dashboard, running analytics and
validations.

If "anomaly_params" is present in data, it also performs anomaly params validation
and runs anomaly.

Arguments:
data: the KPI data. Field names are same as the KPI model.
validate: whether to perform KPI validation. (Default: True).
run_rca: whether to start RCA task after adding. (Default: True).

Returns:
A tuple of (newly added KPI, error message - empty if success, boolean
indicating whether error was critical)
"""
data["dimensions"] = [] if data.get("dimensions") is None else data["dimensions"]

data["kpi_query"] = _kpi_query_strip_trailing_semicolon(
(data.get("kpi_query", "") or "")
)

has_anomaly_setup = "anomaly_params" in data
new_anomaly_params = {}

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type") or data.get("kpi_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
run_anomaly=data.get("run_anomaly", True),
)

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import validate_partial_anomaly_params

# validate anomaly params
err, new_anomaly_params = validate_partial_anomaly_params(
data["anomaly_params"]
)
if err != "":
return (
new_kpi,
f"Error in validating anomaly params for KPI {data['name']}: {err}",
True,
)

if validate:
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return new_kpi, message, True

new_kpi.save(commit=True)

# Add KPI to dashboard 0 and all required dashboards
_add_kpi_to_dashboards(new_kpi.id, data.get("dashboard", []) + [0])

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import update_anomaly_params

# update anomaly params
err, new_kpi = update_anomaly_params(
new_kpi, new_anomaly_params, check_editable=False
)

if err != "":
return (
new_kpi,
f"Error updating anomaly params for KPI {new_kpi.name}: {err}",
True
)

if run_analytics:
# we ensure analytics tasks are run as soon as analytics is configured
from chaos_genius.jobs.anomaly_tasks import queue_kpi_analytics
queue_kpi_analytics(new_kpi.id, has_anomaly_setup)

return new_kpi, "", False


def _kpi_query_strip_trailing_semicolon(query: str) -> str:
if not query:
return ""

query = query.strip()

# remove trailing semicolon
if query[-1] == ";":
query = query[:-1]
return query


def _add_kpi_to_dashboards(kpi_id: int, dashboards: List[int]):
dashboards = list(set(dashboards))
create_dashboard_kpi_mapper(dashboards, [kpi_id])


def _is_data_present_for_end_date(
Expand Down
26 changes: 26 additions & 0 deletions chaos_genius/jobs/anomaly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,29 @@ def anomaly_scheduler():
g = group(task_group)
res = g.apply_async()
return res


def queue_kpi_analytics(kpi_id: int, run_anomaly=True):
"""Adds analytics tasks to the queue for given KPI."""
anomaly_task = None
if run_anomaly:
anomaly_task = ready_anomaly_task(kpi_id)

# run rca as soon as new KPI is added
rca_task = ready_rca_task(kpi_id)
if rca_task is None:
logger.error(
"Could not run RCA task since newly added KPI was not found: %s",
kpi_id,
)
else:
rca_task.apply_async()

if anomaly_task is None:
logger.error(
"Not running anomaly since it is not configured or KPI "
"(%d) was not found.",
kpi_id,
)
else:
anomaly_task.apply_async()
66 changes: 14 additions & 52 deletions chaos_genius/views/kpi_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import traceback # noqa: F401
from datetime import date, datetime, timedelta
from typing import cast

from flask import ( # noqa: F401
Blueprint,
Expand All @@ -24,7 +25,7 @@
from chaos_genius.databases.models.rca_data_model import RcaData
from chaos_genius.extensions import cache, db
from chaos_genius.databases.db_utils import chech_editable_field
from chaos_genius.controllers.kpi_controller import get_kpi_data_from_id
from chaos_genius.controllers.kpi_controller import add_kpi, get_kpi_data_from_id
from chaos_genius.controllers.dashboard_controller import (
create_dashboard_kpi_mapper,
get_mapper_obj_by_dashboard_ids,
Expand All @@ -46,67 +47,28 @@


@blueprint.route("", methods=["GET", "POST"])
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
def kpi():
"""kpi list view."""
"""KPI add and list view."""
# Handle logging in
if request.method == "POST":
if not request.is_json:
return jsonify({"error": "The request payload is not in JSON format"})

data = request.get_json()
data["dimensions"] = [] if data["dimensions"] is None else data["dimensions"]

if data.get("kpi_query", "").strip():
data["kpi_query"] = data["kpi_query"].strip()
# remove trailing semicolon
if data["kpi_query"][-1] == ";":
data["kpi_query"] = data["kpi_query"][:-1]

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
)
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return jsonify(
{"error": message, "status": "failure", "is_critical": "true"}
)

new_kpi.save(commit=True)

# Add the dashboard id 0 to the kpi
dashboard_list = data.get("dashboard", []) + [0]
dashboard_list = list(set(dashboard_list))
mapper_obj_list = create_dashboard_kpi_mapper(dashboard_list, [new_kpi.id])
if data is None:
return jsonify({"error": "The request payload is not in JSON format"})

# TODO: Fix circular import error
from chaos_genius.jobs.anomaly_tasks import ready_rca_task
kpi, err_msg, critical = add_kpi(data)

# run rca as soon as new KPI is added
rca_task = ready_rca_task(new_kpi.id)
if rca_task is None:
print(
f"Could not run RCA task since newly added KPI was not found: {new_kpi.id}"
)
else:
rca_task.apply_async()
if err_msg != "":
ret = {"error": err_msg, "status": "failure"}
if critical:
ret["is_critical"] = "true"
return jsonify(ret)

return jsonify(
{
"data": {"kpi_id": new_kpi.id},
"message": f"KPI {new_kpi.name} has been created successfully.",
"data": {"kpi_id": kpi.id},
"message": f"KPI {kpi.name} has been created successfully.",
"status": "success",
}
)
Expand Down