diff --git a/rules/fixtures/rules_topological_sort_needed.json b/rules/fixtures/rules_topological_sort_needed.json new file mode 100644 index 000000000..7d3f08590 --- /dev/null +++ b/rules/fixtures/rules_topological_sort_needed.json @@ -0,0 +1,246 @@ +{ + "groups": [ + { + "name": "test-group", + "rules": [ + { + "record": "pf:nginx_http_requests:rate5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m]))" + }, + { + "record": "pf:nginx_http_requests:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_requests:rate5m[1w])" + }, + { + "record": "pf:nginx_http_requests:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_requests:rate5m[1w])" + }, + { + "record": "pf:nginx_http_requests:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 6d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 13d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_requests:rate5m[4h] offset 20d22h) + pf:nginx_http_requests:rate5m:avg_over_time_1w - pf:nginx_http_requests:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_request_duration_seconds_sum{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m])) / sum by (lp_service, k8scluster) (rate(nginx_http_request_duration_seconds_count{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\"}[5m]))" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:nginx_response_time:avg_over_time_5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 6d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 13d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_response_time:avg_over_time_5m[4h] offset 20d22h) + pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w - pf:nginx_response_time:avg_over_time_5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\",status=~\"4.*\"}[5m]))" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_4xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 6d22h) + pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 13d22h) + pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_4xx_responses:rate5m[4h] offset 20d22h) + pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_4xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m", + "expr": "sum by (lp_service, k8scluster) (rate(nginx_http_requests_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt)-web\",status=~\"5.*\"}[5m]))" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:nginx_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:nginx_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:nginx_http_5xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 6d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 13d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:nginx_http_5xx_responses:rate5m[4h] offset 20d22h) + pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w - pf:nginx_http_5xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_requests:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m]))" + }, + { + "record": "pf:app_http_requests:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_requests:rate5m[1w])" + }, + { + "record": "pf:app_http_requests:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_requests:rate5m[1w])" + }, + { + "record": "pf:app_http_requests:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 6d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 13d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_requests:rate5m[4h] offset 20d22h) + pf:app_http_requests:rate5m:avg_over_time_1w - pf:app_http_requests:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_response_time:avg_over_time_5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_sum{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m])) / sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\"}[5m]))" + }, + { + "record": "pf:app_response_time:avg_over_time_5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:app_response_time:avg_over_time_5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_response_time:avg_over_time_5m[1w])" + }, + { + "record": "pf:app_response_time:avg_over_time_5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 6d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 13d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_response_time:avg_over_time_5m[4h] offset 20d22h) + pf:app_response_time:avg_over_time_5m:avg_over_time_1w - pf:app_response_time:avg_over_time_5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_4xx_responses:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\",status=~\"4.*\"}[5m]))" + }, + { + "record": "pf:app_http_4xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_4xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_4xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_4xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 6d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 13d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_4xx_responses:rate5m[4h] offset 20d22h) + pf:app_http_4xx_responses:rate5m:avg_over_time_1w - pf:app_http_4xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_http_5xx_responses:rate5m", + "expr": "sum by (application, k8scluster) (rate(http_server_requests_seconds_count{application=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\",k8scluster=\"sy-kube01\",status=~\"5.*\"}[5m]))" + }, + { + "record": "pf:app_http_5xx_responses:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_5xx_responses:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_http_5xx_responses:rate5m[1w])" + }, + { + "record": "pf:app_http_5xx_responses:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 6d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 13d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_http_5xx_responses:rate5m[4h] offset 20d22h) + pf:app_http_5xx_responses:rate5m:avg_over_time_1w - pf:app_http_5xx_responses:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf:app_log_events:rate5m", + "expr": "sum by (lp_service, level, k8scluster) (rate(log4j2_events_total{k8scluster=\"sy-kube01\",level=~\"error|warn\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"}[5m]))" + }, + { + "record": "pf:app_log_events:rate5m:avg_over_time_1w", + "expr": "avg_over_time(pf:app_log_events:rate5m[1w])" + }, + { + "record": "pf:app_log_events:rate5m:stddev_over_time_1w", + "expr": "stddev_over_time(pf:app_log_events:rate5m[1w])" + }, + { + "record": "pf:app_log_events:rate5m_prediction", + "expr": "clamp_min(quantile without (offset) (0.5, label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 6d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 1w, \"offset\", \"1w\", \"\", \"\") or label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 13d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 2w, \"offset\", \"2w\", \"\", \"\") or label_replace(avg_over_time(pf:app_log_events:rate5m[4h] offset 20d22h) + pf:app_log_events:rate5m:avg_over_time_1w - pf:app_log_events:rate5m:avg_over_time_1w offset 3w, \"offset\", \"3w\", \"\", \"\")), 0)" + }, + { + "record": "pf_pods_restart_too_much", + "expr": "rate(kube_pod_container_status_restarts_total{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"}[5m]) > 0" + }, + { + "record": "pf_pods_are_unhealthy", + "expr": "health{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"} > 0" + }, + { + "record": "pf_pod_dependencies_are_unhealthy", + "expr": "health_dependency{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection)-(web|app)\"} > 0" + }, + { + "record": "pf_nginx_request_rate_is_too_low", + "expr": "pf:nginx_http_requests:rate5m == 0" + }, + { + "record": "pf_app_request_rate_is_too_low", + "expr": "pf:app_http_requests:rate5m{application!~\"(lp-encryptionmgmt-app|lp-rtbf-app)\"} == 0" + }, + { + "record": "pf_nginx_request_rate_is_too_high", + "expr": "pf:nginx_http_requests:rate5m > 10000" + }, + { + "record": "pf_app_request_rate_is_too_high", + "expr": "pf:app_http_requests:rate5m > 10000" + }, + { + "record": "pf_nginx_request_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_requests:rate5m - pf:nginx_http_requests:rate5m_prediction) / pf:nginx_http_requests:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_request_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_requests:rate5m - pf:app_http_requests:rate5m_prediction) / pf:app_http_requests:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_response_time_is_too_high", + "expr": "pf:nginx_response_time:avg_over_time_5m > 0.5" + }, + { + "record": "pf_app_response_time_is_too_high", + "expr": "pf:app_response_time:avg_over_time_5m > 0.5" + }, + { + "record": "pf_nginx_response_time_is_outside_normal_range", + "expr": "abs((pf:nginx_response_time:avg_over_time_5m - pf:nginx_response_time:avg_over_time_5m_prediction) / pf:nginx_response_time:avg_over_time_5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_response_time_is_outside_normal_range", + "expr": "abs((pf:app_response_time:avg_over_time_5m - pf:app_response_time:avg_over_time_5m_prediction) / pf:app_response_time:avg_over_time_5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_4xx_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_4xx_responses:rate5m - pf:nginx_http_4xx_responses:rate5m_prediction) / pf:nginx_http_4xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_4xx_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_4xx_responses:rate5m - pf:app_http_4xx_responses:rate5m_prediction) / pf:app_http_4xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_4xx_ratio_exceeds_20%", + "expr": "pf:nginx_http_4xx_responses:rate5m / pf:nginx_http_requests:rate5m > 20" + }, + { + "record": "pf_app_4xx_ratio_exceeds_20%", + "expr": "pf:app_http_4xx_responses:rate5m / pf:app_http_requests:rate5m > 20" + }, + { + "record": "pf_nginx_5xx_rate_is_outside_normal_range", + "expr": "abs((pf:nginx_http_5xx_responses:rate5m - pf:nginx_http_5xx_responses:rate5m_prediction) / pf:nginx_http_5xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_5xx_rate_is_outside_normal_range", + "expr": "abs((pf:app_http_5xx_responses:rate5m - pf:app_http_5xx_responses:rate5m_prediction) / pf:app_http_5xx_responses:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_nginx_5xx_ratio_exceeds_20%", + "expr": "pf:nginx_http_5xx_responses:rate5m / pf:nginx_http_requests:rate5m > 20" + }, + { + "record": "pf_app_5xx_ratio_exceeds_20%", + "expr": "pf:app_http_5xx_responses:rate5m / pf:app_http_requests:rate5m > 20" + }, + { + "record": "pf_log_rate_is_outside_normal_range", + "expr": "abs((pf:app_log_events:rate5m - pf:app_log_events:rate5m_prediction) / pf:app_log_events:rate5m:stddev_over_time_1w) > 2" + }, + { + "record": "pf_app_heap_usage_too_high", + "expr": "100 * (avg by (k8scluster, lp_service, kubernetes_pod_name) (container_memory_working_set_bytes{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"}) / avg by (k8scluster, lp_service, kubernetes_pod_name) (container_spec_memory_limit_bytes{k8scluster=\"sy-kube01\",lp_service=~\"lp-(csds|mtls|rtbf|encryptionmgmt|acdefaults|acprovision|acsitesetting|acdomainprotection|rollover|providersubscription|providersubscriptionv2)-app\"})) > 90" + } + ], + "interval": "60s" + } + ] +} diff --git a/rules/manager.go b/rules/manager.go index 703b60868..b7ef47a8a 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -147,9 +147,13 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + if o.Logger == nil { + o.Logger = promslog.NewNopLogger() + } + if o.RuleConcurrencyController == nil { if o.ConcurrentEvalsEnabled { - o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals, o.Logger) } else { o.RuleConcurrencyController = sequentialRuleEvalController{} } @@ -159,10 +163,6 @@ func NewManager(o *ManagerOptions) *Manager { o.RuleDependencyController = ruleDependencyController{} } - if o.Logger == nil { - o.Logger = promslog.NewNopLogger() - } - m := &Manager{ groups: map[string]*Group{}, opts: o, @@ -516,12 +516,14 @@ type RuleConcurrencyController interface { // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - sema *semaphore.Weighted + sema *semaphore.Weighted + logger *slog.Logger } -func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { +func newRuleConcurrencyController(maxConcurrency int64, logger *slog.Logger) RuleConcurrencyController { return &concurrentRuleEvalController{ - sema: semaphore.NewWeighted(maxConcurrency), + sema: semaphore.NewWeighted(maxConcurrency), + logger: logger, } } @@ -530,36 +532,65 @@ func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule R } func (c *concurrentRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules { - // Using the rule dependency controller information (rules being identified as having no dependencies or no dependants), - // we can safely run the following concurrent groups: - // 1. Concurrently, all rules that have no dependencies - // 2. Sequentially, all rules that have both dependencies and dependants - // 3. Concurrently, all rules that have no dependants - - var noDependencies []int - var dependenciesAndDependants []int - var noDependants []int + sequentialController := sequentialRuleEvalController{} + type ruleInfo struct { + ruleIdx int + unevaluatedDependencies map[Rule]struct{} + } + remainingRules := make(map[Rule]ruleInfo) + firstBatch := ConcurrentRules{} for i, r := range g.rules { - switch { - case r.NoDependencyRules(): - noDependencies = append(noDependencies, i) - case !r.NoDependentRules() && !r.NoDependencyRules(): - dependenciesAndDependants = append(dependenciesAndDependants, i) - case r.NoDependentRules(): - noDependants = append(noDependants, i) + if r.NoDependencyRules() { + firstBatch = append(firstBatch, i) + continue } + // Initialize the rule info with the rule's dependencies. + // Use a copy of the dependencies to avoid mutating the rule. + info := ruleInfo{ruleIdx: i, unevaluatedDependencies: map[Rule]struct{}{}} + for _, dep := range r.DependencyRules() { + info.unevaluatedDependencies[dep] = struct{}{} + } + remainingRules[r] = info } - - var order []ConcurrentRules - if len(noDependencies) > 0 { - order = append(order, noDependencies) - } - for _, r := range dependenciesAndDependants { - order = append(order, []int{r}) + if len(firstBatch) == 0 { + // There are no rules without dependencies. + // Fall back to sequential evaluation. + c.logger.With("group", g.Name()).Info("No rules without dependencies found, falling back to sequential rule evaluation. This may be due to indeterminate rule dependencies.") + return sequentialController.SplitGroupIntoBatches(context.Background(), g) } - if len(noDependants) > 0 { - order = append(order, noDependants) + order := []ConcurrentRules{firstBatch} + + // Build the order of rules to evaluate based on dependencies. + for len(remainingRules) > 0 { + previousBatch := order[len(order)-1] + // Remove the batch's rules from the dependencies of its dependents. + for _, idx := range previousBatch { + rule := g.rules[idx] + for _, dependent := range rule.DependentRules() { + dependentInfo := remainingRules[dependent] + delete(dependentInfo.unevaluatedDependencies, rule) + } + } + + var batch ConcurrentRules + // Find rules that have no remaining dependencies. + for name, info := range remainingRules { + if len(info.unevaluatedDependencies) == 0 { + batch = append(batch, info.ruleIdx) + delete(remainingRules, name) + } + } + + if len(batch) == 0 { + // There is a cycle in the rules' dependencies. + // We can't evaluate them concurrently. + // Fall back to sequential evaluation. + c.logger.With("group", g.Name()).Warn("Cyclic rule dependencies detected, falling back to sequential rule evaluation") + return sequentialController.SplitGroupIntoBatches(context.Background(), g) + } + + order = append(order, batch) } return order diff --git a/rules/manager_test.go b/rules/manager_test.go index 843d67497..545610f1f 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -2226,7 +2226,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ + requireConcurrentRulesEqual(t, []ConcurrentRules{ {0}, {1}, {2}, @@ -2313,7 +2313,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order (isn't affected by concurrency settings) order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ + requireConcurrentRulesEqual(t, []ConcurrentRules{ {0, 1, 2, 3, 4, 5}, }, order) @@ -2358,7 +2358,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ + requireConcurrentRulesEqual(t, []ConcurrentRules{ {0, 1, 2, 3, 4, 5}, }, order) @@ -2443,7 +2443,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ + requireConcurrentRulesEqual(t, []ConcurrentRules{ {0, 4}, {1, 2, 3, 5, 6, 7}, }, order) @@ -2489,16 +2489,63 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ + requireConcurrentRulesEqual(t, []ConcurrentRules{ {0, 1}, {2}, - {3}, - {4, 5, 6}, + {3, 4}, + {5, 6}, }, order) group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) + require.EqualValues(t, 2, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + }) + + t.Run("attempted asynchronous evaluation of highly parallelizable group", func(t *testing.T) { + t.Parallel() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 59 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_topological_sort_needed.json"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + var group *Group + for _, g := range groups { + group = g + } + + start := time.Now() + + // Expected evaluation order + order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) + requireConcurrentRulesEqual(t, []ConcurrentRules{ + {0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 37, 38, 58}, + {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22, 25, 26, 29, 30, 33, 34, 39, 40, 41, 42, 45, 46, 51, 52, 55, 56}, + {3, 7, 11, 15, 19, 23, 27, 31, 35}, + {43, 44, 47, 48, 49, 50, 53, 54, 57}, + }, order) + + group.Eval(ctx, start) + + require.EqualValues(t, 28, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. @@ -2803,3 +2850,14 @@ func BenchmarkRuleDependencyController_AnalyseRules(b *testing.B) { } } } + +func requireConcurrentRulesEqual(t *testing.T, expected, actual []ConcurrentRules) { + t.Helper() + + // Like require.Equals but ignores the order of elements in the slices. + require.Len(t, actual, len(expected)) + for i, expectedBatch := range expected { + actualBatch := actual[i] + require.ElementsMatch(t, expectedBatch, actualBatch) + } +}