Skip to content

Commit

Permalink
Druid Explore Exactify mode (#4397)
Browse files Browse the repository at this point in the history
* Druid Exactify

* Druid Exactify

* Druid Exactify

* Druid Exactify

* Druid Exactify

---------

Co-authored-by: Egor Ryashin <[email protected]>
  • Loading branch information
2 people authored and nishantmonu51 committed Mar 25, 2024
1 parent 5d54feb commit c11f545
Showing 1 changed file with 122 additions and 23 deletions.
145 changes: 122 additions & 23 deletions runtime/queries/metricsview_comparison_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/expressionpb"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"google.golang.org/protobuf/types/known/structpb"

Expand Down Expand Up @@ -112,13 +113,97 @@ func (q *MetricsViewComparison) Resolve(ctx context.Context, rt *runtime.Runtime
return err
}

// comparison toplist
if !isTimeRangeNil(q.ComparisonTimeRange) {
// execute toplist for base and get dim list
// create and add filter and execute comprison toplist
// remove strict limits in comp toplist sql
if drivers.DialectDruid != olap.Dialect() || q.Exact {
return q.executeComparisonToplist(ctx, olap, mv, priority, security)
}

// Druid-based `exactify` approach:
// 1. The first query fetch topN dimensions.
// 2. The second query fetches topN filtered by the colleted dimensions.
// The dimension filter contrains topN table avoiding approximation in measures (due to mearging multiple topN Druid results from different nodes).
// Optimizations:
// * the first query fetches only sorted dimensions
// * the second query isn't run if the topN already less than the limit
originalMeasures := q.removeNoSortMeasures()

if q.isBase() || q.isDeltaComparison() {
err = q.executeToplist(ctx, olap, mv, priority, security)
if err != nil {
return err
}
} else {
ttr := q.TimeRange
q.TimeRange = q.ComparisonTimeRange
err = q.executeToplist(ctx, olap, mv, priority, security)
if err != nil {
return err
}

q.TimeRange = ttr
}

q.addDimsAsFilter()
q.Measures = originalMeasures
return q.executeComparisonToplist(ctx, olap, mv, priority, security)
}

// general toplist
if drivers.DialectDruid != olap.Dialect() || q.Exact {
return q.executeToplist(ctx, olap, mv, priority, security)
}

// Druid-based `exactify` approach (see comments above)
originalMeasures := q.Measures
if len(q.Measures) >= 5 {
originalMeasures = q.removeNoSortMeasures()
}

err = q.executeToplist(ctx, olap, mv, priority, security)
if err != nil {
return err
}

if len(q.Result.Rows) < int(q.Limit) && len(q.Measures) == len(originalMeasures) {
return nil
}

q.addDimsAsFilter()

q.Measures = originalMeasures
return q.executeToplist(ctx, olap, mv, priority, security)
}

func (q *MetricsViewComparison) removeNoSortMeasures() []*runtimev1.MetricsViewAggregationMeasure {
measures := q.Measures
sortMeasures := make([]*runtimev1.MetricsViewAggregationMeasure, 0, len(q.Sort))
for _, m := range q.Measures {
for _, s := range q.Sort {
if s.Name == m.Name {
sortMeasures = append(sortMeasures, m)
}
}
}
q.Measures = sortMeasures
return measures
}

func (q *MetricsViewComparison) addDimsAsFilter() {
inExpressions := make([]*runtimev1.Expression, 0, len(q.Result.Rows))
for _, r := range q.Result.Rows {
inExpressions = append(inExpressions, expressionpb.Value(r.DimensionValue))
}
if q.Where != nil {
q.Where = expressionpb.And([]*runtimev1.Expression{q.Where, expressionpb.In(expressionpb.Identifier(q.DimensionName), inExpressions)})
} else {
q.Where = expressionpb.In(expressionpb.Identifier(q.DimensionName), inExpressions)
}
}

func (q *MetricsViewComparison) calculateMeasuresMeta() error {
compare := !isTimeRangeNil(q.ComparisonTimeRange)

Expand Down Expand Up @@ -707,12 +792,8 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M
}

limitClause := ""
twiceTheLimitClause := ""
if q.Limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", q.Limit)
twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", q.Limit*2)
} else if q.Limit == 0 {
twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", 100_000) // use Druid limit
}

baseLimitClause := ""
Expand All @@ -723,7 +804,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M
deltaComparison := q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_ABS_DELTA ||
q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_REL_DELTA

approximationLimit := q.Limit
approximationLimit := int(q.Limit)
if q.Limit != 0 && q.Limit < 100 && deltaComparison {
approximationLimit = 100
}
Expand Down Expand Up @@ -779,23 +860,23 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M
// this leads to ambiguity whether it applies to the base.measure ot comparison.measure.
// to keep the clause builder consistent we add an outer query here.
sql = fmt.Sprintf(`
SELECT * from (
SELECT COALESCE(base.%[2]s, comparison.%[2]s) AS %[10]s, %[9]s FROM
(
SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s GROUP BY 1 %[12]s
) base
%[11]s JOIN
(
SELECT %[16]s FROM %[3]s %[14]s WHERE %[5]s GROUP BY 1 %[13]s
) comparison
ON
%[17]s
%[6]s
%[7]s
OFFSET
%[8]d
) WHERE %[15]s
`,
SELECT * from (
SELECT COALESCE(base.%[2]s, comparison.%[2]s) AS %[10]s, %[9]s FROM
(
SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s GROUP BY 1 %[12]s
) base
%[11]s JOIN
(
SELECT %[16]s FROM %[3]s %[14]s WHERE %[5]s GROUP BY 1 %[13]s
) comparison
ON
%[17]s
%[6]s
%[7]s
OFFSET
%[8]d
) WHERE %[15]s
`,
subSelectClause, // 1
colName, // 2
safeName(mv.Table), // 3
Expand Down Expand Up @@ -897,6 +978,15 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M
rightWhereClause = baseWhereClause
}

twiceTheLimitClause := ""
if q.Exact {
if q.Limit > 0 {
twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", q.Limit*2)
} else if q.Limit == 0 {
twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", 100_000) // use Druid limit
}
}

sql = fmt.Sprintf(`
WITH %[11]s AS (
SELECT %[1]s FROM %[3]s WHERE %[4]s GROUP BY 1 %[13]s %[10]s OFFSET %[8]d
Expand All @@ -905,7 +995,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M
)
SELECT %[11]s.%[2]s AS %[14]s, %[9]s FROM %[11]s LEFT JOIN %[12]s ON base.%[2]s = comparison.%[2]s
GROUP BY 1
HAVING %[15]s
HAVING %[15]s
%[6]s
%[7]s
OFFSET %[8]d
Expand Down Expand Up @@ -1207,3 +1297,12 @@ func validateMeasureAliases(aliases []*runtimev1.MetricsViewComparisonMeasureAli
func isTimeRangeNil(tr *runtimev1.TimeRange) bool {
return tr == nil || (tr.Start == nil && tr.End == nil)
}

func (q *MetricsViewComparison) isDeltaComparison() bool {
return q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_ABS_DELTA ||
q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_REL_DELTA
}

func (q *MetricsViewComparison) isBase() bool {
return q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_BASE_VALUE
}

1 comment on commit c11f545

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Published on https://ui.rilldata.com as production
🚀 Deployed on https://6601c63c829f002ee9c64852--rill-ui.netlify.app

Please sign in to comment.