Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anomaly detection): Maintain historical timeseries #1670

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/celery_app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import seer.app # noqa: F401
from celery_app.app import celery_app as celery # noqa: F401
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries # noqa: F401
from seer.anomaly_detection.tasks import ( # noqa: F401
cleanup_disabled_alerts,
cleanup_old_timeseries_history,
cleanup_timeseries,
)
from seer.automation.autofix.tasks import check_and_mark_recent_autofix_runs
from seer.automation.tasks import delete_data_for_ttl
from seer.configuration import AppConfig
Expand Down Expand Up @@ -43,3 +47,9 @@ def setup_periodic_tasks(sender, config: AppConfig = injected, **kwargs):
cleanup_disabled_alerts.signature(kwargs={}, queue=config.CELERY_WORKER_QUEUE),
name="Clean up old disabled timeseries every week",
)

sender.add_periodic_task(
crontab(minute="0", hour="0", day_of_week="0"), # Run once a week on Sunday
cleanup_old_timeseries_history.signature(kwargs={}, queue=config.CELERY_WORKER_QUEUE),
name="Clean up old timeseries history every week",
)
37 changes: 37 additions & 0 deletions src/migrations/versions/5a7ff418f726_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Migration

Revision ID: 5a7ff418f726
Revises: 3914e6cdb818
Create Date: 2024-12-30 23:36:51.280449

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "5a7ff418f726"
down_revision = "3914e6cdb818"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"dynamic_alert_time_series_history",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious how come alert_id is biginteger but this id is integer? We will be having more rows in this table than the alert table, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch -- fixed

sa.Column("alert_id", sa.BigInteger(), nullable=False),
sa.Column("timestamp", sa.DateTime(), nullable=False),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are cleaning up history based on timestamp, do we need an index on this field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The id column is to avoid duplicate values in the rows since the other columns can be the same including the timestamped ones if they are added at the same time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was asking about the timestamp column. Since the where clause of the SQL will have timestamp, we need an index (not primary index as id will still be the primary index).

sa.Column("value", sa.Float(), nullable=False),
sa.Column("anomaly_type", sa.String(), nullable=False),
sa.Column("saved_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id", "alert_id"),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("dynamic_alert_time_series_history")
# ### end Alembic commands ###
48 changes: 48 additions & 0 deletions src/migrations/versions/86ba1a6c0cc3_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Migration

Revision ID: 86ba1a6c0cc3
Revises: 5a7ff418f726
Create Date: 2025-01-03 19:33:47.489760

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "86ba1a6c0cc3"
down_revision = "5a7ff418f726"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alert_time_series_history", schema=None) as batch_op:
batch_op.alter_column(
"id",
existing_type=sa.INTEGER(),
type_=sa.BigInteger(),
existing_nullable=False,
autoincrement=True,
)
batch_op.create_index(
"ix_dynamic_alert_time_series_history_timestamp", ["timestamp"], unique=False
)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alert_time_series_history", schema=None) as batch_op:
batch_op.drop_index("ix_dynamic_alert_time_series_history_timestamp")
batch_op.alter_column(
"id",
existing_type=sa.BigInteger(),
type_=sa.INTEGER(),
existing_nullable=False,
autoincrement=True,
)

# ### end Alembic commands ###
41 changes: 40 additions & 1 deletion src/seer/anomaly_detection/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import logging
from datetime import datetime, timedelta
from operator import and_, or_
from typing import List

import numpy as np
import sentry_sdk
from sqlalchemy import delete

from celery_app.app import celery_app
from seer.anomaly_detection.accessors import DbAlertDataAccessor
from seer.anomaly_detection.detectors.anomaly_detectors import MPBatchAnomalyDetector
from seer.anomaly_detection.models import AlgoConfig, TimeSeries
from seer.anomaly_detection.models.external import AnomalyDetectionConfig
from seer.db import DbDynamicAlert, Session, TaskStatus
from seer.db import (
DbDynamicAlert,
DbDynamicAlertTimeSeries,
DbDynamicAlertTimeSeriesHistory,
Session,
TaskStatus,
)
from seer.dependency_injection import inject, injected

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,12 +76,30 @@ def delete_old_timeseries_points(alert: DbDynamicAlert, date_threshold: float):
for ts in alert.timeseries:
if ts.timestamp.timestamp() < date_threshold:
to_remove.append(ts)

# Save history records before removing
save_timeseries_history(alert, to_remove)

for ts in to_remove:
alert.timeseries.remove(ts)
deleted_count += 1
return deleted_count


def save_timeseries_history(alert: DbDynamicAlert, timeseries: List[DbDynamicAlertTimeSeries]):
with Session() as session:
for ts in timeseries:
history_record = DbDynamicAlertTimeSeriesHistory(
alert_id=alert.external_alert_id,
timestamp=ts.timestamp,
anomaly_type=ts.anomaly_type,
value=ts.value,
saved_at=datetime.now(),
)
session.add(history_record)
session.commit()


@inject
def update_matrix_profiles(
alert: DbDynamicAlert,
Expand Down Expand Up @@ -163,3 +189,16 @@ def cleanup_disabled_alerts():

session.commit()
logger.info(f"Deleted {deleted_count} alerts")


@celery_app.task
@sentry_sdk.trace
def cleanup_old_timeseries_history():
date_threshold = datetime.now() - timedelta(days=90)
with Session() as session:
stmt = delete(DbDynamicAlertTimeSeriesHistory).where(
DbDynamicAlertTimeSeriesHistory.timestamp < date_threshold
)
res = session.execute(stmt)
session.commit()
logger.info(f"Deleted {res.rowcount} timeseries history records older than 90 days")
15 changes: 15 additions & 0 deletions src/seer/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,18 @@ class DbIssueSummary(Base):
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime, nullable=False, default=datetime.datetime.now(datetime.UTC)
)


class DbDynamicAlertTimeSeriesHistory(Base):
__tablename__ = "dynamic_alert_time_series_history"
__table_args__ = (Index("ix_dynamic_alert_time_series_history_timestamp", "timestamp"),)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
alert_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
timestamp: Mapped[datetime.datetime] = mapped_column(
DateTime, nullable=False, default=datetime.datetime.now(datetime.UTC)
)
value: Mapped[float] = mapped_column(Float, nullable=False)
anomaly_type: Mapped[str] = mapped_column(String, nullable=False)
saved_at: Mapped[datetime.datetime] = mapped_column(
DateTime, nullable=False, default=datetime.datetime.now(datetime.UTC)
)
47 changes: 43 additions & 4 deletions tests/seer/anomaly_detection/test_cleanup_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@
from seer.anomaly_detection.models import MPTimeSeriesAnomalies
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.anomaly_detection.models.timeseries import TimeSeries
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session, TaskStatus
from seer.anomaly_detection.tasks import (
cleanup_disabled_alerts,
cleanup_old_timeseries_history,
cleanup_timeseries,
)
from seer.db import (
DbDynamicAlert,
DbDynamicAlertTimeSeries,
DbDynamicAlertTimeSeriesHistory,
Session,
TaskStatus,
)


class TestCleanupTasks(unittest.TestCase):
Expand All @@ -25,11 +35,11 @@ def _save_alert(self, external_alert_id: int, num_old_points: int, num_new_point
time_period=15, sensitivity="high", direction="both", expected_seasonality="auto"
)
points_old = [
TimeSeriesPoint(timestamp=past_ts + (i * 1000), value=random.randint(1, 100))
TimeSeriesPoint(timestamp=past_ts + (i * 100), value=random.randint(1, 100))
for i in range(num_old_points)
]
points_new = [
TimeSeriesPoint(timestamp=cur_ts + (i * 1000), value=random.randint(1, 100))
TimeSeriesPoint(timestamp=cur_ts + (i * 100), value=random.randint(1, 100))
for i in range(num_new_points)
]
points = [*points_old, *points_new]
Expand Down Expand Up @@ -236,3 +246,32 @@ def test_cleanup_disabled_alerts(self):

assert alert is not None
assert len(alert.timeseries) == 500

def test_cleanup_old_timeseries_history(self):
# Create and save alert with 1000 points (all old)
external_alert_id, config, points, anomalies = self._save_alert(0, 1000, 0)
date_threshold = (datetime.now() - timedelta(days=28)).timestamp()
cleanup_timeseries(external_alert_id, date_threshold)

# Confirm the historical table is populated with 1000 points

with Session() as session:
history = (
session.query(DbDynamicAlertTimeSeriesHistory)
.filter(DbDynamicAlertTimeSeriesHistory.alert_id == external_alert_id)
.all()
)
assert len(history) == 1000

# Timeseries History should be deleted as these points have been added over 90 days ago

with Session() as session:

cleanup_old_timeseries_history()

history = (
session.query(DbDynamicAlertTimeSeriesHistory)
.filter(DbDynamicAlertTimeSeriesHistory.alert_id == external_alert_id)
.all()
)
assert len(history) == 0
3 changes: 3 additions & 0 deletions tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def test_detected_celery_jobs():
assert set(k for k in celery_app.tasks.keys() if not k.startswith("celery.")) == set(
[
"seer.anomaly_detection.tasks.cleanup_timeseries",
"seer.anomaly_detection.tasks.cleanup_old_timeseries_history",
"seer.anomaly_detection.tasks.cleanup_disabled_alerts",
"seer.automation.autofix.steps.change_describer_step.autofix_change_describer_task",
"seer.automation.autofix.steps.coding_step.autofix_coding_task",
Expand All @@ -32,6 +33,7 @@ def test_detected_celery_jobs():
"Check and mark recent autofix runs every hour",
"Delete old Automation runs for 90 day time-to-live",
"Clean up old disabled timeseries every week",
"Clean up old timeseries history every week",
]
)

Expand All @@ -48,6 +50,7 @@ def test_anomaly_beat_jobs():
assert set(k for k in app.conf.beat_schedule.keys()) == set(
[
"Clean up old disabled timeseries every week",
"Clean up old timeseries history every week",
]
)

Expand Down
Loading