Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service label for alerting integrations #5373

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
102 changes: 59 additions & 43 deletions engine/apps/api/serializers/alert_receive_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from apps.alerts.models import AlertReceiveChannel
from apps.base.messaging import get_messaging_backends
from apps.integrations.legacy_prefix import has_legacy_prefix
from apps.labels.models import AlertReceiveChannelAssociatedLabel, LabelKeyCache, LabelValueCache
from apps.labels.models import LabelKeyCache, LabelValueCache
from apps.labels.types import LabelKey
from apps.labels.utils import get_service_label_custom
from apps.user_management.models import Organization
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField
from common.api_helpers.exceptions import BadRequest
Expand Down Expand Up @@ -120,25 +121,48 @@ def pop_alert_group_labels(validated_data: dict) -> IntegrationAlertGroupLabels
}

@classmethod
def update(
cls, instance: AlertReceiveChannel, alert_group_labels: IntegrationAlertGroupLabels | None
) -> AlertReceiveChannel:
if alert_group_labels is None:
return instance

# update DB cache for custom labels
cls._create_custom_labels(instance.organization, alert_group_labels["custom"])
# save static labels as integration labels
# todo: it's needed to cover delay between backend and frontend rollout, and can be removed later
cls._save_static_labels_as_integration_labels(instance, alert_group_labels["custom"])
# update custom labels
instance.alert_group_labels_custom = cls._custom_labels_to_internal_value(alert_group_labels["custom"])
def update_validated_data_and_label_cache(
cls,
integration: str,
organization: "Organization",
validated_data: dict,
alert_group_labels: IntegrationAlertGroupLabels | None,
new_integration: bool,
) -> dict:
"""
Update validated data with alert group custom labels and labels template.
Update label cache.
Add `service_name` dynamic label for new Grafana Alerting integration.
"""

# update template
instance.alert_group_labels_template = alert_group_labels["template"]
is_new_alerting_integration = (
new_integration and integration == AlertReceiveChannel.INTEGRATION_GRAFANA_ALERTING
)
if not organization.is_grafana_labels_enabled or (
alert_group_labels is None and not is_new_alerting_integration
):
return validated_data

instance.save(update_fields=["alert_group_labels_custom", "alert_group_labels_template"])
return instance
custom_labels = cls._custom_labels_to_internal_value(alert_group_labels["custom"]) if alert_group_labels else []
# update DB cache for custom labels
cls._create_custom_labels(organization, custom_labels)

# if it's a new alerting integration, add service label to custom labels if it's not there
if is_new_alerting_integration:
service_label_custom = get_service_label_custom(organization)
if service_label_custom:
is_service_label_added = False
# check if service_name label has already been added to custom labels by user
for key in set(label[0] for label in custom_labels):
if key == service_label_custom[0]:
is_service_label_added = True
break
if not is_service_label_added:
custom_labels.append(service_label_custom)

validated_data["alert_group_labels_custom"] = custom_labels or None
validated_data["alert_group_labels_template"] = alert_group_labels["template"] if alert_group_labels else None
return validated_data

@staticmethod
def _create_custom_labels(organization: Organization, labels: AlertGroupCustomLabelsAPI) -> None:
Expand Down Expand Up @@ -168,25 +192,6 @@ def _create_custom_labels(organization: Organization, labels: AlertGroupCustomLa
LabelKeyCache.objects.bulk_create(label_keys, ignore_conflicts=True, batch_size=5000)
LabelValueCache.objects.bulk_create(label_values, ignore_conflicts=True, batch_size=5000)

@staticmethod
def _save_static_labels_as_integration_labels(instance: AlertReceiveChannel, labels: AlertGroupCustomLabelsAPI):
labels_associations_to_create = []
labels_copy = labels[:]
for label in labels_copy:
if label["value"]["id"] is not None:
labels_associations_to_create.append(
AlertReceiveChannelAssociatedLabel(
key_id=label["key"]["id"],
value_id=label["value"]["id"],
organization=instance.organization,
alert_receive_channel=instance,
)
)
labels.remove(label)
AlertReceiveChannelAssociatedLabel.objects.bulk_create(
labels_associations_to_create, ignore_conflicts=True, batch_size=5000
)

@classmethod
def to_representation(cls, instance: AlertReceiveChannel) -> IntegrationAlertGroupLabels:
"""
Expand Down Expand Up @@ -414,6 +419,14 @@ def create(self, validated_data):
# pop associated labels and alert group labels, so they are not passed to AlertReceiveChannel.create
labels = validated_data.pop("labels", None)
alert_group_labels = IntegrationAlertGroupLabelsSerializer.pop_alert_group_labels(validated_data)
# update validated data with alert group labels and update label cache if needed
validated_data = IntegrationAlertGroupLabelsSerializer.update_validated_data_and_label_cache(
integration=integration,
organization=organization,
validated_data=validated_data,
alert_group_labels=alert_group_labels,
new_integration=True,
)

try:
instance = AlertReceiveChannel.create(
Expand All @@ -425,9 +438,8 @@ def create(self, validated_data):
except AlertReceiveChannel.DuplicateDirectPagingError:
raise BadRequest(detail=AlertReceiveChannel.DuplicateDirectPagingError.DETAIL)

# Create label associations first, then update alert group labels
# Create label associations
self.update_labels_association_if_needed(labels, instance, organization)
instance = IntegrationAlertGroupLabelsSerializer.update(instance, alert_group_labels)

# Create default webhooks if needed
if create_default_webhooks and hasattr(instance.config, "create_default_webhooks"):
Expand All @@ -439,10 +451,14 @@ def update(self, instance, validated_data):
# update associated labels
labels = validated_data.pop("labels", None)
self.update_labels_association_if_needed(labels, instance, self.context["request"].auth.organization)

# update alert group labels
instance = IntegrationAlertGroupLabelsSerializer.update(
instance, IntegrationAlertGroupLabelsSerializer.pop_alert_group_labels(validated_data)
alert_group_labels = IntegrationAlertGroupLabelsSerializer.pop_alert_group_labels(validated_data)
# update validated data with alert group labels and update label cache if needed
validated_data = IntegrationAlertGroupLabelsSerializer.update_validated_data_and_label_cache(
integration=instance.integration,
organization=instance.organization,
validated_data=validated_data,
alert_group_labels=alert_group_labels,
new_integration=False,
)

try:
Expand Down
5 changes: 5 additions & 0 deletions engine/apps/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@
LabelsViewSet.as_view({"get": "get_key", "put": "rename_key"}),
name="get_update_key",
),
re_path(
r"^labels/name/(?P<key_name>[\w\-]+)/?$",
LabelsViewSet.as_view({"get": "get_key_by_name"}),
name="get_key_by_name",
),
re_path(
r"^labels/id/(?P<key_id>[\w\-]+)/values/?$", LabelsViewSet.as_view({"post": "add_value"}), name="add_value"
),
Expand Down
15 changes: 14 additions & 1 deletion engine/apps/api/views/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from apps.auth_token.auth import PluginAuthentication
from apps.labels.client import LabelsAPIClient, LabelsRepoAPIException
from apps.labels.tasks import update_instances_labels_cache, update_label_option_cache
from apps.labels.types import LabelOption
from apps.labels.utils import is_labels_feature_enabled
from common.api_helpers.exceptions import BadRequest

Expand Down Expand Up @@ -66,6 +67,18 @@ def get_key(self, request, key_id):
self._update_labels_cache(label_option)
return Response(label_option, status=response.status_code)

@extend_schema(responses=LabelOptionSerializer)
def get_key_by_name(self, request, key_name):
"""
get_key_by_name returns LabelOption – key with the list of values
"""
organization = self.request.auth.organization
label_option, response = LabelsAPIClient(
organization.grafana_url,
organization.api_token,
).get_label_by_key_name(key_name)
return Response(label_option, status=response.status_code)

@extend_schema(responses=LabelValueSerializer)
def get_value(self, request, key_id, value_id):
"""get_value returns a Value"""
Expand Down Expand Up @@ -133,7 +146,7 @@ def rename_value(self, request, key_id, value_id):
self._update_labels_cache(label_option)
return Response(label_option, status=status)

def _update_labels_cache(self, label_option):
def _update_labels_cache(self, label_option: LabelOption):
if not label_option:
return
serializer = LabelOptionSerializer(data=label_option)
Expand Down
9 changes: 9 additions & 0 deletions engine/apps/labels/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def get_label_by_key_id(
self._check_response(response)
return response.json(), response

def get_label_by_key_name(
self, key_name: str
) -> typing.Tuple[typing.Optional["LabelOption"], requests.models.Response]:
url = urljoin(self.api_url, f"name/{key_name}")

response = requests.get(url, timeout=TIMEOUT, headers=self._request_headers)
self._check_response(response)
return response.json(), response

def get_value(
self, key_id: str, value_id: str
) -> typing.Tuple[typing.Optional["LabelValue"], requests.models.Response]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# TODO: MOVE IT TO /migrations DIRECTORY IN FUTURE RELEASE

# Generated by Django 4.2.15 on 2024-11-26 13:37

from django.db import migrations
Expand Down
18 changes: 18 additions & 0 deletions engine/apps/labels/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.db import models
from django.utils import timezone

from apps.labels.client import LabelsAPIClient
from apps.labels.tasks import update_label_pairs_cache
from apps.labels.types import LabelPair
from apps.labels.utils import LABEL_OUTDATED_TIMEOUT_MINUTES
Expand All @@ -26,6 +27,23 @@ class LabelKeyCache(models.Model):
def is_outdated(self) -> bool:
return timezone.now() - self.last_synced > timezone.timedelta(minutes=LABEL_OUTDATED_TIMEOUT_MINUTES)

@classmethod
def get_or_create_by_name(cls, organization: "Organization", key_name: str) -> typing.Optional["LabelKeyCache"]:
label_key = cls.objects.filter(organization=organization, name=key_name).first()
if label_key:
return label_key
label, _ = LabelsAPIClient(organization.grafana_url, organization.api_token).get_label_by_key_name(label_key)
if not label:
return None
label_key = LabelKeyCache(
id=label["key"]["id"],
name=label["key"]["name"],
organization=organization,
prescribed=label["key"]["prescribed"],
).save()

return label_key


class LabelValueCache(models.Model):
id = models.CharField(primary_key=True, editable=False, max_length=36)
Expand Down
61 changes: 57 additions & 4 deletions engine/apps/labels/tasks.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove update_labels_cache task, it seems it's deprecated

Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

from apps.labels.client import LabelsAPIClient, LabelsRepoAPIException
from apps.labels.types import LabelOption, LabelPair
from apps.labels.utils import LABEL_OUTDATED_TIMEOUT_MINUTES, get_associating_label_model
from apps.labels.utils import LABEL_OUTDATED_TIMEOUT_MINUTES, get_associating_label_model, get_service_label_custom
from apps.user_management.models import Organization
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)

MAX_RETRIES = 1 if settings.DEBUG else 10


class KVPair(typing.TypedDict):
value_name: str
Expand Down Expand Up @@ -129,9 +131,7 @@ def _update_labels_cache(values_id_to_pair: typing.Dict[str, LabelPair]):
LabelValueCache.objects.bulk_update(values, fields=["name", "last_synced", "prescribed"])


@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def update_instances_labels_cache(organization_id: int, instance_ids: typing.List[int], instance_model_name: str):
from apps.labels.models import LabelValueCache

Expand Down Expand Up @@ -162,3 +162,56 @@ def update_instances_labels_cache(organization_id: int, instance_ids: typing.Lis
continue
if label_option:
update_label_option_cache.apply_async((label_option,))


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def add_service_label_for_alerting_integrations():
"""Starts tasks that add `service_name` dynamic label to custom labels for alerting integrations"""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add comment that it's intended for 1-time run. Also, let's stop use term custom labels


from apps.alerts.models import AlertReceiveChannel

organization_ids = (
AlertReceiveChannel.objects.filter(
integration=AlertReceiveChannel.INTEGRATION_GRAFANA_ALERTING,
organization__is_grafana_labels_enabled=True,
organization__deleted_at__isnull=True,
)
.values_list("organization", flat=True)
.distinct()
)

for idx, organization_id in enumerate(organization_ids):
countdown = idx // 10
add_service_label_per_org.apply_async((organization_id,), countdown=countdown)


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def add_service_label_per_org(organization_id: int):
"""Add `service_name` dynamic label to custom labels for alerting integrations"""

from apps.alerts.models import AlertReceiveChannel
from apps.user_management.models import Organization

organization = Organization.objects.get(id=organization_id)
service_label_custom = get_service_label_custom(organization)
if not service_label_custom:
return
integrations = AlertReceiveChannel.objects.filter(
integration=AlertReceiveChannel.INTEGRATION_GRAFANA_ALERTING,
organization=organization,
)
integrations_to_update = []
# add service label to integration custom labels if it's not already there
for integration in integrations:
custom_service_label_exists = False
custom_labels = integration.alert_group_labels_custom if integration.alert_group_labels_custom else []
for label in custom_labels:
if label[0] == service_label_custom[0]:
custom_service_label_exists = True
break
if custom_service_label_exists:
continue
integration.alert_group_labels_custom = custom_labels + [service_label_custom]
integrations_to_update.append(integration)

AlertReceiveChannel.objects.bulk_update(integrations_to_update, fields=["alert_group_labels_custom"])
14 changes: 14 additions & 0 deletions engine/apps/labels/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from django.apps import apps # noqa: I251
from django.conf import settings

from apps.metrics_exporter.constants import SERVICE_LABEL

if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroup
from apps.labels.models import AssociatedLabel
Expand All @@ -14,6 +16,7 @@

LABEL_OUTDATED_TIMEOUT_MINUTES = 30
ASSOCIATED_MODEL_NAME = "AssociatedLabel"
SERVICE_LABEL_TEMPLATE_FOR_ALERTING_INTEGRATION = "{{ payload.common_labels.service_name }}"


def get_associating_label_model(obj_model_name: str) -> typing.Type["AssociatedLabel"]:
Expand Down Expand Up @@ -43,3 +46,14 @@ def get_alert_group_labels_dict(alert_group: "AlertGroup") -> dict[str, str]:
It's different from get_labels_dict, because AlertGroupAssociated labels store key/value_name, not key/value_id
"""
return {label.key_name: label.value_name for label in alert_group.labels.all()}


def get_service_label_custom(organization: "Organization") -> list[str, None, str] | None:
"""
Returns `service_name` label template in custom label format: [key_id, None, template]
(see AlertReceiveChannel.alert_group_labels_custom).
"""
from apps.labels.models import LabelKeyCache

service_label_key = LabelKeyCache.get_or_create_by_name(organization, SERVICE_LABEL)
return [service_label_key.id, None, SERVICE_LABEL_TEMPLATE_FOR_ALERTING_INTEGRATION] if service_label_key else None
11 changes: 11 additions & 0 deletions engine/apps/public_api/serializers/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from apps.alerts.models import AlertReceiveChannel
from apps.base.messaging import get_messaging_backends
from apps.integrations.legacy_prefix import has_legacy_prefix, remove_legacy_prefix
from apps.labels.utils import get_service_label_custom
from apps.user_management.models import Organization
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField
from common.api_helpers.exceptions import BadRequest
from common.api_helpers.mixins import PHONE_CALL, SLACK, SMS, TELEGRAM, WEB, EagerLoadingMixin
Expand Down Expand Up @@ -123,6 +125,7 @@ def create(self, validated_data):
connection_error = GrafanaAlertingSyncManager.check_for_connection_errors(organization)
if connection_error:
raise serializers.ValidationError(connection_error)
validated_data = self._add_service_label_if_needed(organization, validated_data)
user = self.context["request"].user
with transaction.atomic():
try:
Expand Down Expand Up @@ -385,6 +388,14 @@ def _get_default_route_iterative(self, obj):
if filter.is_default:
return filter

def _add_service_label_if_needed(self, organization: "Organization", validated_data: dict) -> dict:
if not organization.is_grafana_labels_enabled:
return validated_data
service_label_custom = get_service_label_custom(organization)
if service_label_custom:
validated_data["alert_group_labels_custom"] = [service_label_custom]
return validated_data


class IntegrationUpdateSerializer(IntegrationSerializer):
type = IntegrationTypeField(source="integration", read_only=True)
Expand Down
Loading
Loading