Skip to content

Commit

Permalink
datastream length to bytes and not unicode chars
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs committed Sep 19, 2024
1 parent afa3eac commit 6a3dbe9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
12 changes: 3 additions & 9 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"encoding/hex"
"strings"
"time"
"unicode/utf8"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -50,7 +49,7 @@ import (
)

const (
MaxDataStreamRunes = 100
MaxDataStreamBytes = 100
DisallowedDataStreamRunes = "-\\/*?\"<>| ,#"
)

Expand Down Expand Up @@ -251,13 +250,8 @@ func (c *Consumer) convertLogRecord(
func sanitizeDataStreamField(field string) string {
field = strings.ToLower(field)
field = strings.Map(replaceReservedRune, field)
// Cannot start with _, +
// https://github.com/elastic/ecs/blob/main/rfcs/text/0009-data_stream-fields.md
if field[0] == '_' || field[0] == '+' {
field = field[1:]
}
if utf8.RuneCountInString(field) > MaxDataStreamRunes {
return string([]rune(field)[:MaxDataStreamRunes])
if len(field) > MaxDataStreamBytes {
return field[:MaxDataStreamBytes]
}
return field
}
Expand Down
30 changes: 26 additions & 4 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ package otlp_test

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"testing"
"time"

Expand Down Expand Up @@ -532,6 +534,10 @@ func processLogEvents(t *testing.T, logs plog.Logs) modelpb.Batch {
}

func TestConsumerConsumeLogsDataStream(t *testing.T) {
randomString, err := generateRandomString(otlp.MaxDataStreamBytes)
maxLen := len(randomString) - len(otlp.DisallowedDataStreamRunes)
assert.NoError(t, err)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -568,11 +574,13 @@ func TestConsumerConsumeLogsDataStream(t *testing.T) {
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: "+Dataset" + otlp.DisallowedDataStreamRunes,
resourceDataStreamNamespace: "_Namespace" + otlp.DisallowedDataStreamRunes,
expectedDataStreamDataset: "dataset____________",
expectedDataStreamNamespace: "namespace____________",
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: "____________" + randomString[:maxLen],
expectedDataStreamNamespace: "____________" + randomString[:maxLen],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
Expand Down Expand Up @@ -838,3 +846,17 @@ func newLogRecord(body interface{}) plog.LogRecord {
}
return otelLogRecord
}

const charset = "abcdefghijklmnopqrstuvwxyz0123456789"

func generateRandomString(length int) (string, error) {
result := make([]byte, length)
for i := range result {
index, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
if err != nil {
return "", err
}
result[i] = charset[index.Int64()]
}
return string(result), nil
}
14 changes: 10 additions & 4 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}

func TestConsumeMetricsDataStream(t *testing.T) {
randomString, err := generateRandomString(otlp.MaxDataStreamBytes)
maxLen := len(randomString) - len(otlp.DisallowedDataStreamRunes)
assert.NoError(t, err)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -765,11 +769,13 @@ func TestConsumeMetricsDataStream(t *testing.T) {
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: "Dataset" + otlp.DisallowedDataStreamRunes,
resourceDataStreamNamespace: "Namespace" + otlp.DisallowedDataStreamRunes,
expectedDataStreamDataset: "dataset____________",
expectedDataStreamNamespace: "namespace____________",
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: "____________" + randomString[:maxLen],
expectedDataStreamNamespace: "____________" + randomString[:maxLen],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
Expand Down
14 changes: 10 additions & 4 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,10 @@ func TestSpanNetworkAttributes(t *testing.T) {
}

func TestSpanDataStream(t *testing.T) {
randomString, err := generateRandomString(otlp.MaxDataStreamBytes)
maxLen := len(randomString) - len(otlp.DisallowedDataStreamRunes)
assert.NoError(t, err)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -909,11 +913,13 @@ func TestSpanDataStream(t *testing.T) {
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: "Dataset" + otlp.DisallowedDataStreamRunes,
resourceDataStreamNamespace: "Namespace" + otlp.DisallowedDataStreamRunes,
expectedDataStreamDataset: "dataset____________",
expectedDataStreamNamespace: "namespace____________",
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: "____________" + randomString[:maxLen],
expectedDataStreamNamespace: "____________" + randomString[:maxLen],
},
} {
for _, isTxn := range []bool{false, true} {
Expand Down

0 comments on commit 6a3dbe9

Please sign in to comment.