Fix buffer freeing in blocksStoreQuerier
Signed-off-by: Arve Knudsen <[email protected]>
aknuds1 committed Dec 17, 2024
1 parent 2b1d501 commit 6be7edd
Showing 2 changed files with 102 additions and 77 deletions.
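
For context, a minimal sketch of the Go behaviour this commit works around (hypothetical types and names, not code from this repository): a deferred call runs when the surrounding function returns, not at the end of a loop iteration, so deferring a per-message FreeBuffer inside a receive loop keeps every buffer referenced until the whole stream has been consumed.

package main

import "fmt"

// message stands in for a streamed response that owns a pooled buffer.
type message struct{ buf []byte }

// FreeBuffer pretends to return the buffer to a pool.
func (m *message) FreeBuffer() { m.buf = nil }

// recv simulates receiving n messages from a stream.
func recv(n int) []*message {
	msgs := make([]*message, n)
	for i := range msgs {
		msgs[i] = &message{buf: make([]byte, 1024)}
	}
	return msgs
}

// leaky defers every release: all buffers stay referenced until leaky returns.
func leaky(msgs []*message) {
	for _, m := range msgs {
		defer m.FreeBuffer() // runs only when leaky() returns, not per iteration
		_ = len(m.buf)       // ... process the message ...
	}
}

// prompt frees each buffer as soon as its message has been processed.
func prompt(msgs []*message) {
	for _, m := range msgs {
		_ = len(m.buf) // ... process the message ...
		m.FreeBuffer()
	}
}

func main() {
	leaky(recv(3))
	prompt(recv(3))
	fmt.Println("done")
}

The diff below applies the same idea twice: block_streaming.go frees the estimate message explicitly instead of deferring, and blocks_store_queryable.go moves the per-message handling into a new receiveMessage helper so its defer is scoped to a single message.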
2 changes: 1 addition & 1 deletion pkg/querier/block_streaming.go
@@ -242,9 +242,9 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
if err != nil {
return translateReceivedError(err)
}
defer msg.FreeBuffer()

estimate := msg.GetStreamingChunksEstimate()
msg.FreeBuffer()
if estimate == nil {
return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
}
177 changes: 101 additions & 76 deletions pkg/querier/blocks_store_queryable.go
@@ -793,93 +793,32 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return gCtx.Err()
}

resp, err := stream.Recv()
var err error
var isEOS bool
var shouldRetry bool
mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, isEOS, shouldRetry, err = q.receiveMessage(
c, stream, queryLimiter, mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched,
)
if errors.Is(err, io.EOF) {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
break
}
if err != nil {
if shouldRetry(err) {
level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
return nil
}

return err
}
defer resp.FreeBuffer()

// Response may either contain series, streaming series, warning or hints.
if s := resp.GetSeries(); s != nil {
// Take a safe copy of every label.
for i, l := range s.Labels {
s.Labels[i].Name = strings.Clone(l.Name)
s.Labels[i].Value = strings.Clone(l.Value)
}
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return err
}

chunksCount, chunksSize := countChunksAndBytes(s)
q.metrics.chunksTotal.Add(float64(chunksCount))
if err := queryLimiter.AddChunkBytes(chunksSize); err != nil {
return err
}
if err := queryLimiter.AddChunks(chunksCount); err != nil {
return err
}
if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil {
return err
}
}

if w := resp.GetWarning(); w != "" {
myWarnings.Add(errors.New(w))
}

if h := resp.GetHints(); h != nil {
hints := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(h, &hints); err != nil {
return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress())
}

ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks)
if err != nil {
return errors.Wrapf(err, "failed to parse queried block IDs from received hints")
}

myQueriedBlocks = append(myQueriedBlocks, ids...)
}

if s := resp.GetStats(); s != nil {
indexBytesFetched += s.FetchedIndexBytes
if shouldRetry {
level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
return nil
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
// Add series fingerprint to query limiter; will return error if we are over the limit
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
if isEOS {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

if ss.IsEndOfSeriesStream {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

// We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now.
break
}
// We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now.
break
}
}

@@ -987,6 +926,92 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above.
}

func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegatewaypb.StoreGateway_SeriesClient, queryLimiter *limiter.QueryLimiter, mySeries []*storepb.Series, myWarnings annotations.Annotations, myQueriedBlocks []ulid.ULID, myStreamingSeriesLabels []labels.Labels, indexBytesFetched uint64) ([]*storepb.Series, annotations.Annotations, []ulid.ULID, []labels.Labels, uint64, bool, bool, error) {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

if shouldRetry(err) {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, true, nil
}

return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
defer resp.FreeBuffer()

// Response may either contain series, streaming series, warning or hints.
if s := resp.GetSeries(); s != nil {
// Take a safe copy of every label.
for i, l := range s.Labels {
s.Labels[i].Name = strings.Clone(l.Name)
s.Labels[i].Value = strings.Clone(l.Value)
}
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

chunksCount, chunksSize := countChunksAndBytes(s)
q.metrics.chunksTotal.Add(float64(chunksCount))
if err := queryLimiter.AddChunkBytes(chunksSize); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
if err := queryLimiter.AddChunks(chunksCount); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
}

if w := resp.GetWarning(); w != "" {
myWarnings.Add(errors.New(w))
}

if h := resp.GetHints(); h != nil {
hints := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(h, &hints); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress())
}

ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks)
if err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to parse queried block IDs from received hints")
}

myQueriedBlocks = append(myQueriedBlocks, ids...)
}

if s := resp.GetStats(); s != nil {
indexBytesFetched += s.FetchedIndexBytes
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

// Add series fingerprint to query limiter; will return error if we are over the limit
if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
}

if ss.IsEndOfSeriesStream {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, true, false, nil
}
}

return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, nil
}

func shouldRetry(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false

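A rough sketch of the helper-scoped defer pattern that receiveMessage introduces above (hypothetical types and names, not the real Mimir signatures): each call handles exactly one message, so its deferred FreeBuffer fires as soon as that message has been processed instead of piling up in the caller's loop.

package main

import (
	"errors"
	"fmt"
	"io"
)

// response stands in for a streamed message that owns a pooled buffer.
type response struct{ buf []byte }

// FreeBuffer pretends to return the buffer to a pool.
func (r *response) FreeBuffer() { r.buf = nil }

// stream is the minimal receive interface the sketch needs.
type stream interface{ Recv() (*response, error) }

// handleOne receives and fully processes a single message; its deferred
// FreeBuffer runs when handleOne returns, i.e. once per message.
func handleOne(s stream) error {
	resp, err := s.Recv()
	if err != nil {
		return err
	}
	defer resp.FreeBuffer() // scoped to this one message

	_ = len(resp.buf) // ... inspect the message, enforce limits, accumulate results ...
	return nil
}

// readAll drains the stream, stopping cleanly at io.EOF.
func readAll(s stream) error {
	for {
		if err := handleOne(s); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
	}
}

// fakeStream yields n messages and then io.EOF.
type fakeStream struct{ n int }

func (f *fakeStream) Recv() (*response, error) {
	if f.n == 0 {
		return nil, io.EOF
	}
	f.n--
	return &response{buf: make([]byte, 1024)}, nil
}

func main() {
	fmt.Println(readAll(&fakeStream{n: 3})) // prints <nil>
}

Compared with freeing in the caller, this keeps the release next to the code that last touches the buffer, which is harder to get wrong as new early returns are added.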