Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ export class BaseQuery {
preAggregationQuery: this.options.preAggregationQuery,
totalQuery: this.options.totalQuery,
joinHints: this.options.joinHints,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
};

try {
Expand Down Expand Up @@ -945,7 +946,8 @@ export class BaseQuery {
ungrouped: this.options.ungrouped,
exportAnnotatedSql: false,
preAggregationQuery: this.options.preAggregationQuery,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
};

const buildResult = nativeBuildSqlAndParams(queryParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,19 @@ export class PreAggregations {
* It returns full pre-aggregation object (with keyQueries, previewSql, loadSql, and so on.
*/
public preAggregationsDescription(): FullPreAggregationDescription[] {
const disableExternalPreAggregations = this.query.options?.disableExternalPreAggregations;
const preAggregations = [this.preAggregationsDescriptionLocal()].concat(
this.query.subQueryDimensions.map(d => this.query.subQueryDescription(d).subQuery)
.map(q => q.preAggregations.preAggregationsDescription())
);

return R.pipe(
R.unnest as (list: any[][]) => any[],
// TODO: Move this to somewhere BEFORE pre-agg matching, possibly to rollupMatchResults()
// to avoid constly matching and then throwing it away.
R.filter((agg: FullPreAggregationDescription) => !(disableExternalPreAggregations && agg.external)),
R.uniqBy(desc => desc.tableName)
)(
preAggregations
);
)(preAggregations);
}

private preAggregationsDescriptionLocal(): FullPreAggregationDescription[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct BaseQueryOptionsStatic {
pub total_query: Option<bool>,
#[serde(rename = "cubestoreSupportMultistage")]
pub cubestore_support_multistage: Option<bool>,
#[serde(rename = "disableExternalPreAggregations")]
pub disable_external_pre_aggregations: bool,
}

#[nativebridge::native_bridge(BaseQueryOptionsStatic)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ impl PreAggregationOptimizer {
}
}

pub fn try_optimize(&mut self, plan: Rc<Query>) -> Result<Option<Rc<Query>>, CubeError> {
pub fn try_optimize(
&mut self,
plan: Rc<Query>,
disable_external_pre_aggregations: bool,
) -> Result<Option<Rc<Query>>, CubeError> {
let cube_names = collect_cube_names_from_node(&plan)?;
let mut compiler = PreAggregationsCompiler::try_new(self.query_tools.clone(), &cube_names)?;

let compiled_pre_aggregations = compiler.compile_all_pre_aggregations()?;
let compiled_pre_aggregations =
compiler.compile_all_pre_aggregations(disable_external_pre_aggregations)?;

for pre_aggregation in compiled_pre_aggregations.iter() {
let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,14 @@ impl PreAggregationsCompiler {

pub fn compile_all_pre_aggregations(
&mut self,
disable_external_pre_aggregations: bool,
) -> Result<Vec<Rc<CompiledPreAggregation>>, CubeError> {
let mut result = Vec::new();
for (name, _) in self.descriptions.clone().iter() {
result.push(self.compile_pre_aggregation(&name)?);
let pre_aggregation = self.compile_pre_aggregation(name)?;
if !(disable_external_pre_aggregations && pre_aggregation.external == Some(true)) {
result.push(pre_aggregation);
}
}
Ok(result)
}
Expand Down
6 changes: 5 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ impl<IT: InnerTypes> BaseQuery<IT> {
self.query_tools.clone(),
self.cubestore_support_multistage,
);
if let Some(result) = pre_aggregation_optimizer.try_optimize(plan.clone())? {
let disable_external_pre_aggregations =
self.request.disable_external_pre_aggregations();
if let Some(result) = pre_aggregation_optimizer
.try_optimize(plan.clone(), disable_external_pre_aggregations)?
{
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
(
result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl DimensionSubqueryPlanner {
false,
Rc::new(vec![]),
true,
self.query_properties.disable_external_pre_aggregations(),
)?;
let query_planner = QueryPlanner::new(sub_query_properties, self.query_tools.clone());
let sub_query = query_planner.plan()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl MultiStageMemberQueryPlanner {
false,
Rc::new(vec![]),
true,
self.query_properties.disable_external_pre_aggregations(),
)?;

let simple_query_planer =
Expand Down Expand Up @@ -377,6 +378,7 @@ impl MultiStageMemberQueryPlanner {
false,
self.query_properties.query_join_hints().clone(),
false,
self.query_properties.disable_external_pre_aggregations(),
)?;

let query_planner =
Expand Down
10 changes: 10 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub struct QueryProperties {
total_query: bool,
query_join_hints: Rc<Vec<JoinHintItem>>,
allow_multi_stage: bool,
disable_external_pre_aggregations: bool,
}

impl QueryProperties {
Expand Down Expand Up @@ -405,6 +406,8 @@ impl QueryProperties {

let pre_aggregation_query = options.static_data().pre_aggregation_query.unwrap_or(false);
let total_query = options.static_data().total_query.unwrap_or(false);
let disable_external_pre_aggregations =
options.static_data().disable_external_pre_aggregations;

let mut res = Self {
measures,
Expand All @@ -425,6 +428,7 @@ impl QueryProperties {
total_query,
query_join_hints,
allow_multi_stage: true,
disable_external_pre_aggregations,
};
res.apply_static_filters()?;
Ok(Rc::new(res))
Expand All @@ -448,6 +452,7 @@ impl QueryProperties {
total_query: bool,
query_join_hints: Rc<Vec<JoinHintItem>>,
allow_multi_stage: bool,
disable_external_pre_aggregations: bool,
) -> Result<Rc<Self>, CubeError> {
let order_by = if order_by.is_empty() {
Self::default_order(&dimensions, &time_dimensions, &measures)
Expand All @@ -474,6 +479,7 @@ impl QueryProperties {
total_query,
query_join_hints,
allow_multi_stage,
disable_external_pre_aggregations,
};
res.apply_static_filters()?;

Expand Down Expand Up @@ -721,6 +727,10 @@ impl QueryProperties {
self.pre_aggregation_query
}

pub fn disable_external_pre_aggregations(&self) -> bool {
self.disable_external_pre_aggregations
}

pub fn all_filters(&self) -> Option<Filter> {
let items = self
.time_dimensions_filters
Expand Down
Loading