Skip to content

Commit dd52234

Browse files
authored
chore: Use first-wins semantics for duplicate fields in logfmt/json parsers (#19568)
1 parent 977fb2b commit dd52234

File tree

4 files changed: 61 additions (+61) and 6 deletions (-6).

pkg/engine/internal/executor/parse_json.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ func (j *jsonParser) process(line []byte, requestedKeys []string) (map[string]st
7676
err := jsonparser.ObjectEach(line, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
7777
return j.parseObject(key, value, dataType, result, requestedKeyLookup)
7878
})
79-
8079
// If there's an error, return empty result for consistency with malformed JSON handling
8180
if err != nil {
8281
return make(map[string]string), err
@@ -142,6 +141,11 @@ func (j *jsonParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu
142141
// Convert the value to string based on its type
143142
parsedValue := parseValue(value, dataType)
144143
if parsedValue != "" {
144+
// First-wins semantics for duplicates
145+
_, exists := result[keyString]
146+
if exists {
147+
return nil
148+
}
145149
result[keyString] = parsedValue
146150
}
147151

pkg/engine/internal/executor/parse_logfmt.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func buildLogfmtColumns(input *array.String, requestedKeys []string) ([]string,
1313
}
1414

1515
// tokenizeLogfmt parses logfmt input using the standard decoder
16-
// Returns a map of key-value pairs with last-wins semantics for duplicates
16+
// Returns a map of key-value pairs with first-wins semantics for duplicates
1717
// If requestedKeys is provided, the result will be filtered to only include those keys
1818
func tokenizeLogfmt(input string, requestedKeys []string) (map[string]string, error) {
1919
result := make(map[string]string)
@@ -37,11 +37,15 @@ func tokenizeLogfmt(input string, requestedKeys []string) (map[string]string, er
3737

3838
val := decoder.Value()
3939
if len(val) == 0 {
40-
//TODO: retain empty values if --keep-empty is set
40+
// TODO: retain empty values if --keep-empty is set
4141
continue
4242
}
4343

44-
// Last-wins semantics for duplicates
44+
// First-wins semantics for duplicates
45+
_, exists := result[key]
46+
if exists {
47+
continue
48+
}
4549
result[key] = unsafeString(decoder.Value())
4650
}
4751

pkg/engine/internal/executor/parse_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,24 @@ func TestNewParsePipeline_logfmt(t *testing.T) {
237237
},
238238
},
239239
},
240+
{
241+
name: "handle duplicate keys with first-wins semantics",
242+
schema: arrow.NewSchema([]arrow.Field{
243+
semconv.FieldFromFQN("utf8.builtin.message", true),
244+
}, nil),
245+
input: arrowtest.Rows{
246+
{colMsg: "level=info status=200 level=debug"},
247+
},
248+
requestedKeys: nil,
249+
expectedFields: 3, // 3 columns: message, level, status
250+
expectedOutput: arrowtest.Rows{
251+
{
252+
colMsg: "level=info status=200 level=debug",
253+
"utf8.parsed.level": "info",
254+
"utf8.parsed.status": "200",
255+
},
256+
},
257+
},
240258
} {
241259
t.Run(tt.name, func(t *testing.T) {
242260
// Create input data with message column containing logfmt
@@ -625,6 +643,23 @@ func TestNewParsePipeline_JSON(t *testing.T) {
625643
},
626644
},
627645
},
646+
{
647+
name: "duplicate field name takes first value",
648+
schema: arrow.NewSchema([]arrow.Field{
649+
semconv.FieldFromFQN("utf8.builtin.message", true),
650+
}, nil),
651+
input: arrowtest.Rows{
652+
{colMsg: `{"app": "foo", "app": "duplicate"}`},
653+
},
654+
requestedKeys: nil, // Extract all keys
655+
expectedFields: 2, // message, app
656+
expectedOutput: arrowtest.Rows{
657+
{
658+
colMsg: `{"app": "foo", "app": "duplicate"}`,
659+
"utf8.parsed.app": "foo",
660+
},
661+
},
662+
},
628663
} {
629664
t.Run(tt.name, func(t *testing.T) {
630665
// Create input data with message column containing JSON

pkg/logql/log/parser_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ func Test_jsonParser_Parse(t *testing.T) {
5757
},
5858
NoParserHints(),
5959
},
60-
{"numeric",
60+
{
61+
"numeric",
6162
[]byte(`{"counter":1, "price": {"_net_":5.56909}}`),
6263
labels.EmptyLabels(),
6364
labels.FromStrings("counter", "1",
@@ -665,6 +666,18 @@ func TestJSONExpressionParser(t *testing.T) {
665666
),
666667
NoParserHints(),
667668
},
669+
{
670+
"duplicate field name takes first value",
671+
[]byte(`{"app":"foo","app":"duplicate"}`),
672+
[]LabelExtractionExpr{
673+
NewLabelExtractionExpr("app", `app`),
674+
},
675+
labels.FromStrings("foo", "bar"),
676+
labels.FromStrings("foo", "bar",
677+
"app", "foo",
678+
),
679+
NoParserHints(),
680+
},
668681
}
669682
for _, tt := range tests {
670683
t.Run(tt.name, func(t *testing.T) {
@@ -1428,7 +1441,6 @@ func TestLogfmtConsistentPrecedence(t *testing.T) {
14281441
require.Equal(t, ParsedLabel, cat)
14291442
require.True(t, ok)
14301443
})
1431-
14321444
}
14331445

14341446
func TestLogfmtExpressionParser(t *testing.T) {

0 commit comments

Comments
 (0)