From 25dedc1a482369cae3003ba535bbef85601b7f76 Mon Sep 17 00:00:00 2001
From: auxten
Date: Mon, 24 Jun 2024 13:43:09 +0800
Subject: [PATCH] Start agg step stream according to agg_col_cost and
 group_by_keys_cost

---
 src/Processors/QueryPlan/AggregatingStep.cpp | 29 ++++++++++++++++++++++++++++---
 src/Processors/Sources/PythonSource.cpp      |  4 ++--
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index 0d7e05af1de..91b4422b222 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -457,12 +458,34 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
     /// If there are several sources, then we perform parallel aggregation
     if (pipeline.getNumStreams() > 1)
     {
+        auto stream_count = pipeline.getNumStreams();
+        /// Estimate the stream count by summing the costs of the aggregate functions and the GROUP BY keys.
+        /// The maximum stream count is the number of pipeline streams; the minimum is 4.
+        /// cost = 2 * (agg_col_cost + group_by_keys_cost)
+        size_t estimate_stream = 0;
+        size_t agg_col_cost = 0;
+        size_t group_by_keys_cost = 0;
+        for (const auto & agg : params.aggregates)
+        {
+            /// Count the nested function calls by counting '('.
+            agg_col_cost += static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), '('));
+            /// Count the argument columns by counting ',' and adding 1.
+            agg_col_cost += 1 + static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), ','));
+        }
+        for (const auto & key : params.keys)
+        {
+            group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));
+        }
+        estimate_stream = std::min(stream_count, std::max(4ul, 2 * (agg_col_cost + group_by_keys_cost)));
+
+        LOG_TRACE(getLogger("AggregatingStep"), "AggregatingStep: estimate_stream = {}", estimate_stream);
+
         /// Add resize transform to uniformly distribute data between aggregating streams.
         /// But not if we execute aggregation over partitioned data in which case data streams shouldn't be mixed.
         if (!storage_has_evenly_distributed_read && !skip_merging)
-            pipeline.resize(pipeline.getNumStreams(), true, true);
+            pipeline.resize(estimate_stream, true, true);
 
-        auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
+        auto many_data = std::make_shared<ManyAggregatedData>(estimate_stream);
 
         size_t counter = 0;
         pipeline.addSimpleTransform(
@@ -479,7 +502,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
                 skip_merging);
         });
 
-        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
+        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : estimate_stream, true /* force */);
 
         aggregating = collector.detachProcessors(0);
     }
diff --git a/src/Processors/Sources/PythonSource.cpp b/src/Processors/Sources/PythonSource.cpp
index 039461a1d17..786f8348d20 100644
--- a/src/Processors/Sources/PythonSource.cpp
+++ b/src/Processors/Sources/PythonSource.cpp
@@ -387,14 +387,14 @@ Chunk PythonSource::scanDataToChunk()
         {
             // log first 10 rows of the column
             std::stringstream ss;
-            LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
+            // LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
             for (size_t j = 0; j < std::min(count, static_cast<size_t>(10)); ++j)
             {
                 Field value;
                 columns[i]->get(j, value);
                 ss << toString(value) << ", ";
             }
-            LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
+            // LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
         }
     }
     catch (const Exception & e)
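
For clarity, below is a minimal standalone sketch of the stream-estimation
heuristic that the AggregatingStep.cpp hunk introduces. It is illustrative
only: estimateStreams, the sample aggregate expression, and the sample GROUP BY
key are hypothetical and not part of the patch; only the arithmetic mirrors the
added code. Note that params.keys holds key names, so key.size() in the patch
is the length of the key name string.

// Illustrative sketch of the cost heuristic (hypothetical names), not ClickHouse code.
#include <algorithm>
#include <cmath>
#include <iostream>
#include <string>
#include <vector>

// Mirrors the patch: clamp 2 * (agg_col_cost + group_by_keys_cost)
// to the range [4, pipeline_streams].
size_t estimateStreams(size_t pipeline_streams,
                       const std::vector<std::string> & aggregate_columns,
                       const std::vector<std::string> & group_by_keys)
{
    size_t agg_col_cost = 0;
    for (const auto & col : aggregate_columns)
    {
        // One unit per nested function call, counted via '('.
        agg_col_cost += static_cast<size_t>(std::count(col.begin(), col.end(), '('));
        // One unit per argument column: commas plus one.
        agg_col_cost += 1 + static_cast<size_t>(std::count(col.begin(), col.end(), ','));
    }

    size_t group_by_keys_cost = 0;
    for (const auto & key : group_by_keys)
        // Grows logarithmically with the key name length.
        group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));

    return std::min(pipeline_streams, std::max<size_t>(4, 2 * (agg_col_cost + group_by_keys_cost)));
}

int main()
{
    // Hypothetical query: SELECT sum(plus(a, b)) ... GROUP BY user_id, 64 source streams.
    // agg_col_cost       = 2 ('(' twice) + (1 comma + 1)     = 4
    // group_by_keys_cost = 1 + 8 * floor(log2(7 + 1))        = 25
    // estimate           = min(64, max(4, 2 * (4 + 25)))     = 58
    std::cout << estimateStreams(64, {"sum(plus(a, b))"}, {"user_id"}) << "\n"; // prints 58
}

The heuristic is purely syntactic: it needs no table statistics, since the
aggregate expression text and key names are already available when the
pipeline is built. The floor of 4 keeps simple queries from collapsing to too
few aggregating streams, and the cap at the source stream count avoids
resizing the pipeline wider than its inputs.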