Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ export class BaseQuery {
preAggregationQuery: this.options.preAggregationQuery,
totalQuery: this.options.totalQuery,
joinHints: this.options.joinHints,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
};

try {
Expand Down Expand Up @@ -945,7 +946,8 @@ export class BaseQuery {
ungrouped: this.options.ungrouped,
exportAnnotatedSql: false,
preAggregationQuery: this.options.preAggregationQuery,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
};

const buildResult = nativeBuildSqlAndParams(queryParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export class PreAggregations {
* It returns full pre-aggregation object (with keyQueries, previewSql, loadSql, and so on.
*/
public preAggregationsDescription(): FullPreAggregationDescription[] {
const disableExternalPreAggregations = this.query.options?.disableExternalPreAggregations;
const preAggregations = [this.preAggregationsDescriptionLocal()].concat(
this.query.subQueryDimensions.map(d => this.query.subQueryDescription(d).subQuery)
.map(q => q.preAggregations.preAggregationsDescription())
Expand All @@ -114,7 +115,7 @@ export class PreAggregations {
R.uniqBy(desc => desc.tableName)
)(
preAggregations
);
).filter(agg => !(disableExternalPreAggregations && agg.external));
}

private preAggregationsDescriptionLocal(): FullPreAggregationDescription[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct BaseQueryOptionsStatic {
pub total_query: Option<bool>,
#[serde(rename = "cubestoreSupportMultistage")]
pub cubestore_support_multistage: Option<bool>,
#[serde(rename = "disableExternalPreAggregations")]
pub disable_external_pre_aggregations: bool,
}

#[nativebridge::native_bridge(BaseQueryOptionsStatic)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ impl PreAggregationOptimizer {
}
}

pub fn try_optimize(&mut self, plan: Rc<Query>) -> Result<Option<Rc<Query>>, CubeError> {
pub fn try_optimize(
&mut self,
plan: Rc<Query>,
disable_external_pre_aggregations: bool,
) -> Result<Option<Rc<Query>>, CubeError> {
let cube_names = collect_cube_names_from_node(&plan)?;
let mut compiler = PreAggregationsCompiler::try_new(self.query_tools.clone(), &cube_names)?;

let compiled_pre_aggregations = compiler.compile_all_pre_aggregations()?;
let compiled_pre_aggregations =
compiler.compile_all_pre_aggregations(disable_external_pre_aggregations)?;

for pre_aggregation in compiled_pre_aggregations.iter() {
let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,14 @@ impl PreAggregationsCompiler {

pub fn compile_all_pre_aggregations(
&mut self,
disable_external_pre_aggregations: bool,
) -> Result<Vec<Rc<CompiledPreAggregation>>, CubeError> {
let mut result = Vec::new();
for (name, _) in self.descriptions.clone().iter() {
result.push(self.compile_pre_aggregation(&name)?);
let pre_aggregation = self.compile_pre_aggregation(name)?;
if !(disable_external_pre_aggregations && pre_aggregation.external == Some(true)) {
result.push(pre_aggregation);
}
}
Ok(result)
}
Expand Down
6 changes: 5 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ impl<IT: InnerTypes> BaseQuery<IT> {
self.query_tools.clone(),
self.cubestore_support_multistage,
);
if let Some(result) = pre_aggregation_optimizer.try_optimize(plan.clone())? {
let disable_external_pre_aggregations =
self.request.disable_external_pre_aggregations();
if let Some(result) = pre_aggregation_optimizer
.try_optimize(plan.clone(), disable_external_pre_aggregations)?
{
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
(
result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl DimensionSubqueryPlanner {
false,
Rc::new(vec![]),
true,
self.query_properties.disable_external_pre_aggregations(),
)?;
let query_planner = QueryPlanner::new(sub_query_properties, self.query_tools.clone());
let sub_query = query_planner.plan()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl MultiStageMemberQueryPlanner {
false,
Rc::new(vec![]),
true,
self.query_properties.disable_external_pre_aggregations(),
)?;

let simple_query_planer =
Expand Down Expand Up @@ -377,6 +378,7 @@ impl MultiStageMemberQueryPlanner {
false,
self.query_properties.query_join_hints().clone(),
false,
self.query_properties.disable_external_pre_aggregations(),
)?;

let query_planner =
Expand Down
10 changes: 10 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub struct QueryProperties {
total_query: bool,
query_join_hints: Rc<Vec<JoinHintItem>>,
allow_multi_stage: bool,
disable_external_pre_aggregations: bool,
}

impl QueryProperties {
Expand Down Expand Up @@ -405,6 +406,8 @@ impl QueryProperties {

let pre_aggregation_query = options.static_data().pre_aggregation_query.unwrap_or(false);
let total_query = options.static_data().total_query.unwrap_or(false);
let disable_external_pre_aggregations =
options.static_data().disable_external_pre_aggregations;

let mut res = Self {
measures,
Expand All @@ -425,6 +428,7 @@ impl QueryProperties {
total_query,
query_join_hints,
allow_multi_stage: true,
disable_external_pre_aggregations,
};
res.apply_static_filters()?;
Ok(Rc::new(res))
Expand All @@ -448,6 +452,7 @@ impl QueryProperties {
total_query: bool,
query_join_hints: Rc<Vec<JoinHintItem>>,
allow_multi_stage: bool,
disable_external_pre_aggregations: bool,
) -> Result<Rc<Self>, CubeError> {
let order_by = if order_by.is_empty() {
Self::default_order(&dimensions, &time_dimensions, &measures)
Expand All @@ -474,6 +479,7 @@ impl QueryProperties {
total_query,
query_join_hints,
allow_multi_stage,
disable_external_pre_aggregations,
};
res.apply_static_filters()?;

Expand Down Expand Up @@ -721,6 +727,10 @@ impl QueryProperties {
self.pre_aggregation_query
}

pub fn disable_external_pre_aggregations(&self) -> bool {
self.disable_external_pre_aggregations
}

pub fn all_filters(&self) -> Option<Filter> {
let items = self
.time_dimensions_filters
Expand Down
Loading