Skip to content

Commit 71c4458

Browse files
committed
Better use label reusing.
1 parent abb4a42 commit 71c4458

File tree

7 files changed

+77
-23
lines changed

7 files changed

+77
-23
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23
55
toolchain go1.23.1
66

77
require (
8+
github.com/dghubble/trie v0.1.0
89
github.com/dolthub/swiss v0.2.1
910
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3
1011
github.com/go-kit/log v0.2.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
2626
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2727
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
2828
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
29+
github.com/dghubble/trie v0.1.0 h1:kJnjBLFFElBwS60N4tkPvnLhnpcDxbBjIulgI8CpNGM=
30+
github.com/dghubble/trie v0.1.0/go.mod h1:sOmnzfBNH7H92ow2292dDFWNsVQuh/izuD7otCYb1ak=
2931
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
3032
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
3133
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=

serialization/appender.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/grafana/walqueue/types/v2"
7+
"sync"
78
"time"
89

910
"github.com/go-kit/log"
@@ -15,6 +16,34 @@ import (
1516
"github.com/prometheus/prometheus/storage"
1617
)
1718

19+
var metricSinglePool = sync.Pool{
20+
New: func() any {
21+
return &types.Metric{}
22+
},
23+
}
24+
25+
func GetMetricFromPool() *types.Metric {
26+
return metricSinglePool.Get().(*types.Metric)
27+
}
28+
29+
func PutMetricSliceIntoPool(m []*types.Metric) {
30+
for _, mt := range m {
31+
PutMetricIntoPool(mt)
32+
}
33+
}
34+
35+
func PutMetricIntoPool(m *types.Metric) {
36+
m.Hash = 0
37+
m.TS = 0
38+
m.Value = 0
39+
// We dont reuse these labels since they are owned by the scraper.
40+
m.Labels = nil
41+
m.Histogram = nil
42+
m.FloatHistogram = nil
43+
44+
metricSinglePool.Put(m)
45+
}
46+
1847
type appender struct {
1948
ctx context.Context
2049
ttl time.Duration
@@ -42,8 +71,8 @@ func NewAppender(ctx context.Context, ttl time.Duration, s types.Serializer, log
4271

4372
// Append metric
4473
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
45-
ts := types.GetMetricFromPool()
46-
ts.Labels = l.Copy()
74+
ts := types.GetMetricFromPool(false)
75+
ts.Labels = l
4776
ts.TS = t
4877
ts.Value = v
4978
ts.Hash = l.Hash()
@@ -67,7 +96,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
6796
if e.HasTs && e.Ts < endTime {
6897
return ref, nil
6998
}
70-
ts := types.GetMetricFromPool()
99+
ts := types.GetMetricFromPool(false)
71100
ts.Hash = e.Labels.Hash()
72101
ts.TS = e.Ts
73102
ts.Labels = e.Labels.Copy()
@@ -82,7 +111,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
82111
if t < endTime {
83112
return ref, nil
84113
}
85-
ts := types.GetMetricFromPool()
114+
ts := types.GetMetricFromPool(false)
86115
ts.Labels = l.Copy()
87116
ts.TS = t
88117
if h != nil {

serialization/serializer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestRoundTripSerialization(t *testing.T) {
3838
s.Start()
3939
defer s.Stop()
4040
for i := 0; i < 10; i++ {
41-
tss := types.GetMetricFromPool()
41+
tss := types.GetMetricFromPool(true)
4242
tss.Labels = make(labels.Labels, 10)
4343
for j := 0; j < 10; j++ {
4444
tss.Labels[j] = labels.Label{
@@ -101,7 +101,7 @@ func (f *fqq) Store(ctx context.Context, meta map[string]string, value []byte) e
101101
metrics, _, err := v2.DeserializeToSeriesGroup(sg, f.buf)
102102
require.NoError(f.t, err)
103103
require.Len(f.t, sg.Series, 10)
104-
for _, series := range metrics {
104+
for _, series := range metrics.M {
105105
require.Len(f.t, series.Labels, 10)
106106
for j := 0; j < 10; j++ {
107107
series.Labels[j].Name = fmt.Sprintf("name_%d_%d", int(series.Value), j)

types/metric.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,21 @@ type Metrics struct {
2121
M []*Metric
2222
}
2323

24-
func (m *Metrics) Resize(length int) {
24+
func (m *Metrics) Resize(length int, canLabelsBeReused bool) {
2525
m.M = make([]*Metric, length)
2626
for i := range m.M {
27-
m.M[i] = GetMetricFromPool()
27+
m.M[i] = GetMetricFromPool(canLabelsBeReused)
2828
}
2929
}
3030

3131
type Metric struct {
32-
Labels labels.Labels
33-
TS int64
34-
Value float64
35-
Hash uint64
36-
Histogram *histogram.Histogram
37-
FloatHistogram *histogram.FloatHistogram
32+
Labels labels.Labels
33+
TS int64
34+
Value float64
35+
Hash uint64
36+
Histogram *histogram.Histogram
37+
FloatHistogram *histogram.FloatHistogram
38+
canLabelsBeReused bool
3839
}
3940

4041
// IsMetadata is used because it's easier to store metadata as a set of labels.
@@ -43,15 +44,28 @@ func (m Metric) IsMetadata() bool {
4344
}
4445

4546
var OutstandingIndividualMetrics = atomic.Int32{}
46-
var metricSinglePool = sync.Pool{
47+
var metricReusableLabelPool = sync.Pool{
4748
New: func() any {
4849
return &Metric{}
4950
},
5051
}
5152

52-
func GetMetricFromPool() *Metric {
53+
var metricNonReusablePool = sync.Pool{
54+
New: func() any {
55+
return &Metric{}
56+
},
57+
}
58+
59+
func GetMetricFromPool(canLabelsBeReused bool) *Metric {
5360
OutstandingIndividualMetrics.Inc()
54-
return metricSinglePool.Get().(*Metric)
61+
var m *Metric
62+
if canLabelsBeReused {
63+
m = metricReusableLabelPool.Get().(*Metric)
64+
} else {
65+
m = metricNonReusablePool.Get().(*Metric)
66+
}
67+
m.canLabelsBeReused = canLabelsBeReused
68+
return m
5569
}
5670

5771
func PutMetricSliceIntoPool(m []*Metric) {
@@ -66,9 +80,17 @@ func PutMetricIntoPool(m *Metric) {
6680
m.Hash = 0
6781
m.TS = 0
6882
m.Value = 0
69-
m.Labels = m.Labels[:0]
83+
if m.canLabelsBeReused {
84+
m.Labels = m.Labels[:0]
85+
} else {
86+
m.Labels = nil
87+
}
7088
m.Histogram = nil
7189
m.FloatHistogram = nil
7290

73-
metricSinglePool.Put(m)
91+
if m.canLabelsBeReused {
92+
metricReusableLabelPool.Put(m)
93+
} else {
94+
metricNonReusablePool.Put(m)
95+
}
7496
}

types/v1/serialization_func.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func DeserializeToSeriesGroup(sg *SeriesGroup, buf []byte) (*types.Metrics, *typ
268268
}
269269
// Need to fill in the labels.
270270
metrics := &types.Metrics{}
271-
metrics.Resize(len(sg.Series))
271+
metrics.Resize(len(sg.Series), true)
272272
for seriesIndex, series := range sg.Series {
273273
metric := metrics.M[seriesIndex]
274274
if cap(metric.Labels) < len(series.LabelsNames) {
@@ -288,7 +288,7 @@ func DeserializeToSeriesGroup(sg *SeriesGroup, buf []byte) (*types.Metrics, *typ
288288
series.LabelsValues = series.LabelsValues[:0]
289289
}
290290
meta := &types.Metrics{}
291-
meta.Resize(len(sg.Metadata))
291+
meta.Resize(len(sg.Metadata), true)
292292
for seriesIndex, series := range sg.Metadata {
293293
m := meta.M[seriesIndex]
294294
if cap(m.Labels) < len(series.LabelsNames) {

types/v2/serialization_func.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func DeserializeToSeriesGroup(sg *SeriesGroup, buf []byte) (*types.Metrics, *typ
6060
return nil, nil, err
6161
}
6262
metrics := &types.Metrics{}
63-
metrics.Resize(len(sg.Series))
63+
metrics.Resize(len(sg.Series), true)
6464

6565
// Need to fill in the labels.
6666
for seriesIndex, series := range sg.Series {
@@ -92,7 +92,7 @@ func DeserializeToSeriesGroup(sg *SeriesGroup, buf []byte) (*types.Metrics, *typ
9292
}
9393

9494
metadata := &types.Metrics{}
95-
metadata.Resize(len(sg.Metadata))
95+
metadata.Resize(len(sg.Metadata), true)
9696
for seriesIndex, series := range sg.Metadata {
9797
meta := metadata.M[seriesIndex]
9898

0 commit comments

Comments
 (0)