Start agg step stream according to agg_col_cost and group_by_keys_cost
auxten committed Jun 24, 2024
1 parent 50370a6 commit 25dedc1
Showing 2 changed files with 28 additions and 5 deletions.
29 changes: 26 additions & 3 deletions src/Processors/QueryPlan/AggregatingStep.cpp
@@ -23,6 +23,7 @@
 #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Common/JSONBuilder.h>
+#include <Common/Logger.h>
 
 namespace DB
 {
@@ -457,12 +458,34 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
     /// If there are several sources, then we perform parallel aggregation
     if (pipeline.getNumStreams() > 1)
     {
+        auto stream_count = pipeline.getNumStreams();
+        /// Calculate the stream count by adding up the aggregate function and GROUP BY key costs.
+        /// The max stream count is the number of pipeline streams; the min stream count is 4.
+        /// cost = 2 * (agg_col_cost + group_by_keys_cost)
+        size_t estimate_stream = 0;
+        size_t agg_col_cost = 0;
+        size_t group_by_keys_cost = 0;
+        for (const auto & agg : params.aggregates)
+        {
+            /// Get the function count by counting "(".
+            agg_col_cost += static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), '('));
+            /// Get the column count by counting "," and adding 1.
+            agg_col_cost += 1 + static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), ','));
+        }
+        for (const auto & key : params.keys)
+        {
+            group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));
+        }
+        estimate_stream = std::min(stream_count, std::max(4ul, 2 * (agg_col_cost + group_by_keys_cost)));
+
+        LOG_TRACE(getLogger("AggregatingStep"), "AggregatingStep: estimate_stream = {}", estimate_stream);
+
         /// Add resize transform to uniformly distribute data between aggregating streams.
         /// But not if we execute aggregation over partitioned data in which case data streams shouldn't be mixed.
         if (!storage_has_evenly_distributed_read && !skip_merging)
-            pipeline.resize(pipeline.getNumStreams(), true, true);
+            pipeline.resize(estimate_stream, true, true);
 
-        auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
+        auto many_data = std::make_shared<ManyAggregatedData>(estimate_stream);
 
         size_t counter = 0;
         pipeline.addSimpleTransform(
@@ -479,7 +502,7 @@
                     skip_merging);
             });
 
-        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
+        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : estimate_stream, true /* force */);
 
         aggregating = collector.detachProcessors(0);
     }
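For reference, a standalone sketch of the new heuristic (not part of the commit) that reproduces the cost arithmetic added above so the numbers can be checked by hand. The helper name estimateStreams and the query shape, one aggregate column named "sum(plus(a, b))" grouped by a key named "region", are made up for illustration.

    #include <algorithm>
    #include <cmath>
    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    size_t estimateStreams(size_t stream_count,
                           const std::vector<std::string> & aggregate_columns,
                           const std::vector<std::string> & group_by_keys)
    {
        size_t agg_col_cost = 0;
        for (const auto & name : aggregate_columns)
        {
            // Each nested function call contributes one '('.
            agg_col_cost += static_cast<size_t>(std::count(name.begin(), name.end(), '('));
            // Argument count is the number of ',' plus one.
            agg_col_cost += 1 + static_cast<size_t>(std::count(name.begin(), name.end(), ','));
        }

        size_t group_by_keys_cost = 0;
        for (const auto & key : group_by_keys)
            group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));

        // Floor of 4 streams, capped at the number of pipeline streams.
        return std::min(stream_count, std::max(size_t{4}, 2 * (agg_col_cost + group_by_keys_cost)));
    }

    int main()
    {
        // "sum(plus(a, b))": two '(' and one ',' -> agg_col_cost = 2 + (1 + 1) = 4.
        // "region" has 6 chars: 1 + 8 * size_t(log2(7)) = 1 + 8 * 2 = 17.
        // 2 * (4 + 17) = 42, capped at the 16 pipeline streams -> prints 16.
        std::cout << estimateStreams(16, {"sum(plus(a, b))"}, {"region"}) << '\n';
    }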
4 changes: 2 additions & 2 deletions src/Processors/Sources/PythonSource.cpp
@@ -387,14 +387,14 @@ Chunk PythonSource::scanDataToChunk()
         {
             // log first 10 rows of the column
             std::stringstream ss;
-            LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
+            // LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
             for (size_t j = 0; j < std::min(count, static_cast<size_t>(10)); ++j)
             {
                 Field value;
                 columns[i]->get(j, value);
                 ss << toString(value) << ", ";
             }
-            LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
+            // LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
         }
     }
     catch (const Exception & e)
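The loop above still builds the per-column dump string that the now-disabled LOG_DEBUG calls printed: at most the first 10 values, joined with ", ". A minimal self-contained sketch of that dump logic, using a hypothetical dumpFirstRows helper over plain std::string values in place of ClickHouse's IColumn/Field access:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    // Join at most `limit` leading values with ", ", mirroring what the
    // commented-out debug dump produced for each column's first rows.
    std::string dumpFirstRows(const std::vector<std::string> & values, size_t limit = 10)
    {
        std::stringstream ss;
        for (size_t j = 0; j < std::min(values.size(), limit); ++j)
            ss << values[j] << ", ";
        return ss.str();
    }

    int main()
    {
        std::cout << dumpFirstRows({"1", "2", "3"}) << '\n'; // prints "1, 2, 3, "
    }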
