Skip to content

Commit

Permalink
ingester/client.QueryStreamResponse: Use memory pooling
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Jan 6, 2025
1 parent b1546d6 commit a79ef47
Show file tree
Hide file tree
Showing 30 changed files with 2,349 additions and 671 deletions.
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2712,7 +2712,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
}
defer func() {
for _, resp := range resps {
resp.FreeBuffer()
resp.Release()
}
}()

Expand Down
40 changes: 26 additions & 14 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6283,28 +6283,34 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest

if i.disableStreamingResponse || req.StreamingChunksBatchSize == 0 {
nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
Chunkseries: []client.CustomTimeSeriesChunk{
{
Labels: ts.Labels,
Chunks: wireChunks,
TimeSeriesChunk: &client.TimeSeriesChunk{
Labels: ts.Labels,
Chunks: wireChunks,
},
},
},
})
} else {
streamingLabelResponses = append(streamingLabelResponses, &client.QueryStreamResponse{
StreamingSeries: []client.QueryStreamSeries{
StreamingSeries: []client.CustomQueryStreamSeries{
{
Labels: ts.Labels,
ChunkCount: int64(len(wireChunks)),
QueryStreamSeries: &client.QueryStreamSeries{
Labels: ts.Labels,
ChunkCount: int64(len(wireChunks)),
},
},
},
})

streamingChunkResponses = append(streamingChunkResponses, &client.QueryStreamResponse{
StreamingSeriesChunks: []client.QueryStreamSeriesChunks{
StreamingSeriesChunks: []client.CustomQueryStreamSeriesChunks{
{
SeriesIndex: uint64(seriesIndex),
Chunks: wireChunks,
QueryStreamSeriesChunks: &client.QueryStreamSeriesChunks{
SeriesIndex: uint64(seriesIndex),
Chunks: wireChunks,
},
},
},
})
Expand Down Expand Up @@ -6384,15 +6390,17 @@ func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQ
}

if len(exemplars) > 0 {
res.Timeseries = append(res.Timeseries, mimirpb.TimeSeries{
Labels: series.Labels,
Exemplars: exemplars,
res.Timeseries = append(res.Timeseries, mimirpb.CustomTimeSeries{
TimeSeries: &mimirpb.TimeSeries{
Labels: series.Labels,
Exemplars: exemplars,
},
})
}
}

// Sort series by labels because the real ingester returns sorted ones.
slices.SortFunc(res.Timeseries, func(a, b mimirpb.TimeSeries) int {
slices.SortFunc(res.Timeseries, func(a, b mimirpb.CustomTimeSeries) int {
aKey := mimirpb.FromLabelAdaptersToKeyString(a.Labels)
bKey := mimirpb.FromLabelAdaptersToKeyString(b.Labels)
return strings.Compare(aKey, bKey)
Expand Down Expand Up @@ -6424,7 +6432,11 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
for _, matchers := range multiMatchers {
for _, ts := range i.timeseries {
if match(ts.Labels, matchers) {
response.Metric = append(response.Metric, &mimirpb.Metric{Labels: ts.Labels})
response.Metric = append(response.Metric, mimirpb.CustomMetric{
Metric: &mimirpb.Metric{
Labels: ts.Labels,
},
})
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m
if err != nil {
return err
}
/* TODO: Fix me.
defer func() {
for _, r := range results {
r.FreeBuffer()
r.Release()
}
}()
*/

result = mergeExemplarQueryResponses(results)

Expand Down Expand Up @@ -180,7 +182,7 @@ func mergeExemplarSets(a, b []mimirpb.Exemplar) []mimirpb.Exemplar {

func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryResponse) *ingester_client.ExemplarQueryResponse {
var keys []string
exemplarResults := make(map[string]mimirpb.TimeSeries)
exemplarResults := make(map[string]mimirpb.CustomTimeSeries)
for _, r := range results {
for _, ts := range r.Timeseries {
lbls := mimirpb.FromLabelAdaptersToKeyString(ts.Labels)
Expand All @@ -199,7 +201,7 @@ func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryRespons
// Query results from each ingester were sorted, but are not necessarily still sorted after merging.
slices.Sort(keys)

result := make([]mimirpb.TimeSeries, len(exemplarResults))
result := make([]mimirpb.CustomTimeSeries, len(exemplarResults))
for i, k := range keys {
ts := exemplarResults[k]
for i, l := range ts.Labels {
Expand All @@ -220,8 +222,8 @@ func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryRespons

type ingesterQueryResult struct {
// Why retain the batches rather than build a single slice? The per-ingester results are merged into combined slices later anyway, so flattening each ingester's batches up front would be wasted work.
chunkseriesBatches [][]ingester_client.TimeSeriesChunk
timeseriesBatches [][]mimirpb.TimeSeries
chunkseriesBatches [][]ingester_client.CustomTimeSeriesChunk
timeseriesBatches [][]mimirpb.CustomTimeSeries
streamingSeries seriesChunksStream
}

Expand Down Expand Up @@ -327,8 +329,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
queryMetrics.IngesterChunksTotal.Add(float64(totalChunks))
}()

hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
hashToTimeSeries := map[string]mimirpb.TimeSeries{}
hashToChunkseries := map[string]ingester_client.CustomTimeSeriesChunk{}
hashToTimeSeries := map[string]mimirpb.CustomTimeSeries{}

for _, res := range results {
// Accumulate any chunk series
Expand Down Expand Up @@ -370,8 +372,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

// Now turn the accumulated maps into slices.
resp := ingester_client.CombinedQueryStreamResponse{
Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)),
Timeseries: make([]mimirpb.TimeSeries, 0, len(hashToTimeSeries)),
Chunkseries: make([]ingester_client.CustomTimeSeriesChunk, 0, len(hashToChunkseries)),
Timeseries: make([]mimirpb.CustomTimeSeries, 0, len(hashToTimeSeries)),
StreamingSeries: mergeSeriesChunkStreams(results, d.estimatedIngestersPerSeries(replicationSets)),
}
for _, series := range hashToChunkseries {
Expand All @@ -395,7 +397,6 @@ func receiveResponse(stream ingester_client.Ingester_QueryStreamClient, streamin
if err != nil {
return 0, nil, false, err
}
defer resp.FreeBuffer()

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
Expand Down
Loading

0 comments on commit a79ef47

Please sign in to comment.