Skip to content

Commit

Permalink
Merge pull request #391 from Altinity/backports/23.8/56502_fix_projec…
Browse files Browse the repository at this point in the history
…tion_analysis

23.8 Backports of ClickHouse#56502 and ClickHouse#58638 - Fix projection analysis
  • Loading branch information
Enmk authored Oct 17, 2024
2 parents 0ed3408 + 875ab56 commit b97c01c
Show file tree
Hide file tree
Showing 23 changed files with 233 additions and 99 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ class IColumn;
M(Bool, optimize_use_projections, true, "Automatically choose projections to perform SELECT query", 0) ALIAS(allow_experimental_projection_optimization) \
M(Bool, optimize_use_implicit_projections, true, "Automatically choose implicit projections to perform SELECT query", 0) \
M(Bool, force_optimize_projection, false, "If projection optimization is enabled, SELECT queries need to use projection", 0) \
M(String, preferred_optimize_projection_name, "", "If it is set to a non-empty string, ClickHouse tries to apply specified projection", 0) \
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
M(Bool, async_query_sending_for_remote, true, "Asynchronously create connections and send query to shards in remote query", 0) \
M(Bool, insert_null_as_default, true, "Insert DEFAULT values instead of NULL in INSERT SELECT (UNION ALL)", 0) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ struct MinMaxProjectionCandidate
{
AggregateProjectionCandidate candidate;
Block block;
MergeTreeData::DataPartsVector normal_parts;
};

struct AggregateProjectionCandidates
Expand Down Expand Up @@ -476,7 +475,6 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
MergeTreeData::DataPartsVector minmax_projection_normal_parts;

// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block {}", sample_block.dumpStructure());
auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock(
Expand All @@ -485,21 +483,20 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
dag.filter_node != nullptr,
query_info,
parts,
minmax_projection_normal_parts,
nullptr,
max_added_blocks.get(),
context);

// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block 2 {}", block.dumpStructure());

// minmax_count_projection cannot be used used when there is no data to process, because
// minmax_count_projection cannot be used when there is no data to process, because
// it will produce incorrect result during constant aggregation.
// See https://github.com/ClickHouse/ClickHouse/issues/36728
if (block)
{
MinMaxProjectionCandidate minmax;
minmax.candidate = std::move(candidate);
minmax.block = std::move(block);
minmax.normal_parts = std::move(minmax_projection_normal_parts);
minmax.candidate.projection = projection;
candidates.minmax_projection.emplace(std::move(minmax));
}
Expand All @@ -508,6 +505,18 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(

if (!candidates.minmax_projection)
{
auto it = std::find_if(agg_projections.begin(), agg_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});

if (it != agg_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
agg_projections.clear();
agg_projections.push_back(preferred_projection);
}

candidates.real.reserve(agg_projections.size());
for (const auto * projection : agg_projections)
{
Expand Down Expand Up @@ -569,49 +578,74 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &

auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);

AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)
best_candidate = &candidates.minmax_projection->candidate;
else if (candidates.real.empty())
return false;

const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
AggregateProjectionCandidate * best_candidate = nullptr;

auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();

/// Selecting best candidate.
for (auto & candidate : candidates.real)
if (candidates.minmax_projection)
{
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
best_candidate = &candidates.minmax_projection->candidate;
}
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();

bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_column_names, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}

if (!analyzed)
continue;
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();

if (candidate.sum_marks > ordinary_reading_marks)
continue;
/// Selecting best candidate.
for (auto & candidate : candidates.real)
{
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());

bool analyzed = analyzeProjectionCandidate(
candidate,
*reading,
reader,
required_column_names,
parts_with_ranges,
query_info,
context,
max_added_blocks,
added_filter_nodes);

if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
best_candidate = &candidate;
}
if (!analyzed)
continue;

if (candidate.sum_marks > ordinary_reading_marks)
continue;

if (!best_candidate)
if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
best_candidate = &candidate;
}

if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
}
else
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}

chassert(best_candidate != nullptr);

QueryPlanStepPtr projection_reading;
bool has_ordinary_parts;

Expand All @@ -632,9 +666,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
});
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts)
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
has_ordinary_parts = false;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <Processors/Sources/NullSource.h>
#include <Common/logger_useful.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <stack>
#include <algorithm>

namespace DB::QueryPlanOptimizations
{
Expand Down Expand Up @@ -107,6 +107,19 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (normal_projections.empty())
return false;

ContextPtr context = reading->getContext();
auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});

if (it != normal_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
normal_projections.clear();
normal_projections.push_back(preferred_projection);
}

QueryDAG query;
{
auto & child = iter->node->children[iter->next_child - 1];
Expand All @@ -122,13 +135,22 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)

const Names & required_columns = reading->getRealColumnNames();
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());

auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();

/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}

const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();

std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);

for (const auto * projection : normal_projections)
Expand All @@ -144,8 +166,15 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
added_filter_nodes.nodes.push_back(query.filter_node);

bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_columns, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
candidate,
*reading,
reader,
required_columns,
parts_with_ranges,
query_info,
context,
max_added_blocks,
added_filter_nodes);

if (!analyzed)
continue;
Expand Down
19 changes: 12 additions & 7 deletions src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,23 +210,28 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata,
const RangesInDataParts & parts_with_ranges,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
const ActionDAGNodes & added_filter_nodes)
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
std::vector<AlterConversionsPtr> alter_conversions;
for (const auto & part_with_ranges : parts_with_ranges)
{
const auto & created_projections = part->getProjectionParts();
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
{
projection_parts.push_back(it->second);
}
else
normal_parts.push_back(part);
{
normal_parts.push_back(part_with_ranges.data_part);
alter_conversions.push_back(part_with_ranges.alter_conversions);
}
}

if (projection_parts.empty())
Expand All @@ -236,7 +241,6 @@ bool analyzeProjectionCandidate(
std::move(projection_parts),
nullptr,
required_column_names,
metadata,
candidate.projection->metadata,
query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
added_filter_nodes,
Expand All @@ -252,7 +256,8 @@ bool analyzeProjectionCandidate(

if (!normal_parts.empty())
{
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), /* alter_conversions = */ {});
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));

if (normal_result_ptr->error())
return false;
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/QueryPlan/Optimizations/projectionsCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelect
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
struct RangesInDataParts;

struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
Expand Down Expand Up @@ -71,8 +72,7 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const DataPartsVector & parts,
const StorageMetadataPtr & metadata,
const RangesInDataParts & parts_with_ranges,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
Expand Down
28 changes: 18 additions & 10 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
std::move(alter_conversions),
prewhere_info,
filter_nodes,
storage_snapshot->metadata,
metadata_for_reading,
query_info,
context,
Expand Down Expand Up @@ -1354,7 +1353,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
std::vector<AlterConversionsPtr> alter_conversions,
const PrewhereInfoPtr & prewhere_info,
const ActionDAGNodes & added_filter_nodes,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
Expand All @@ -1375,7 +1373,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return selectRangesToReadImpl(
std::move(parts),
std::move(alter_conversions),
metadata_snapshot_base,
metadata_snapshot,
updated_query_info_with_filter_dag,
context,
Expand All @@ -1391,7 +1388,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return selectRangesToReadImpl(
std::move(parts),
std::move(alter_conversions),
metadata_snapshot_base,
metadata_snapshot,
query_info,
context,
Expand All @@ -1407,7 +1403,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
Expand Down Expand Up @@ -1468,7 +1463,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
parts,
alter_conversions,
part_values,
metadata_snapshot_base,
metadata_snapshot,
data,
context,
max_block_numbers_to_read.get(),
Expand Down Expand Up @@ -2157,10 +2152,23 @@ size_t MergeTreeDataSelectAnalysisResult::marks() const
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));

const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
if (index_stats.empty())
return 0;
return index_stats.back().num_granules_after;
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_marks;
}

UInt64 MergeTreeDataSelectAnalysisResult::rows() const
{
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));

return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_rows;
}

const RangesInDataParts & MergeTreeDataSelectAnalysisResult::partsWithRanges() const
{
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));

return std::get<ReadFromMergeTree::AnalysisResult>(result).parts_with_ranges;
}

}
Loading

0 comments on commit b97c01c

Please sign in to comment.