From 45bb1c71ace9642d0a90f8334f889e3515139952 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 13 Dec 2024 14:39:11 +1100 Subject: [PATCH 1/4] Add early filtering test case for group_right --- pkg/streamingpromql/testdata/ours/binary_operators.test | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index cb33558432e..59894e3900c 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -1462,6 +1462,9 @@ load 6m # eval range from 0 to 18m step 6m left == ignoring(pod) right # left _ 2 7 _ +eval_fail range from 0 to 18m step 6m left == ignoring(pod) group_right right + expected_fail_regexp found duplicate series for the match group .* on the left (hand-)?side of the operation + clear # Same thing as above, but with no overlapping samples on left side. From 2d6b187f6092390636ae61f087db3039f0db52c0 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 13 Dec 2024 16:05:52 +1100 Subject: [PATCH 2/4] Fix issue where comparison operations without the bool modifier would return incorrect results if the left side contained series with different metric names --- .../operators/binops/binary_operation.go | 39 +++ .../grouped_vector_vector_binary_operation.go | 43 +--- ...e_to_one_vector_vector_binary_operation.go | 239 +++++++++++------- ...one_vector_vector_binary_operation_test.go | 100 ++++---- pkg/streamingpromql/query.go | 2 +- .../testdata/ours/binary_operators.test | 7 +- 6 files changed, 247 insertions(+), 183 deletions(-) diff --git a/pkg/streamingpromql/operators/binops/binary_operation.go b/pkg/streamingpromql/operators/binops/binary_operation.go index b499ad4f387..3bc3a82e2f6 100644 --- a/pkg/streamingpromql/operators/binops/binary_operation.go +++ b/pkg/streamingpromql/operators/binops/binary_operation.go @@ -5,9 +5,11 @@ package binops import ( "fmt" "slices" + "time" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" @@ -82,6 +84,43 @@ func groupLabelsFunc(vectorMatching parser.VectorMatching, op parser.ItemType, r } } +func formatConflictError( + firstConflictingSeriesIndex int, + secondConflictingSeriesIndex int, + description string, + ts int64, + sourceSeriesMetadata []types.SeriesMetadata, + side string, + vectorMatching parser.VectorMatching, + op parser.ItemType, + returnBool bool, +) error { + firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels + groupLabels := groupLabelsFunc(vectorMatching, op, returnBool)(firstConflictingSeriesLabels) + + if secondConflictingSeriesIndex == -1 { + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + ) + } + + secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels + + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + firstConflictingSeriesLabels, + secondConflictingSeriesLabels, + ) +} + // filterSeries returns data filtered based on the mask provided. // // mask is expected to contain one value for each time step in the query time range. diff --git a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go index 206044c3051..2bdda7d501a 100644 --- a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go @@ -11,10 +11,8 @@ import ( "fmt" "slices" "sort" - "time" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -618,13 +616,13 @@ func (g *GroupedVectorVectorBinaryOperation) updateOneSidePresence(side *oneSide for _, p := range seriesData.Floats { if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { - return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) } } for _, p := range seriesData.Histograms { if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { - return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) } } } @@ -646,7 +644,8 @@ func (g *GroupedVectorVectorBinaryOperation) mergeOneSide(data []types.InstantVe } if conflict != nil { - return types.InstantVectorSeriesData{}, g.formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness()) + err := formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) + return types.InstantVectorSeriesData{}, err } return merged, nil @@ -689,40 +688,6 @@ func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantV return merged, nil } -func (g *GroupedVectorVectorBinaryOperation) formatConflictError( - firstConflictingSeriesIndex int, - secondConflictingSeriesIndex int, - description string, - ts int64, - sourceSeriesMetadata []types.SeriesMetadata, - side string, -) error { - firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels - groupLabels := groupLabelsFunc(g.VectorMatching, g.Op, g.ReturnBool)(firstConflictingSeriesLabels) - - if secondConflictingSeriesIndex == -1 { - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s", - description, - groupLabels, - side, - timestamp.Time(ts).Format(time.RFC3339Nano), - ) - } - - secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels - - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", - description, - groupLabels, - side, - timestamp.Time(ts).Format(time.RFC3339Nano), - firstConflictingSeriesLabels, - secondConflictingSeriesLabels, - ) -} - func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string { switch g.VectorMatching.Card { case parser.CardOneToMany: diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go index 70f1204d7bf..8d99344a082 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go @@ -7,13 +7,10 @@ package binops import ( "context" - "fmt" "math" "sort" - "time" "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -48,13 +45,14 @@ type OneToOneVectorVectorBinaryOperation struct { expressionPosition posrange.PositionRange annotations *annotations.Annotations + timeRange types.QueryTimeRange } var _ types.InstantVectorOperator = &OneToOneVectorVectorBinaryOperation{} type oneToOneBinaryOperationOutputSeries struct { - leftSeriesIndices []int - rightSeriesIndices []int + leftSeriesIndices []int + rightSide *oneToOneBinaryOperationRightSide } // latestLeftSeries returns the index of the last series from the left source needed for this output series. @@ -66,9 +64,38 @@ func (s oneToOneBinaryOperationOutputSeries) latestLeftSeries() int { // latestRightSeries returns the index of the last series from the right source needed for this output series. // -// It assumes that rightSeriesIndices is sorted in ascending order. +// It assumes that rightSide.rightSeriesIndices is sorted in ascending order. func (s oneToOneBinaryOperationOutputSeries) latestRightSeries() int { - return s.rightSeriesIndices[len(s.rightSeriesIndices)-1] + return s.rightSide.rightSeriesIndices[len(s.rightSide.rightSeriesIndices)-1] +} + +type oneToOneBinaryOperationRightSide struct { + // If this right side is used for multiple output series and has not been populated, rightSeriesIndices will not be nil. + // If this right side has been populated, rightSeriesIndices will be nil. + rightSeriesIndices []int + mergedData types.InstantVectorSeriesData + + // The number of output series that use the same series from the right side. + // Will only be greater than 1 for comparison binary operations without the bool modifier + // where the input series on the left side have different metric names. + outputSeriesCount int + + // Time steps at which we've seen samples for any left side that matches with this right side. + // Each value is the index of the source series of the sample, or -1 if no sample has been seen for this time step yet. + leftSidePresence []int +} + +// updatePresence records the presence of a sample from the left side series with index seriesIdx at the timestamp with index timestampIdx. +// +// If there is already a sample present from another series at the same timestamp, updatePresence returns that series' index, or +// -1 if there was no sample present at the same timestamp from another series. +func (g *oneToOneBinaryOperationRightSide) updatePresence(timestampIdx int64, seriesIdx int) int { + if existing := g.leftSidePresence[timestampIdx]; existing != -1 { + return existing + } + + g.leftSidePresence[timestampIdx] = seriesIdx + return -1 } func NewOneToOneVectorVectorBinaryOperation( @@ -80,6 +107,7 @@ func NewOneToOneVectorVectorBinaryOperation( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, ) (*OneToOneVectorVectorBinaryOperation, error) { e, err := newVectorVectorBinaryOperationEvaluator(op, returnBool, memoryConsumptionTracker, annotations, expressionPosition) if err != nil { @@ -97,6 +125,7 @@ func NewOneToOneVectorVectorBinaryOperation( evaluator: e, expressionPosition: expressionPosition, annotations: annotations, + timeRange: timeRange, } return b, nil @@ -184,60 +213,21 @@ func (b *OneToOneVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Con func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*oneToOneBinaryOperationOutputSeries, []bool, []bool, error) { labelsFunc := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool) groupKeyFunc := vectorMatchingGroupKeyFunc(b.VectorMatching) - outputSeriesMap := map[string]*oneToOneBinaryOperationOutputSeries{} - - // Use the smaller side to populate the map of possible output series first. - // This should ensure we don't unnecessarily populate the output series map with series that will never match in most cases. - // (It's possible that all the series on the larger side all belong to the same group, but this is expected to be rare.) - smallerSide := b.leftMetadata - largerSide := b.rightMetadata - smallerSideIsLeftSide := len(b.leftMetadata) < len(b.rightMetadata) - - if !smallerSideIsLeftSide { - smallerSide = b.rightMetadata - largerSide = b.leftMetadata - } + rightSeriesGroupsMap := map[string]*oneToOneBinaryOperationRightSide{} - for idx, s := range smallerSide { + for idx, s := range b.rightMetadata { groupKey := groupKeyFunc(s.Labels) - series, exists := outputSeriesMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + group, exists := rightSeriesGroupsMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. if !exists { - series = &oneToOneBinaryOperationOutputSeries{} - outputSeriesMap[string(groupKey)] = series - } - - if smallerSideIsLeftSide { - series.leftSeriesIndices = append(series.leftSeriesIndices, idx) - } else { - series.rightSeriesIndices = append(series.rightSeriesIndices, idx) - } - } - - for idx, s := range largerSide { - groupKey := groupKeyFunc(s.Labels) - - // Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it. - if series, exists := outputSeriesMap[string(groupKey)]; exists { - if smallerSideIsLeftSide { - // Currently iterating through right side. - series.rightSeriesIndices = append(series.rightSeriesIndices, idx) - } else { - series.leftSeriesIndices = append(series.leftSeriesIndices, idx) - } + group = &oneToOneBinaryOperationRightSide{} + rightSeriesGroupsMap[string(groupKey)] = group } - } - // Remove series that cannot produce samples. - for seriesLabels, outputSeries := range outputSeriesMap { - if len(outputSeries.leftSeriesIndices) == 0 || len(outputSeries.rightSeriesIndices) == 0 { - // No matching series on at least one side for this output series, so output series will have no samples. Remove it. - delete(outputSeriesMap, seriesLabels) - } + group.rightSeriesIndices = append(group.rightSeriesIndices, idx) } - allMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) - allSeries := make([]*oneToOneBinaryOperationOutputSeries, 0, len(outputSeriesMap)) + outputSeriesMap := map[string]*oneToOneBinaryOperationOutputSeries{} leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) if err != nil { @@ -252,18 +242,45 @@ func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.Ser leftSeriesUsed = leftSeriesUsed[:len(b.leftMetadata)] rightSeriesUsed = rightSeriesUsed[:len(b.rightMetadata)] + for leftSeriesIndex, s := range b.leftMetadata { + outputSeriesLabels := labelsFunc(s.Labels) + outputSeries, exists := outputSeriesMap[outputSeriesLabels.String()] + + if !exists { + groupKey := groupKeyFunc(s.Labels) + + // Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it. + rightSide, exists := rightSeriesGroupsMap[string(groupKey)] + + if !exists { + // No matching series on the right side. + continue + } + + if rightSide.outputSeriesCount == 0 { + // First output series the right side has matched to. + for _, rightSeriesIndex := range rightSide.rightSeriesIndices { + rightSeriesUsed[rightSeriesIndex] = true + } + } + + rightSide.outputSeriesCount++ + + outputSeries = &oneToOneBinaryOperationOutputSeries{rightSide: rightSide} + outputSeriesMap[outputSeriesLabels.String()] = outputSeries + } + + outputSeries.leftSeriesIndices = append(outputSeries.leftSeriesIndices, leftSeriesIndex) + leftSeriesUsed[leftSeriesIndex] = true + } + + allMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) + allSeries := make([]*oneToOneBinaryOperationOutputSeries, 0, len(outputSeriesMap)) + for _, outputSeries := range outputSeriesMap { firstSeriesLabels := b.leftMetadata[outputSeries.leftSeriesIndices[0]].Labels allMetadata = append(allMetadata, types.SeriesMetadata{Labels: labelsFunc(firstSeriesLabels)}) allSeries = append(allSeries, outputSeries) - - for _, leftSeriesIndex := range outputSeries.leftSeriesIndices { - leftSeriesUsed[leftSeriesIndex] = true - } - - for _, rightSeriesIndex := range outputSeries.rightSeriesIndices { - rightSeriesUsed[rightSeriesIndex] = true - } } return allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed, nil @@ -361,17 +378,84 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t return types.InstantVectorSeriesData{}, err } - allRightSeries, err := b.rightBuffer.GetSeries(ctx, thisSeries.rightSeriesIndices) + rightSide := thisSeries.rightSide + + if rightSide.rightSeriesIndices != nil { + // Right side hasn't been populated yet. + if err := b.populateRightSide(ctx, rightSide); err != nil { + return types.InstantVectorSeriesData{}, err + } + } + + // We don't need to return thisSeries.rightSide.mergedData here - computeResult will return it below if this is the last output series that references this right side. + rightSide.outputSeriesCount-- + canMutateRightSide := rightSide.outputSeriesCount == 0 + + result, err := b.evaluator.computeResult(mergedLeftSide, rightSide.mergedData, true, canMutateRightSide) if err != nil { return types.InstantVectorSeriesData{}, err } - mergedRightSide, err := b.mergeSingleSide(allRightSeries, thisSeries.rightSeriesIndices, b.rightMetadata, "right") + // If the right side matches to many output series, check for conflicts between those left side series. + if rightSide.leftSidePresence != nil { + seriesIdx := thisSeries.leftSeriesIndices[0] // FIXME: this isn't right, need to do this after applying early filtering + + if err := b.updateLeftSidePresence(rightSide, result, seriesIdx); err != nil { + return types.InstantVectorSeriesData{}, err + } + + if rightSide.outputSeriesCount == 0 { + types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker) + } + } + + return result, nil +} + +func (b *OneToOneVectorVectorBinaryOperation) populateRightSide(ctx context.Context, rightSide *oneToOneBinaryOperationRightSide) error { + allRightSeries, err := b.rightBuffer.GetSeries(ctx, rightSide.rightSeriesIndices) + if err != nil { + return err + } + + rightSide.mergedData, err = b.mergeSingleSide(allRightSeries, rightSide.rightSeriesIndices, b.rightMetadata, "right") if err != nil { - return types.InstantVectorSeriesData{}, err + return err + } + + if rightSide.outputSeriesCount > 1 { + rightSide.leftSidePresence, err = types.IntSlicePool.Get(b.timeRange.StepCount, b.MemoryConsumptionTracker) + if err != nil { + return err + } + + rightSide.leftSidePresence = rightSide.leftSidePresence[:b.timeRange.StepCount] + + for i := range rightSide.leftSidePresence { + rightSide.leftSidePresence[i] = -1 + } + } + + // Signal that the right side has been populated. + rightSide.rightSeriesIndices = nil + + return nil +} + +func (b *OneToOneVectorVectorBinaryOperation) updateLeftSidePresence(rightSide *oneToOneBinaryOperationRightSide, leftSideData types.InstantVectorSeriesData, leftSideSeriesIdx int) error { + for _, p := range leftSideData.Floats { + if otherSeriesIdx := rightSide.updatePresence(b.timeRange.PointIndex(p.T), leftSideSeriesIdx); otherSeriesIdx != -1 { + return formatConflictError(otherSeriesIdx, leftSideSeriesIdx, "duplicate series", p.T, b.leftMetadata, "left", b.VectorMatching, b.Op, b.ReturnBool) + } + } + + for _, p := range leftSideData.Histograms { + if otherSeriesIdx := rightSide.updatePresence(b.timeRange.PointIndex(p.T), leftSideSeriesIdx); otherSeriesIdx != -1 { + return formatConflictError(otherSeriesIdx, leftSideSeriesIdx, "duplicate series", p.T, b.leftMetadata, "left", b.VectorMatching, b.Op, b.ReturnBool) + } } - return b.evaluator.computeResult(mergedLeftSide, mergedRightSide, true, true) + return nil } // mergeSingleSide exists to handle the case where one side of an output series has different source series at different time steps. @@ -402,30 +486,7 @@ func (b *OneToOneVectorVectorBinaryOperation) mergeSingleSide(data []types.Insta } func (b *OneToOneVectorVectorBinaryOperation) mergeConflictToError(conflict *operators.MergeConflict, sourceSeriesMetadata []types.SeriesMetadata, side string) error { - firstConflictingSeriesLabels := sourceSeriesMetadata[conflict.FirstConflictingSeriesIndex].Labels - groupLabels := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool)(firstConflictingSeriesLabels) - - if conflict.SecondConflictingSeriesIndex == -1 { - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s", - conflict.Description, - groupLabels, - side, - timestamp.Time(conflict.Timestamp).Format(time.RFC3339Nano), - ) - } - - secondConflictingSeriesLabels := sourceSeriesMetadata[conflict.SecondConflictingSeriesIndex].Labels - - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", - conflict.Description, - groupLabels, - side, - timestamp.Time(conflict.Timestamp).Format(time.RFC3339Nano), - firstConflictingSeriesLabels, - secondConflictingSeriesLabels, - ) + return formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, sourceSeriesMetadata, side, b.VectorMatching, b.Op, b.ReturnBool) } func (b *OneToOneVectorVectorBinaryOperation) Close() { diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go index bd697398b3a..0559f775f23 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go @@ -230,8 +230,8 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "single output series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{4}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{4}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -241,12 +241,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in same order and already sorted correctly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, }, @@ -256,12 +256,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in same order but sorted incorrectly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -271,12 +271,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in different order": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -286,12 +286,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with multiple input series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1, 2}, - rightSeriesIndices: []int{0, 3}, + leftSeriesIndices: []int{1, 2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{0, 3}}, }, { - leftSeriesIndices: []int{0, 3}, - rightSeriesIndices: []int{1, 2}, + leftSeriesIndices: []int{0, 3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1, 2}}, }, }, @@ -301,16 +301,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in same order and already sorted correctly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, }, @@ -320,16 +320,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in same order but sorted incorrectly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -339,16 +339,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in different order": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -358,16 +358,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, with multiple input series each": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{4, 5, 10}, - rightSeriesIndices: []int{2, 20}, + leftSeriesIndices: []int{4, 5, 10}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2, 20}}, }, { - leftSeriesIndices: []int{2, 4, 15}, - rightSeriesIndices: []int{3, 5, 50}, + leftSeriesIndices: []int{2, 4, 15}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3, 5, 50}}, }, { - leftSeriesIndices: []int{3, 1}, - rightSeriesIndices: []int{1, 40}, + leftSeriesIndices: []int{3, 1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1, 40}}, }, }, @@ -377,20 +377,20 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series which depend on the same input series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 3d9be241537..b553922e10c 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -273,7 +273,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types case parser.CardOneToMany, parser.CardManyToOne: return binops.NewGroupedVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange(), timeRange) case parser.CardOneToOne: - return binops.NewOneToOneVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + return binops.NewOneToOneVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange(), timeRange) default: return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching for '%v'", e.VectorMatching.Card, e.Op)) } diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index 59894e3900c..a47550ed538 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -1473,10 +1473,9 @@ load 6m left_side_b{env="test", pod="a"} _ _ 7 8 right_side{env="test", pod="a"} 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side -# left_side_a{pod="a"} _ 2 _ _ -# left_side_b{pod="a"} _ _ 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side + left_side_a{pod="a"} _ 2 _ _ + left_side_b{pod="a"} _ _ 7 _ eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side {pod="a"} 0 1 1 0 From a2fef86f8a8c29121a0311310218e54ef45de08b Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 13 Dec 2024 16:18:35 +1100 Subject: [PATCH 3/4] Avoid expensive `labels.Labels.String()` call --- ...e_to_one_vector_vector_binary_operation.go | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go index 8d99344a082..2f1ad2ded50 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go @@ -11,6 +11,7 @@ import ( "sort" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -98,6 +99,11 @@ func (g *oneToOneBinaryOperationRightSide) updatePresence(timestampIdx int64, se return -1 } +type oneToOneBinaryOperationOutputSeriesWithLabels struct { + labels labels.Labels + series *oneToOneBinaryOperationOutputSeries +} + func NewOneToOneVectorVectorBinaryOperation( left types.InstantVectorOperator, right types.InstantVectorOperator, @@ -227,7 +233,7 @@ func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.Ser group.rightSeriesIndices = append(group.rightSeriesIndices, idx) } - outputSeriesMap := map[string]*oneToOneBinaryOperationOutputSeries{} + outputSeriesMap := map[string]oneToOneBinaryOperationOutputSeriesWithLabels{} leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) if err != nil { @@ -241,10 +247,12 @@ func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.Ser leftSeriesUsed = leftSeriesUsed[:len(b.leftMetadata)] rightSeriesUsed = rightSeriesUsed[:len(b.rightMetadata)] + outputSeriesLabelsBytes := make([]byte, 0, 1024) for leftSeriesIndex, s := range b.leftMetadata { outputSeriesLabels := labelsFunc(s.Labels) - outputSeries, exists := outputSeriesMap[outputSeriesLabels.String()] + outputSeriesLabelsBytes = outputSeriesLabels.Bytes(outputSeriesLabelsBytes) // FIXME: it'd be better if we could just get the underlying byte slice without copying here + outputSeries, exists := outputSeriesMap[string(outputSeriesLabelsBytes)] if !exists { groupKey := groupKeyFunc(s.Labels) @@ -266,11 +274,15 @@ func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.Ser rightSide.outputSeriesCount++ - outputSeries = &oneToOneBinaryOperationOutputSeries{rightSide: rightSide} - outputSeriesMap[outputSeriesLabels.String()] = outputSeries + outputSeries = oneToOneBinaryOperationOutputSeriesWithLabels{ + labels: outputSeriesLabels, + series: &oneToOneBinaryOperationOutputSeries{rightSide: rightSide}, + } + + outputSeriesMap[string(outputSeriesLabelsBytes)] = outputSeries } - outputSeries.leftSeriesIndices = append(outputSeries.leftSeriesIndices, leftSeriesIndex) + outputSeries.series.leftSeriesIndices = append(outputSeries.series.leftSeriesIndices, leftSeriesIndex) leftSeriesUsed[leftSeriesIndex] = true } @@ -278,9 +290,8 @@ func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.Ser allSeries := make([]*oneToOneBinaryOperationOutputSeries, 0, len(outputSeriesMap)) for _, outputSeries := range outputSeriesMap { - firstSeriesLabels := b.leftMetadata[outputSeries.leftSeriesIndices[0]].Labels - allMetadata = append(allMetadata, types.SeriesMetadata{Labels: labelsFunc(firstSeriesLabels)}) - allSeries = append(allSeries, outputSeries) + allMetadata = append(allMetadata, types.SeriesMetadata{Labels: outputSeries.labels}) + allSeries = append(allSeries, outputSeries.series) } return allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed, nil From 73b9e18c6fcc66ecdd083ba66c1b5860b76fa9b7 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 13 Dec 2024 16:34:46 +1100 Subject: [PATCH 4/4] Fix issue where comparison operations between two vectors incorrectly fail with a conflict if multiple left series match the same right series and only one left point remains after applying the comparison --- ...e_to_one_vector_vector_binary_operation.go | 41 ++++++++++--------- .../testdata/ours/binary_operators.test | 17 ++++---- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go index 2f1ad2ded50..5d7fd7873ff 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go @@ -378,17 +378,6 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t thisSeries := b.remainingSeries[0] b.remainingSeries = b.remainingSeries[1:] - - allLeftSeries, err := b.leftBuffer.GetSeries(ctx, thisSeries.leftSeriesIndices) - if err != nil { - return types.InstantVectorSeriesData{}, err - } - - mergedLeftSide, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") - if err != nil { - return types.InstantVectorSeriesData{}, err - } - rightSide := thisSeries.rightSide if rightSide.rightSeriesIndices != nil { @@ -402,25 +391,39 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t rightSide.outputSeriesCount-- canMutateRightSide := rightSide.outputSeriesCount == 0 - result, err := b.evaluator.computeResult(mergedLeftSide, rightSide.mergedData, true, canMutateRightSide) + allLeftSeries, err := b.leftBuffer.GetSeries(ctx, thisSeries.leftSeriesIndices) if err != nil { return types.InstantVectorSeriesData{}, err } - // If the right side matches to many output series, check for conflicts between those left side series. - if rightSide.leftSidePresence != nil { - seriesIdx := thisSeries.leftSeriesIndices[0] // FIXME: this isn't right, need to do this after applying early filtering + for i, leftSeries := range allLeftSeries { + isLastLeftSeries := i == len(allLeftSeries)-1 - if err := b.updateLeftSidePresence(rightSide, result, seriesIdx); err != nil { + allLeftSeries[i], err = b.evaluator.computeResult(leftSeries, rightSide.mergedData, true, canMutateRightSide && isLastLeftSeries) + if err != nil { return types.InstantVectorSeriesData{}, err } - if rightSide.outputSeriesCount == 0 { - types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker) + // If the right side matches to many output series, check for conflicts between those left side series. + if rightSide.leftSidePresence != nil { + seriesIdx := thisSeries.leftSeriesIndices[i] + + if err := b.updateLeftSidePresence(rightSide, allLeftSeries[i], seriesIdx); err != nil { + return types.InstantVectorSeriesData{}, err + } } } - return result, nil + mergedResult, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + if rightSide.leftSidePresence != nil && rightSide.outputSeriesCount == 0 { + types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker) + } + + return mergedResult, nil } func (b *OneToOneVectorVectorBinaryOperation) populateRightSide(ctx context.Context, rightSide *oneToOneBinaryOperationRightSide) error { diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index a47550ed538..f5725eb0399 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -1397,10 +1397,9 @@ load 6m left_side_b{env="test", pod="a"} 5 6 7 8 right_side{env="test", pod="a"} 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side -# left_side_a{pod="a"} _ 2 _ _ -# left_side_b{pod="a"} _ _ 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side + left_side_a{pod="a"} _ 2 _ _ + left_side_b{pod="a"} _ _ 7 _ eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) @@ -1416,9 +1415,8 @@ eval_fail range from 0 to 18m step 6m right_side == bool ignoring(env) {__name__ # left_side_b{pod="a"} _ _ 7 _ # but instead both engines drop the metric names in the output. # This is accepted behaviour: https://github.com/prometheus/prometheus/issues/5326 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side -# {pod="a"} _ 2 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side + {pod="a"} _ 2 7 _ eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) @@ -1458,9 +1456,8 @@ load 6m left{pod="b"} 5 6 7 8 right 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -# eval range from 0 to 18m step 6m left == ignoring(pod) right -# left _ 2 7 _ +eval range from 0 to 18m step 6m left == ignoring(pod) right + left _ 2 7 _ eval_fail range from 0 to 18m step 6m left == ignoring(pod) group_right right expected_fail_regexp found duplicate series for the match group .* on the left (hand-)?side of the operation