From a8395663e5599012b938563d6c25504d90b2ad32 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Wed, 5 Jun 2024 11:45:08 +0100 Subject: [PATCH] Fix error handling in filterProfiles (#3338) --- pkg/phlaredb/filter_profiles_bidi.go | 38 +++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/phlaredb/filter_profiles_bidi.go b/pkg/phlaredb/filter_profiles_bidi.go index 870ad05393..7e9e77d3f7 100644 --- a/pkg/phlaredb/filter_profiles_bidi.go +++ b/pkg/phlaredb/filter_profiles_bidi.go @@ -57,6 +57,13 @@ type filterResponse interface { *ingestv1.MergeSpanProfileResponse } +func rewriteEOFError(err error) error { + if errors.Is(err, io.EOF) { + return connect.NewError(connect.CodeCanceled, errors.New("client closed stream")) + } + return err +} + // filterProfiles merges and dedupe profiles from different iterators and allow filtering via a bidi stream. func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterRequest]( ctx context.Context, profiles []iter.Iterator[Profile], batchProfileSize int, stream B, @@ -130,10 +137,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR // read a batch of profiles and sends it. if err != nil { - if errors.Is(err, io.EOF) { - return connect.NewError(connect.CodeCanceled, errors.New("client closed stream")) - } - return err + return rewriteEOFError(err) } sp.LogFields(otlog.String("msg", "batch sent to client")) @@ -144,30 +148,28 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR switch s := BidiServerMerge[Res, Req](stream).(type) { case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]: selectionResponse, err := s.Receive() - if err == nil { - selected = selectionResponse.Profiles + if err != nil { + return rewriteEOFError(err) } + selected = selectionResponse.Profiles case BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest]: selectionResponse, err := s.Receive() - if err == nil { - selected = selectionResponse.Profiles + if err != nil { + return rewriteEOFError(err) } + selected = selectionResponse.Profiles case BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest]: selectionResponse, err := s.Receive() - if err == nil { - selected = selectionResponse.Profiles + if err != nil { + return rewriteEOFError(err) } + selected = selectionResponse.Profiles case BidiServerMerge[*ingestv1.MergeSpanProfileResponse, *ingestv1.MergeSpanProfileRequest]: selectionResponse, err := s.Receive() - if err == nil { - selected = selectionResponse.Profiles - } - } - if err != nil { - if errors.Is(err, io.EOF) { - return connect.NewError(connect.CodeCanceled, errors.New("client closed stream")) + if err != nil { + return rewriteEOFError(err) } - return err + selected = selectionResponse.Profiles } sp.LogFields(otlog.String("msg", "selection received")) for i, k := range selected {