diff --git a/CHANGELOG.md b/CHANGELOG.md index e601c1651f44..d9e5763e426d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -280,6 +280,8 @@ v0.40.0 (2024-02-27) - Python profiling using eBPF is now aggregated now by kernel space. [PR](https://github.com/grafana/pyroscope/pull/2996) (@korniltsev) +- Add Luhn filter to `loki.process` to filter PCI data from log data + ### Bugfixes - Fix an issue in `remote.s3` where the exported content of an object would be an empty string if `remote.s3` failed to fully retrieve diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index f30efb576793..05eb467d63c2 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -65,6 +65,7 @@ The following blocks are supported inside the definition of `loki.process`: | stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no | | stage.limit | [stage.limit][] | Configures a `limit` processing stage. | no | | stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. | no | +| stage.luhn | [stage.luhn][] | Configures a `luhn` processing stage. | no | | stage.match | [stage.match][] | Configures a `match` processing stage. | no | | stage.metrics | [stage.metrics][] | Configures a `metrics` stage. | no | | stage.multiline | [stage.multiline][] | Configures a `multiline` processing stage. | no | @@ -95,6 +96,7 @@ file. [stage.labels]: #stagelabels-block [stage.limit]: #stagelimit-block [stage.logfmt]: #stagelogfmt-block +[stage.luhn]: #stageluhn-block [stage.match]: #stagematch-block [stage.metrics]: #stagemetrics-block [stage.multiline]: #stagemultiline-block @@ -566,6 +568,47 @@ set of extracted data, with the value of `user=foo`. The second stage parses the contents of `extra` and appends the `username: foo` key-value pair to the set of extracted data. +### stage.luhn block + +The `stage.luhn` inner block configures a processing stage that reads incoming +log lines and redacts strings that match a Luhn algorithm. + +The [Luhn algorithm][] is a simple checksum formula used to validate various +identification numbers, such as credit card numbers, IMEI numbers, National +Provider Identifier numbers in the US, and Canadian Social Insurance Numbers. +Many Payment Card Industry environments require these numbers to be redacted. + +[Luhn algorithm]: https://en.wikipedia.org/wiki/Luhn_algorithm + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| ------------- | ------------- | ---------------------------------------------- | ---------------- | -------- | +| `replacement` | `string` | String to substitute the matched patterns with | `"**REDACTED**"` | no | +| `source` | `string` | Source of the data to parse. | `""` | no | +| `minLength` | `int` | Minimum length of digits to consider | `13` | no | + + +The `source` field defines the source of data to search. When `source` is +missing or empty, the stage parses the log line itself, but it can also be used +to parse a previously extracted value. + +The following example log line contains an approved credit card number. + +``` +time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="credit card approved 4032032513548443" extra="user=foo" + +stage.luhn { + replacement = "**DELETED**" +} +``` + +The stage parses the log line, redacts the credit card number, and produces the following updated log line: + +``` +time=2012-11-01T22:08:41+00:00 app=loki level=INFO duration=125 message="credit card approved **DELETED**" extra="user=foo" +``` + ### stage.match block The `stage.match` inner block configures a filtering stage that can conditionally diff --git a/internal/component/loki/process/stages/luhn.go b/internal/component/loki/process/stages/luhn.go new file mode 100644 index 000000000000..60610e659f5a --- /dev/null +++ b/internal/component/loki/process/stages/luhn.go @@ -0,0 +1,147 @@ +package stages + +import ( + "strconv" + "strings" + "time" + "unicode" + + "github.com/prometheus/common/model" +) + +// LuhnFilterConfig configures a processing stage that filters out Luhn-valid numbers. +type LuhnFilterConfig struct { + Replacement string `river:"replacement,attr,optional"` + Source *string `river:"source,attr,optional"` + MinLength int `river:"min_length,attr,optional"` +} + +// validateLuhnFilterConfig validates the LuhnFilterConfig. +func validateLuhnFilterConfig(c LuhnFilterConfig) error { + if c.Replacement == "" { + c.Replacement = "**REDACTED**" + } + if c.MinLength < 1 { + c.MinLength = 13 + } + if c.Source != nil && *c.Source == "" { + return ErrEmptyRegexStageSource + } + return nil +} + +// newLuhnFilterStage creates a new LuhnFilterStage. +func newLuhnFilterStage(config LuhnFilterConfig) (Stage, error) { + if err := validateLuhnFilterConfig(config); err != nil { + return nil, err + } + return toStage(&luhnFilterStage{ + config: &config, + }), nil +} + +// luhnFilterStage applies Luhn algorithm filtering to log entries. +type luhnFilterStage struct { + config *LuhnFilterConfig +} + +// Process implements Stage. +func (r *luhnFilterStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { + input := entry + if r.config.Source != nil { + value, ok := extracted[*r.config.Source] + if !ok { + return + } + strVal, ok := value.(string) + if !ok { + return + } + input = &strVal + } + + if input == nil { + return + } + + // Replace Luhn-valid numbers in the input. + updatedEntry := replaceLuhnValidNumbers(*input, r.config.Replacement, r.config.MinLength) + *entry = updatedEntry +} + +// replaceLuhnValidNumbers scans the input for Luhn-valid numbers and replaces them. + +func replaceLuhnValidNumbers(input, replacement string, minLength int) string { + var sb strings.Builder + var currentNumber strings.Builder + + flushNumber := func() { + // If the number is at least minLength, check if it's a Luhn-valid number. + if currentNumber.Len() >= minLength { + numberStr := currentNumber.String() + number, err := strconv.Atoi(numberStr) + if err == nil && isLuhn(number) { + // If the number is Luhn-valid, replace it. + sb.WriteString(replacement) + } else { + // If the number is not Luhn-valid, write it as is. + sb.WriteString(numberStr) + } + } else if currentNumber.Len() > 0 { + // If the number is less than minLength but not empty, write it as is. + sb.WriteString(currentNumber.String()) + } + // Reset the current number. + currentNumber.Reset() + } + + // Iterate over the input, replacing Luhn-valid numbers. + for _, char := range input { + // If the character is a digit, add it to the current number. + if unicode.IsDigit(char) { + currentNumber.WriteRune(char) + } else { + // If the character is not a digit, flush the current number and write the character. + flushNumber() + sb.WriteRune(char) + } + } + flushNumber() // Ensure any trailing number is processed + + return sb.String() +} + +// isLuhn check number is valid or not based on Luhn algorithm +func isLuhn(number int) bool { + // Luhn algorithm is a simple checksum formula used to validate a + // variety of identification numbers, such as credit card numbers, IMEI + // numbers, National Provider Identifier numbers in the US, and + // Canadian Social Insurance Numbers. This is a simple implementation + // of the Luhn algorithm. + // https://en.wikipedia.org/wiki/Luhn_algorithm + return (number%10+checksum(number/10))%10 == 0 +} + +func checksum(number int) int { + var luhn int + + for i := 0; number > 0; i++ { + cur := number % 10 + + if i%2 == 0 { // even + cur *= 2 + if cur > 9 { + cur = cur%10 + cur/10 + } + } + + luhn += cur + number /= 10 + } + return luhn % 10 +} + +// Name implements Stage. +func (r *luhnFilterStage) Name() string { + return StageTypeLuhn +} diff --git a/internal/component/loki/process/stages/luhn_test.go b/internal/component/loki/process/stages/luhn_test.go new file mode 100644 index 000000000000..ef618aa863bb --- /dev/null +++ b/internal/component/loki/process/stages/luhn_test.go @@ -0,0 +1,54 @@ +package stages + +import ( + "testing" +) + +// Test cases for the Luhn algorithm validation +func TestIsLuhnValid(t *testing.T) { + cases := []struct { + input int + want bool + }{ + {4539_1488_0343_6467, true}, // Valid Luhn number + {1234_5678_1234_5670, true}, // Another valid Luhn number + {499_2739_8112_1717, false}, // Invalid Luhn number + {1234567812345678, false}, // Another invalid Luhn number + {3782_822463_10005, true}, // Short, valid Luhn number + {123, false}, // Short, invalid Luhn number + } + + for _, c := range cases { + got := isLuhn(c.input) + if got != c.want { + t.Errorf("isLuhnValid(%q) == %t, want %t", c.input, got, c.want) + } + } +} + +// TestReplaceLuhnValidNumbers tests the replaceLuhnValidNumbers function. +func TestReplaceLuhnValidNumbers(t *testing.T) { + cases := []struct { + input string + replacement string + want string + }{ + // Test case with a single Luhn-valid number + {"My credit card number is 3530111333300000.", "**REDACTED**", "My credit card number is **REDACTED**."}, + // Test case with multiple Luhn-valid numbers + {"Cards 4532015112830366 and 6011111111111117 are valid.", "**REDACTED**", "Cards **REDACTED** and **REDACTED** are valid."}, + // Test case with no Luhn-valid numbers + {"No valid numbers here.", "**REDACTED**", "No valid numbers here."}, + // Test case with mixed content + {"Valid: 4556737586899855, invalid: 1234.", "**REDACTED**", "Valid: **REDACTED**, invalid: 1234."}, + // Test case with edge cases + {"Edge cases: 0, 00, 000, 1.", "**REDACTED**", "Edge cases: 0, 00, 000, 1."}, + } + + for _, c := range cases { + got := replaceLuhnValidNumbers(c.input, c.replacement, 13) + if got != c.want { + t.Errorf("replaceLuhnValidNumbers(%q, %q) == %q, want %q", c.input, c.replacement, got, c.want) + } + } +} diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index fb1be291e5d5..e24642c84cba 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -28,6 +28,7 @@ type StageConfig struct { LabelsConfig *LabelsConfig `river:"labels,block,optional"` LimitConfig *LimitConfig `river:"limit,block,optional"` LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"` + LuhnFilterConfig *LuhnFilterConfig `river:"luhn,block,optional"` MatchConfig *MatchConfig `river:"match,block,optional"` MetricsConfig *MetricsConfig `river:"metrics,block,optional"` MultilineConfig *MultilineConfig `river:"multiline,block,optional"` diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index a5657d570ea0..0958bfdf09b2 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -28,6 +28,7 @@ const ( StageTypeLabelDrop = "labeldrop" StageTypeLimit = "limit" StageTypeLogfmt = "logfmt" + StageTypeLuhn = "luhn" StageTypeMatch = "match" StageTypeMetric = "metrics" StageTypeMultiline = "multiline" @@ -136,6 +137,11 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh if err != nil { return nil, err } + case cfg.LuhnFilterConfig != nil: + s, err = newLuhnFilterStage(*cfg.LuhnFilterConfig) + if err != nil { + return nil, err + } case cfg.MetricsConfig != nil: s, err = newMetricStage(logger, *cfg.MetricsConfig, registerer) if err != nil {