Skip to content

Commit

Permalink
Fix error handling in filterProfiles (#3338)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine authored Jun 5, 2024
1 parent e55d2de commit a839566
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions pkg/phlaredb/filter_profiles_bidi.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type filterResponse interface {
*ingestv1.MergeSpanProfileResponse
}

func rewriteEOFError(err error) error {
if errors.Is(err, io.EOF) {
return connect.NewError(connect.CodeCanceled, errors.New("client closed stream"))
}
return err
}

// filterProfiles merges and dedupe profiles from different iterators and allow filtering via a bidi stream.
func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterRequest](
ctx context.Context, profiles []iter.Iterator[Profile], batchProfileSize int, stream B,
Expand Down Expand Up @@ -130,10 +137,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
// read a batch of profiles and sends it.

if err != nil {
if errors.Is(err, io.EOF) {
return connect.NewError(connect.CodeCanceled, errors.New("client closed stream"))
}
return err
return rewriteEOFError(err)
}
sp.LogFields(otlog.String("msg", "batch sent to client"))

Expand All @@ -144,30 +148,28 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
switch s := BidiServerMerge[Res, Req](stream).(type) {
case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]:
selectionResponse, err := s.Receive()
if err == nil {
selected = selectionResponse.Profiles
if err != nil {
return rewriteEOFError(err)
}
selected = selectionResponse.Profiles
case BidiServerMerge[*ingestv1.MergeProfilesLabelsResponse, *ingestv1.MergeProfilesLabelsRequest]:
selectionResponse, err := s.Receive()
if err == nil {
selected = selectionResponse.Profiles
if err != nil {
return rewriteEOFError(err)
}
selected = selectionResponse.Profiles
case BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest]:
selectionResponse, err := s.Receive()
if err == nil {
selected = selectionResponse.Profiles
if err != nil {
return rewriteEOFError(err)
}
selected = selectionResponse.Profiles
case BidiServerMerge[*ingestv1.MergeSpanProfileResponse, *ingestv1.MergeSpanProfileRequest]:
selectionResponse, err := s.Receive()
if err == nil {
selected = selectionResponse.Profiles
}
}
if err != nil {
if errors.Is(err, io.EOF) {
return connect.NewError(connect.CodeCanceled, errors.New("client closed stream"))
if err != nil {
return rewriteEOFError(err)
}
return err
selected = selectionResponse.Profiles
}
sp.LogFields(otlog.String("msg", "selection received"))
for i, k := range selected {
Expand Down

0 comments on commit a839566

Please sign in to comment.