Skip to content

Commit

Permalink
Fix issue where comparison operations between two vectors incorrectly…
Browse files Browse the repository at this point in the history
… fail with a conflict if multiple left series match the same right series and only one left point remains after applying the comparison
  • Loading branch information
charleskorn committed Dec 13, 2024
1 parent cad9fa0 commit d8b53a1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,17 +378,6 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t

thisSeries := b.remainingSeries[0]
b.remainingSeries = b.remainingSeries[1:]

allLeftSeries, err := b.leftBuffer.GetSeries(ctx, thisSeries.leftSeriesIndices)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

mergedLeftSide, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left")
if err != nil {
return types.InstantVectorSeriesData{}, err
}

rightSide := thisSeries.rightSide

if rightSide.rightSeriesIndices != nil {
Expand All @@ -402,25 +391,39 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t
rightSide.outputSeriesCount--
canMutateRightSide := rightSide.outputSeriesCount == 0

result, err := b.evaluator.computeResult(mergedLeftSide, rightSide.mergedData, true, canMutateRightSide)
allLeftSeries, err := b.leftBuffer.GetSeries(ctx, thisSeries.leftSeriesIndices)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

// If the right side matches to many output series, check for conflicts between those left side series.
if rightSide.leftSidePresence != nil {
seriesIdx := thisSeries.leftSeriesIndices[0] // FIXME: this isn't right, need to do this after applying early filtering
for i, leftSeries := range allLeftSeries {
isLastLeftSeries := i == len(allLeftSeries)-1

if err := b.updateLeftSidePresence(rightSide, result, seriesIdx); err != nil {
allLeftSeries[i], err = b.evaluator.computeResult(leftSeries, rightSide.mergedData, true, canMutateRightSide && isLastLeftSeries)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

if rightSide.outputSeriesCount == 0 {
types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker)
// If the right side matches to many output series, check for conflicts between those left side series.
if rightSide.leftSidePresence != nil {
seriesIdx := thisSeries.leftSeriesIndices[i]

if err := b.updateLeftSidePresence(rightSide, allLeftSeries[i], seriesIdx); err != nil {
return types.InstantVectorSeriesData{}, err
}
}
}

return result, nil
mergedResult, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left")
if err != nil {
return types.InstantVectorSeriesData{}, err
}

if rightSide.leftSidePresence != nil && rightSide.outputSeriesCount == 0 {
types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker)
}

return mergedResult, nil
}

func (b *OneToOneVectorVectorBinaryOperation) populateRightSide(ctx context.Context, rightSide *oneToOneBinaryOperationRightSide) error {
Expand Down
17 changes: 7 additions & 10 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -1397,10 +1397,9 @@ load 6m
left_side_b{env="test", pod="a"} 5 6 7 8
right_side{env="test", pod="a"} 2 2 7 7

# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order.
#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side
# left_side_a{pod="a"} _ 2 _ _
# left_side_b{pod="a"} _ _ 7 _
eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side
left_side_a{pod="a"} _ 2 _ _
left_side_b{pod="a"} _ _ 7 _

eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side
expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation)
Expand All @@ -1416,9 +1415,8 @@ eval_fail range from 0 to 18m step 6m right_side == bool ignoring(env) {__name__
# left_side_b{pod="a"} _ _ 7 _
# but instead both engines drop the metric names in the output.
# This is accepted behaviour: https://github.com/prometheus/prometheus/issues/5326
# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order.
#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side
# {pod="a"} _ 2 7 _
eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side
{pod="a"} _ 2 7 _

eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side
expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation)
Expand Down Expand Up @@ -1458,9 +1456,8 @@ load 6m
left{pod="b"} 5 6 7 8
right 2 2 7 7

# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order.
# eval range from 0 to 18m step 6m left == ignoring(pod) right
# left _ 2 7 _
eval range from 0 to 18m step 6m left == ignoring(pod) right
left _ 2 7 _

eval_fail range from 0 to 18m step 6m left == ignoring(pod) group_right right
expected_fail_regexp found duplicate series for the match group .* on the left (hand-)?side of the operation
Expand Down

0 comments on commit d8b53a1

Please sign in to comment.