Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions docs/sources/operations/blocking-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ overrides:
# block any query that matches this query hash
- hash: 2943214005 # hash of {stream="stdout",pod="loki-canary-9w49x"}
types: filter,limited

# block queries originating from specific sources via X-Query-Tags
# Keys and values are matched case-insensitively.
- pattern: '.*' # optional; if pattern and regex are omittied they will default to '.*' and true
regex: true
tags:
source: grafana
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tags need some explanation. The current examples could be misunderstood to mean that this pattern is a beta feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, reading further I see that is exactly what you're doing, blocking queries for a beta feature. Never mind...

feature: beta
```
{{< admonition type="note" >}}
Changes to these configurations **do not require a restart**; they are defined in the [runtime configuration file](https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#runtime-configuration-file).
Expand Down Expand Up @@ -61,6 +69,48 @@ The order of patterns is preserved, so the first matching pattern will be used.

Blocked queries are logged, as well as counted in the `loki_blocked_queries` metric on a per-tenant basis.

When a policy matches by pattern/hash/regex, Loki logs whether the query type and request tags matched that policy:

```logfmt
level=warn msg="query blocker matched with regex policy" user=29 type=metric pattern=".*rate\\(.*\\).*" query="sum(rate({app=\"foo\"}[5m]))" typesMatched=true tagsMatched=false blocked=false
```

If tag constraints fail to match, Loki emits a debug log showing the missing key and the raw header value that was received:

```logfmt
level=debug msg="query blocker tags mismatch: missing or mismatched key" key=feature tagsRaw="Source=grafana,Feature=alpha"
```

## Scope

Queries received via the API and executed as [alerting/recording rules](../../alert/) will be blocked.

## Tag-based blocking

You can scope a blocked query rule to requests that include specific key=value pairs in the `X-Query-Tags` header.

- Header format: `key=value` pairs separated by commas, for example: `Source=grafana,Feature=beta`.
- Allowed characters are alphanumeric plus space, comma, equals, '@', '.', and '-'. Any other characters are replaced with `_`.
- Parsing keeps only canonical `key=value` tokens; malformed tokens are ignored.
- Matching rules:
- Keys are matched case-insensitively (the server lowercases keys).
- Values are matched case-insensitively.
- All specified `tags:` pairs in the rule must be present in the request to apply the block.

Examples:

```yaml
overrides:
tenant-a:
blocked_queries:
# Block only metric queries from a beta feature flag
- types: metric
tags:
feature: beta

# Combine with regex to narrow scope further
- pattern: '.*rate\\(.*\\).*'
regex: true
tags:
source: grafana
```
75 changes: 64 additions & 11 deletions pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/regexp"

"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/httpreq"
logutil "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/validation"
)
Expand Down Expand Up @@ -45,8 +46,9 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {

if b.Hash > 0 {
if b.Hash == util.HashedQuery(query) {
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
return qb.block(b, typ, logger)
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
return blocked
}

return false
Expand All @@ -59,8 +61,9 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
}

if strings.TrimSpace(b.Pattern) == strings.TrimSpace(query) {
level.Warn(logger).Log("msg", "query blocker matched with exact match policy", "query", query)
return qb.block(b, typ, logger)
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
level.Warn(logger).Log("msg", "query blocker matched with exact match policy", "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
return blocked
}

if b.Regex {
Expand All @@ -71,19 +74,22 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
}

if r.MatchString(query) {
level.Warn(logger).Log("msg", "query blocker matched with regex policy", "pattern", b.Pattern, "query", query)
return qb.block(b, typ, logger)
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
level.Warn(logger).Log("msg", "query blocker matched with regex policy", "pattern", b.Pattern, "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
return blocked
}
}
}

return false
}

func (qb *queryBlocker) block(q *validation.BlockedQuery, typ string, logger log.Logger) bool {
// no specific types to validate against, so query is blocked
func (qb *queryBlocker) block(ctx context.Context, q *validation.BlockedQuery, typ string, logger log.Logger) (bool, bool, bool) {
// returns: (typesMatched, tagsMatched, blocked)
// no specific types to validate against, so only tags (if any) need to match
if len(q.Types) == 0 {
return true
tagsMatched := qb.tagsMatch(ctx, q, logger)
return true, tagsMatched, tagsMatched
}

matched := false
Expand All @@ -97,8 +103,55 @@ func (qb *queryBlocker) block(q *validation.BlockedQuery, typ string, logger log
// query would be blocked, but it didn't match specified types
if !matched {
level.Debug(logger).Log("msg", "query blocker matched pattern, but not specified types", "pattern", q.Pattern, "regex", q.Regex, "hash", q.Hash, "types", q.Types.String(), "queryType", typ)
return false
return false, false, false
}

return true
// Types matched; ensure tags (if any) also match
tagsMatched := qb.tagsMatch(ctx, q, logger)
return true, tagsMatched, tagsMatched
}

func (qb *queryBlocker) tagsMatch(ctx context.Context, q *validation.BlockedQuery, logger log.Logger) bool {
// if no tags are expected, we treat all queries as matching
if len(q.Tags) == 0 {
return true
}

raw := httpreq.ExtractQueryTagsFromContext(ctx)
// TagsToKeyValues is expected to always return an even set of key value pairs
kvs := httpreq.TagsToKeyValues(raw)

// Build a lowercased expected map once (size m) and scan kvs once (size n)
expected := make(map[string]string, len(q.Tags))
for k, v := range q.Tags {
expected[strings.ToLower(k)] = v
}

// iterate over the keys in the context and see if they match the expected tags
for i := 0; i+1 < len(kvs) && len(expected) > 0; i += 2 {
k, okK := kvs[i].(string)
v, okV := kvs[i+1].(string)
if !okK || !okV {
continue
}

keyLower := strings.ToLower(k)
if expVal, ok := expected[keyLower]; ok {
if strings.EqualFold(v, expVal) {
// this key and value match, remove this key from the expected map of tags
delete(expected, keyLower)
}
}
}

// if all expect tags matched, they would all have been removed from the map
// we only block the query if all expected tags matched
if len(expected) == 0 {
return true
}

for k := range expected {
level.Debug(logger).Log("msg", "query blocker tags mismatch: missing or mismatched key", "key", k, "tagsRaw", raw)
}
return false
}
93 changes: 93 additions & 0 deletions pkg/logql/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/validation"
)

Expand Down Expand Up @@ -160,3 +161,95 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
})
}
}

func TestEngine_ExecWithBlockedQueries_Tags(t *testing.T) {
limits := &fakeLimits{maxSeries: 10}
eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), limits, log.NewNopLogger())

defaultQuery := `topk(1,rate(({app=~"foo|bar"})[1m]))`

for _, test := range []struct {
name string
q string
tagsHeader string
blocked []*validation.BlockedQuery
expectedErr error
}{
{
name: "block when tags match and no types",
q: defaultQuery,
tagsHeader: "Source=grafana,Feature=beta",
blocked: []*validation.BlockedQuery{
{
// no pattern specified -> matches all by default
Tags: map[string]string{"source": "grafana", "feature": "beta"},
},
},
expectedErr: logqlmodel.ErrBlocked,
},
{
name: "do not block when tags value mismatches",
q: defaultQuery,
tagsHeader: "Source=grafana,Feature=alpha",
blocked: []*validation.BlockedQuery{
{
Pattern: ".*",
Regex: true,
Tags: map[string]string{"feature": "beta"},
},
},
expectedErr: nil,
},
{
name: "block when types and tags match",
q: defaultQuery,
tagsHeader: "Source=grafana,Feature=beta",
blocked: []*validation.BlockedQuery{
{
Pattern: ".*",
Regex: true,
Types: []string{QueryTypeMetric},
Tags: map[string]string{"source": "GRAFANA", "feature": "BETA"}, // case-insensitive
},
},
expectedErr: logqlmodel.ErrBlocked,
},
{
name: "do not block when types match but required tag key missing",
q: defaultQuery,
tagsHeader: "Source=grafana",
blocked: []*validation.BlockedQuery{
{
Pattern: ".*",
Regex: true,
Types: []string{QueryTypeMetric},
Tags: map[string]string{"feature": "beta"},
},
},
expectedErr: nil,
},
} {
t.Run(test.name, func(t *testing.T) {
limits.blockedQueries = test.blocked

params, err := NewLiteralParams(test.q, time.Unix(0, 0), time.Unix(100000, 0), 60*time.Second, 0, logproto.FORWARD, 1000, nil, nil)
require.NoError(t, err)
q := eng.Query(params)

ctx := user.InjectOrgID(context.Background(), "fake")
if test.tagsHeader != "" {
ctx = httpreq.InjectQueryTags(ctx, test.tagsHeader)
}

_, err = q.Exec(ctx)

if test.expectedErr == nil {
require.NoError(t, err)
return
}

require.Error(t, err)
require.Equal(t, err.Error(), test.expectedErr.Error())
})
}
}
4 changes: 4 additions & 0 deletions pkg/util/validation/blocked_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ type BlockedQuery struct {
Regex bool `yaml:"regex"`
Hash uint32 `yaml:"hash"`
Types flagext.StringSliceCSV `yaml:"types"`
// Tags defines a set of key=value constraints that must all match the
// incoming request tags (from X-Query-Tags) for this rule to apply.
// Keys are case-insensitive; values are matched case-insensitively.
Tags map[string]string `yaml:"query_tags"`
}