Skip to content

Commit 0e78fa8

Browse files
authored
Add client metadata to key (#306)
* Add client metadata to key This enables persisting client metadata from the incoming metrics, through aggregation, and into the produced metrics.
1 parent c3a96f7 commit 0e78fa8

File tree

7 files changed

+338
-98
lines changed

7 files changed

+338
-98
lines changed

processor/lsmintervalprocessor/config/config.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package config // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
1919

2020
import (
21+
"fmt"
22+
"strings"
2123
"time"
2224

2325
"go.opentelemetry.io/collector/component"
@@ -29,21 +31,34 @@ type Config struct {
2931
// Directory is the data directory used by the database to store files.
3032
// If the directory is empty in-memory storage is used.
3133
Directory string `mapstructure:"directory"`
34+
3235
// PassThrough is a configuration that determines whether summary
3336
// metrics should be passed through as they are or aggregated. This
3437
// is because they lead to lossy aggregations.
3538
PassThrough PassThrough `mapstructure:"pass_through"`
39+
3640
// Intervals is a list of interval configuration that the processor
3741
// will aggregate over. The interval duration must be in increasing
3842
// order and must be a factor of the smallest interval duration.
3943
// TODO (lahsivjar): Make specifying interval easier. We can just
4044
// optimize the timer to run on different times and remove any
4145
// restriction on different interval configuration.
42-
Intervals []IntervalConfig `mapstructure:"intervals"`
43-
ResourceLimit LimitConfig `mapstructure:"resource_limit"`
44-
ScopeLimit LimitConfig `mapstructure:"scope_limit"`
45-
MetricLimit LimitConfig `mapstructure:"metric_limit"`
46-
DatapointLimit LimitConfig `mapstructure:"datapoint_limit"`
46+
Intervals []IntervalConfig `mapstructure:"intervals"`
47+
48+
// MetadataKeys is a list of client.Metadata keys that will be
49+
// propagated through the metrics aggregated by the processor.
50+
//
51+
// Only the listed metadata keys will be propagated to the
52+
// resulting metrics.
53+
//
54+
// Entries are case-insensitive. Duplicated entries will
55+
// trigger a validation error.
56+
MetadataKeys []string `mapstructure:"metadata_keys"`
57+
58+
ResourceLimit LimitConfig `mapstructure:"resource_limit"`
59+
ScopeLimit LimitConfig `mapstructure:"scope_limit"`
60+
MetricLimit LimitConfig `mapstructure:"metric_limit"`
61+
DatapointLimit LimitConfig `mapstructure:"datapoint_limit"`
4762
}
4863

4964
// PassThrough determines whether metrics should be passed through as they
@@ -93,7 +108,15 @@ type Attribute struct {
93108
Value any `mapstructure:"value"`
94109
}
95110

96-
func (config *Config) Validate() error {
111+
func (cfg *Config) Validate() error {
97112
// TODO (lahsivjar): Add validation for interval duration
113+
uniq := map[string]bool{}
114+
for _, k := range cfg.MetadataKeys {
115+
l := strings.ToLower(k)
116+
if _, has := uniq[l]; has {
117+
return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l)
118+
}
119+
uniq[l] = true
120+
}
98121
return nil
99122
}

processor/lsmintervalprocessor/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0
1212
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0
1313
github.com/stretchr/testify v1.10.0
14+
go.opentelemetry.io/collector/client v1.22.0
1415
go.opentelemetry.io/collector/component v0.116.0
1516
go.opentelemetry.io/collector/component/componenttest v0.116.0
1617
go.opentelemetry.io/collector/confmap v1.22.0
@@ -19,6 +20,7 @@ require (
1920
go.opentelemetry.io/collector/pdata v1.22.0
2021
go.opentelemetry.io/collector/processor v0.116.0
2122
go.opentelemetry.io/collector/processor/processortest v0.116.0
23+
go.opentelemetry.io/otel v1.32.0
2224
go.uber.org/goleak v1.3.0
2325
go.uber.org/zap v1.27.0
2426
)
@@ -81,7 +83,6 @@ require (
8183
go.opentelemetry.io/collector/pipeline v0.116.0 // indirect
8284
go.opentelemetry.io/collector/processor/xprocessor v0.116.0 // indirect
8385
go.opentelemetry.io/collector/semconv v0.116.0 // indirect
84-
go.opentelemetry.io/otel v1.32.0 // indirect
8586
go.opentelemetry.io/otel/metric v1.32.0 // indirect
8687
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
8788
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect

processor/lsmintervalprocessor/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6/go.mod h1:BUbeWZi
140140
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
141141
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
142142
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
143+
go.opentelemetry.io/collector/client v1.22.0 h1:AAUzHuqYQqxoNqacw1WXgGF/MxtBTwNZuhBvJIorgA0=
144+
go.opentelemetry.io/collector/client v1.22.0/go.mod h1:wcCSdTwbDVNTycoqs7BiDNVj3e1Ta7EnWH2sAofKnEk=
143145
go.opentelemetry.io/collector/component v0.116.0 h1:SQE1YeVfYCN7bw1n4hknUwJE5U/1qJL552sDhAdSlaA=
144146
go.opentelemetry.io/collector/component v0.116.0/go.mod h1:MYgXFZWDTq0uPgF1mkLSFibtpNqksRVAOrmihckOQEs=
145147
go.opentelemetry.io/collector/component/componentstatus v0.116.0 h1:wpgY0H2K9IPBzaNAvavKziK86VZ7TuNFQbS9OC4Z6Cs=

processor/lsmintervalprocessor/internal/merger/key.go

Lines changed: 70 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,54 +20,95 @@ package merger // import "github.com/elastic/opentelemetry-collector-components/
2020
import (
2121
"encoding/binary"
2222
"errors"
23+
"fmt"
24+
"slices"
2325
"time"
2426
)
2527

26-
// TODO (lahsivjar): Think about multitenancy, should be part of the key
2728
// Key is the merger key. It combines the aggregation interval and the
// processing time with an optional, ordered list of metadata entries.
type Key struct {
	Interval       time.Duration
	ProcessingTime time.Time

	// Metadata holds an ordered list of arbitrary keys and associated
	// string values to associate with the interval and processing time.
	Metadata []KeyValues
}

// KeyValues pairs a single metadata key with its ordered list of values.
type KeyValues struct {
	Key    string
	Values []string
}

// AppendBinary marshals the key into its binary representation,
// appending it to b.
//
// The encoding is:
//
//   - 2 bytes: Interval in whole seconds, big endian
//   - 8 bytes: ProcessingTime as Unix seconds, big endian
//   - only when Metadata is non-empty: a uvarint entry count followed, for
//     each entry, by the uvarint-length-prefixed key, a uvarint value count
//     and each uvarint-length-prefixed value
//
// Sub-second precision of Interval and ProcessingTime is not encoded. An
// error is returned, and b is returned unmodified, when Interval is negative
// or exceeds what 2 bytes of seconds can represent (~18 hours).
func (k *Key) AppendBinary(b []byte) ([]byte, error) {
	// Largest number of seconds representable in the 2-byte interval field.
	const maxIntervalSeconds = 1<<16 - 1

	// Integer division avoids converting an out-of-range float to uint16,
	// whose result is implementation-dependent and would silently yield a
	// wrong (and possibly colliding) key.
	ivlSeconds := k.Interval / time.Second
	if ivlSeconds < 0 || ivlSeconds > maxIntervalSeconds {
		return b, fmt.Errorf(
			"failed to marshal key, interval %s is outside of the encodable range [0s, %ds]",
			k.Interval, maxIntervalSeconds,
		)
	}

	b = slices.Grow(b, 10)
	b = binary.BigEndian.AppendUint16(b, uint16(ivlSeconds))
	b = binary.BigEndian.AppendUint64(b, uint64(k.ProcessingTime.Unix()))
	if len(k.Metadata) == 0 {
		// Keys without metadata keep the fixed 10 byte encoding.
		return b, nil
	}

	b = binary.AppendUvarint(b, uint64(len(k.Metadata)))
	for _, kvs := range k.Metadata {
		b = binary.AppendUvarint(b, uint64(len(kvs.Key)))
		b = append(b, kvs.Key...)
		b = binary.AppendUvarint(b, uint64(len(kvs.Values)))
		for _, mv := range kvs.Values {
			b = binary.AppendUvarint(b, uint64(len(mv)))
			b = append(b, mv...)
		}
	}
	return b, nil
}

// Unmarshal unmarshals the binary representation of the Key, as produced by
// AppendBinary.
//
// The buffer is treated as untrusted: every count and length read from it is
// checked against the number of bytes that remain, so truncated or corrupt
// input results in an error rather than a panic or an oversized allocation.
// The receiver is only modified when decoding succeeds, and a buffer without
// a metadata section resets Metadata to nil.
func (k *Key) Unmarshal(d []byte) error {
	if len(d) < 10 {
		return errors.New("failed to unmarshal key, invalid sized buffer provided")
	}
	ivl := time.Duration(binary.BigEndian.Uint16(d[:2])) * time.Second
	pTime := time.Unix(int64(binary.BigEndian.Uint64(d[2:10])), 0)
	d = d[10:]

	var metadata []KeyValues
	if len(d) > 0 {
		numKeys, n := binary.Uvarint(d)
		if n <= 0 {
			return fmt.Errorf("error reading number of metadata keys (n=%d)", n)
		}
		d = d[n:]
		// Every entry occupies at least 2 bytes (key length and value
		// count), which bounds the allocation by the input size.
		if numKeys > uint64(len(d))/2 {
			return fmt.Errorf("error reading metadata keys: count %d exceeds %d remaining bytes", numKeys, len(d))
		}
		metadata = make([]KeyValues, numKeys)

		for i := range metadata {
			mklen, n := binary.Uvarint(d)
			if n <= 0 {
				return fmt.Errorf("error reading metadata key length (n=%d)", n)
			}
			d = d[n:]
			if mklen > uint64(len(d)) {
				return fmt.Errorf("error reading metadata key: length %d exceeds %d remaining bytes", mklen, len(d))
			}
			mk := string(d[:mklen])
			d = d[mklen:]

			numValues, n := binary.Uvarint(d)
			if n <= 0 {
				return fmt.Errorf("error reading number of metadata values for %q (n=%d)", mk, n)
			}
			d = d[n:]
			// Every value occupies at least 1 byte (its length prefix).
			if numValues > uint64(len(d)) {
				return fmt.Errorf("error reading metadata values for %q: count %d exceeds %d remaining bytes", mk, numValues, len(d))
			}
			mvs := make([]string, numValues)
			for j := range mvs {
				mvlen, n := binary.Uvarint(d)
				if n <= 0 {
					return fmt.Errorf("error reading metadata value length for %q (n=%d)", mk, n)
				}
				d = d[n:]
				if mvlen > uint64(len(d)) {
					return fmt.Errorf("error reading metadata value for %q: length %d exceeds %d remaining bytes", mk, mvlen, len(d))
				}
				mvs[j] = string(d[:mvlen])
				d = d[mvlen:]
			}

			metadata[i] = KeyValues{Key: mk, Values: mvs}
		}
	}

	k.Interval = ivl
	k.ProcessingTime = pTime
	k.Metadata = metadata
	return nil
}

processor/lsmintervalprocessor/internal/merger/key_test.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,29 +28,43 @@ import (
2828

2929
func TestKey(t *testing.T) {
3030
for _, tc := range []struct {
31-
name string
32-
ivl time.Duration
33-
processingTime time.Time
31+
name string
32+
key Key
3433
}{
3534
{
36-
name: "zero",
37-
ivl: 0,
38-
processingTime: time.Unix(0, 0),
35+
name: "zero",
36+
key: Key{
37+
Interval: 0,
38+
ProcessingTime: time.Unix(0, 0),
39+
},
3940
},
4041
{
41-
name: "non_zero",
42-
ivl: time.Minute,
43-
processingTime: time.Unix(time.Now().Unix(), 0),
42+
name: "non_zero",
43+
key: Key{
44+
Interval: time.Minute,
45+
ProcessingTime: time.Unix(time.Now().Unix(), 0),
46+
},
47+
},
48+
{
49+
name: "with_metadata_keys",
50+
key: Key{
51+
Interval: time.Minute,
52+
ProcessingTime: time.Unix(time.Now().Unix(), 0),
53+
Metadata: []KeyValues{
54+
{Key: "empty", Values: []string{}},
55+
{Key: "one_empty_value", Values: []string{""}},
56+
{Key: "one_nonempty_value", Values: []string{"non-empty"}},
57+
{Key: "mixed_values", Values: []string{"", "non-empty"}},
58+
},
59+
},
4460
},
4561
} {
4662
t.Run(tc.name, func(t *testing.T) {
47-
key := NewKey(tc.ivl, tc.processingTime)
48-
assert.Equal(t, 10, key.SizeBinary())
49-
b, err := key.Marshal()
63+
b, err := tc.key.AppendBinary(nil)
5064
assert.NoError(t, err)
5165
var newKey Key
5266
assert.NoError(t, newKey.Unmarshal(b))
53-
assert.Equal(t, newKey, key)
67+
assert.Equal(t, newKey, tc.key)
5468
})
5569
}
5670
}
@@ -60,14 +74,20 @@ func TestKeyOrdered(t *testing.T) {
6074
ts := time.Unix(0, 0)
6175
ivl := time.Minute
6276

63-
before := NewKey(ivl, ts)
77+
before := Key{
78+
Interval: ivl,
79+
ProcessingTime: ts,
80+
}
6481
for i := 0; i < 10; i++ {
65-
beforeBytes, err := before.Marshal()
82+
beforeBytes, err := before.AppendBinary(nil)
6683
require.NoError(t, err)
6784

6885
ts = ts.Add(time.Minute)
69-
after := NewKey(ivl, ts)
70-
afterBytes, err := after.Marshal()
86+
after := Key{
87+
Interval: ivl,
88+
ProcessingTime: ts,
89+
}
90+
afterBytes, err := after.AppendBinary(nil)
7191
require.NoError(t, err)
7292

7393
// before should always come first

0 commit comments

Comments
 (0)