Skip to content

Commit

Permalink
feat(alerta-service): Add custom severities to Alerta handler
Browse files Browse the repository at this point in the history
Fix influxdata#2056

This commit allows usage of all or some Alerta Severities. It provides two keywords to fine tune kapacitor built-in severities.

1. First, you can rename the Kapacitor severity levels (crit, warn, info, ok) to any other severities configured in your Alerta:

    |alert()
        // ...
        .alerta()
            // ...
            .renameSeverity('crit', 'major')
            .renameSeverity('info', 'notice')

I suppose this will cover most of the cases. But if you do want a lot of severity levels:

2. You can add custom severity levels, which will be evaluated at the Alerta handler level after the built-in alert has been triggered.

    |alert()
        // ...
        .warn(lambda: "cpu" > 50)
        .alerta()
            // ...
            .addSeverity('minor', 3, lambda: "cpu" > 60)
            .addSeverity('major', 2, lambda: "cpu" > 70)
            .addSeverity('critical', 1, lambda: "cpu" > 80)
            .addSeverity('fatal', 0, lambda: "cpu" > 90)

Note: the addSeverity conditions are only evaluated after a built-in alert is triggered, so you need some entry point (like .warn() in the example) that covers the whole range of values you are interested in.
Note: these severities use Alerta's code order - a higher severity has a lower code (0 for fatal, 9 for ok)
Note: .addSeverity() is of little use in combination with .stateChangesOnly(), but Alerta has a decent deduplication mechanism, so this shouldn't be a problem
  • Loading branch information
sp1r committed Jun 17, 2021
1 parent cee3f4e commit a7c1fa8
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks
- [#2560](https://github.com/influxdata/kapacitor/pull/2560): enable new-style slack apps
- [#2576](https://github.com/influxdata/kapacitor/pull/2576): shared secret auth to influxdb in OSS
- [#2584](https://github.com/influxdata/kapacitor/pull/2584): Add custom severities to Alerta handler

### Bugfixes
- [#2564](https://github.com/influxdata/kapacitor/pull/2564): Fix a panic in the scraper handler when debug mode is enabled
Expand Down
17 changes: 17 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,23 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
if a.Environment != "" {
c.Environment = a.Environment
}
if len(a.RenameSeverities) != 0 {
c.RenameSeverities = a.RenameSeverities
}
if len(a.ExtraSeverities) != 0 {
c.ExtraSeverityExpressions = make([]stateful.Expression, len(a.ExtraSeverities))
c.ExtraSeverityNames = make([]string, len(a.ExtraSeverities))
c.ExtraSeverityScopePools = make([]stateful.ScopePool, len(a.ExtraSeverities))
for i, severity := range a.ExtraSeverities {
statefulExpression, expressionCompileError := stateful.NewExpression(severity.Condition.Expression)
if expressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for Alerta extra severity %s: %s", severity.Name, expressionCompileError)
}
c.ExtraSeverityExpressions[i] = statefulExpression
c.ExtraSeverityNames[i] = severity.Name
c.ExtraSeverityScopePools[i] = stateful.NewScopePool(ast.FindReferenceVariables(severity.Condition.Expression))
}
}
if a.Group != "" {
c.Group = a.Group
}
Expand Down
25 changes: 25 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9197,11 +9197,18 @@ stream
.token('testtoken1234567')
.environment('production')
.timeout(1h)
.alerta()
.token('testtoken7654321')
.environment('production')
.timeout(1h)
.addSeverity('fatal', 1, lambda: "count" > 9.0)
.addSeverity('disaster', 0, lambda: "count" > 15.0)
.alerta()
.token('anothertesttoken')
.resource('resource: {{ index .Tags "host" }}')
.event('event: {{ .TaskName }}')
.environment('{{ index .Tags "host" }}')
.renameSeverity('crit', 'major')
.origin('override')
.group('{{ .ID }}')
.value('{{ index .Fields "count" }}')
Expand All @@ -9227,6 +9234,23 @@ stream
Event: "serverA",
Group: "host=serverA",
Environment: "production",
Severity: "critical",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Correlate: []string{"cpu"},
Timeout: 3600,
},
},
alertatest.Request{
URL: "/alert",
Authorization: "Bearer testtoken7654321",
PostData: alertatest.PostData{
Resource: "cpu",
Event: "serverA",
Group: "host=serverA",
Environment: "production",
Severity: "fatal",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Expand All @@ -9242,6 +9266,7 @@ stream
Event: "event: TestStream_Alert",
Group: "serverA",
Environment: "serverA",
Severity: "major",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "override",
Service: []string{"serviceA", "serviceB", "cpu"},
Expand Down
32 changes: 32 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -1219,6 +1220,12 @@ func (n *AlertNodeData) Alerta() *AlertaHandler {
return alerta
}

// AlertaCustomSeverity describes one extra Alerta severity level registered
// through AlertaHandler.AddSeverity.
type AlertaCustomSeverity struct {
// Name is the severity string reported to Alerta when Condition passes.
Name string `json:"name"`
// Code orders the custom severities: AddSeverity keeps the list sorted by
// ascending Code, following Alerta's convention that a lower code means a
// higher severity.
Code int64 `json:"code"`
// Condition is the lambda expression that selects this severity; it is
// compiled into a stateful expression when the alert node is created.
Condition *ast.LambdaNode `json:"condition"`
}

// tick:embedded:AlertNode.Alerta
type AlertaHandler struct {
*AlertNodeData `json:"-"`
Expand All @@ -1242,6 +1249,13 @@ type AlertaHandler struct {
// Default is set from the configuration.
Environment string `json:"environment"`

// Alerta supports many different severity levels and allows you to define even more.
// There are two ways to take advantage of this.
// First, rename Kapacitor's built-in severities to match some of Alerta's severities.
RenameSeverities map[string]string `tick:"RenameSeverity" json:"rename-severities"`
// Add post-processing to kapacitor alerts to fine tune severity levels
ExtraSeverities []*AlertaCustomSeverity `tick:"AddSeverity" json:"add-severities"`

// Alerta group.
// Can be a template and has access to the same data as the AlertNode.Details property.
// Default: {{ .Group }}
Expand All @@ -1268,6 +1282,24 @@ type AlertaHandler struct {
Timeout time.Duration `json:"timeout"`
}

// RenameSeverity records that the Kapacitor severity kapacitorName should be
// reported under alertaName. A later call with the same kapacitorName
// replaces the earlier mapping. The handler is returned to allow chaining.
func (a *AlertaHandler) RenameSeverity(kapacitorName string, alertaName string) *AlertaHandler {
	renames := a.RenameSeverities
	if renames == nil {
		// Lazily create the map so a zero-value handler is usable.
		renames = map[string]string{}
		a.RenameSeverities = renames
	}
	renames[kapacitorName] = alertaName
	return a
}
// AddSeverity registers a custom severity with the given name, code and
// condition, then re-sorts the registered severities by ascending code.
// Alerta orders severities in descending fashion: a higher severity has a
// lower code (1 for critical, 9 for ok), so the most severe entry ends up
// first. The sort is stable, so entries with equal codes keep their
// insertion order. The handler is returned to allow chaining.
func (a *AlertaHandler) AddSeverity(name string, code int64, condition *ast.LambdaNode) *AlertaHandler {
	custom := &AlertaCustomSeverity{Name: name, Code: code, Condition: condition}
	a.ExtraSeverities = append(a.ExtraSeverities, custom)

	byAscendingCode := func(i, j int) bool {
		return a.ExtraSeverities[i].Code < a.ExtraSeverities[j].Code
	}
	sort.SliceStable(a.ExtraSeverities, byAscendingCode)
	return a
}

// List of effected services.
// If not specified defaults to the Name of the stream.
// tick:property
Expand Down
16 changes: 14 additions & 2 deletions pipeline/tick/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,20 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
Dot("token", h.Token).
Dot("resource", h.Resource).
Dot("event", h.Event).
Dot("environment", h.Environment).
Dot("group", h.Group).
Dot("environment", h.Environment)

var severitiesOrder = []string{"ok", "info", "warn", "crit"}
for _, k := range severitiesOrder {
if val, ok := h.RenameSeverities[k]; ok {
n.Dot("renameSeverity", k, val)
}
}

for _, k := range h.ExtraSeverities {
n.Dot("addSeverity", k.Name, k.Code, k.Condition)
}

n.Dot("group", h.Group).
Dot("value", h.Value).
Dot("origin", h.Origin).
Dot("services", args(h.Service)...).
Expand Down
10 changes: 10 additions & 0 deletions pipeline/tick/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,14 @@ func TestAlertAlerta(t *testing.T) {
handler.Resource = "Harbinger"
handler.Event = "Jump through Omega-4 Relay"
handler.Environment = "Collector base"
handler.RenameSeverities = make(map[string]string)
handler.RenameSeverities["info"] = "notice"
handler.ExtraSeverities = make([]*pipeline.AlertaCustomSeverity, 1)
handler.ExtraSeverities[0] = &pipeline.AlertaCustomSeverity{
Name: "major",
Code: 2,
Condition: newLambda(85),
}
handler.Group = "I brought Jack, Miranda and Tali"
handler.Value = "Save the Galaxy"
handler.Origin = "Omega"
Expand All @@ -680,6 +688,8 @@ func TestAlertAlerta(t *testing.T) {
.resource('Harbinger')
.event('Jump through Omega-4 Relay')
.environment('Collector base')
.renameSeverity('info', 'notice')
.addSeverity('major', 2, lambda: "cpu" > 85)
.group('I brought Jack, Miranda and Tali')
.value('Save the Galaxy')
.origin('Omega')
Expand Down
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10149,6 +10149,7 @@ func TestServer_AlertHandlers(t *testing.T) {
Event: "id",
Group: "test",
Environment: "env",
Severity: "critical",
Text: "message",
Origin: "kapacitor",
Service: []string{"alert"},
Expand Down
1 change: 1 addition & 0 deletions services/alerta/alertatest/alertatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type PostData struct {
Event string `json:"event"`
Group string `json:"group"`
Environment string `json:"environment"`
Severity string `json:"severity"`
Text string `json:"text"`
Origin string `json:"origin"`
Service []string `json:"service"`
Expand Down
125 changes: 108 additions & 17 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
khttp "github.com/influxdata/kapacitor/http"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -267,6 +269,16 @@ type HandlerConfig struct {
// Default is set from the configuration.
Environment string `mapstructure:"environment"`

// Renaming rules for severities
// Allows rewriting the built-in Kapacitor severities (ok, info, warn, crit) to any of Alerta's severities
RenameSeverities map[string]string `mapstructure:"rename-severities"`

// Expressions for custom Alerta severities
// Allows to fine tune severity levels of kapacitor
ExtraSeverityExpressions []stateful.Expression `mapstructure:"severity-expressions"`
ExtraSeverityNames []string `mapstructure:"severity-names"`
ExtraSeverityScopePools []stateful.ScopePool `mapstructure:"severity-scope-pool"`

// Alerta group.
// Can be a template and has access to the same data as the AlertNode.Details property.
// Default: {{ .Group }}
Expand Down Expand Up @@ -297,13 +309,17 @@ type handler struct {
c HandlerConfig
diag Diagnostic

resourceTmpl *text.Template
eventTmpl *text.Template
environmentTmpl *text.Template
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
correlateTmpl []*text.Template
resourceTmpl *text.Template
eventTmpl *text.Template
environmentTmpl *text.Template
renameSeverities map[string]string
severityLevels []string
severityExpressions []stateful.Expression
scopePools []stateful.ScopePool
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
correlateTmpl []*text.Template
}

func (s *Service) DefaultHandlerConfig() HandlerConfig {
Expand Down Expand Up @@ -357,16 +373,20 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
}

return &handler{
s: s,
c: c,
diag: s.diag.WithContext(ctx...),
resourceTmpl: rtmpl,
eventTmpl: evtmpl,
environmentTmpl: etmpl,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
correlateTmpl: ctmpl,
s: s,
c: c,
diag: s.diag.WithContext(ctx...),
resourceTmpl: rtmpl,
eventTmpl: evtmpl,
environmentTmpl: etmpl,
renameSeverities: c.RenameSeverities,
severityLevels: c.ExtraSeverityNames,
severityExpressions: c.ExtraSeverityExpressions,
scopePools: c.ExtraSeverityScopePools,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
correlateTmpl: ctmpl,
}, nil
}

Expand Down Expand Up @@ -468,20 +488,40 @@ func (h *handler) Handle(event alert.Event) {
}

var severity string
var severityKey string

switch event.State.Level {
case alert.OK:
severity = "ok"
severityKey = "ok"
case alert.Info:
severity = "informational"
severityKey = "info"
case alert.Warning:
severity = "warning"
severityKey = "warn"
case alert.Critical:
severity = "critical"
severityKey = "crit"
default:
severity = "indeterminate"
}

if val, ok := h.renameSeverities[severityKey]; ok {
severity = val
}

if len(h.severityLevels) != 0 {
for i, expression := range h.severityExpressions {
if pass, err := EvalPredicate(expression, h.scopePools[i], event.State.Time, event.Data.Fields, event.Data.Tags); err != nil {
h.diag.Error("error evaluating expression for Alerta severity", err)
} else if pass {
severity = h.severityLevels[i]
break
}
}
}

if err := h.s.Alert(
h.c.Token,
h.c.TokenPrefix,
Expand All @@ -502,3 +542,54 @@ func (h *handler) Handle(event alert.Event) {
h.diag.Error("failed to send event to Alerta", err)
}
}

// EvalPredicate evaluates the boolean expression se against the given time,
// fields and tags. A scope is borrowed from scopePool for the duration of the
// call and handed back on return. An error is returned if the scope cannot be
// filled, if the expression fails its type check, or if evaluation fails.
func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
	scope := scopePool.Get()
	defer scopePool.Put(scope)

	if fillErr := fillScope(scope, scopePool.ReferenceVariables(), now, fields, tags); fillErr != nil {
		return false, fillErr
	}

	// Type is called only for its function signature check; the resulting
	// type itself is not needed.
	if _, typeErr := se.Type(scope); typeErr != nil {
		return false, typeErr
	}

	return se.EvalBool(scope)
}

// fillScope populates vars with a value for each name in referenceVariables.
// The name "time" is bound to now in the local time zone; every other name is
// looked up in fields and tags. A name found in both is an error. A name
// found in neither is bound to ast.MissingValue unless vars already holds a
// value for it.
func fillScope(vars *stateful.Scope, referenceVariables []string, now time.Time, fields models.Fields, tags models.Tags) error {
	for _, name := range referenceVariables {
		if name == "time" {
			vars.Set("time", now.Local())
			continue
		}

		fieldVal, hasField := fields[name]
		tagVal, hasTag := tags[name]

		switch {
		case hasField && hasTag:
			// The field value is stored before the collision is reported,
			// so the scope reflects it even on the error path.
			vars.Set(name, fieldVal)
			return fmt.Errorf("cannot have field and tags with same name %q", name)
		case hasField:
			vars.Set(name, fieldVal)
		case hasTag:
			vars.Set(name, tagVal)
		case !vars.Has(name):
			vars.Set(name, ast.MissingValue)
		}
	}

	return nil
}

0 comments on commit a7c1fa8

Please sign in to comment.