Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
890c8d3
[doc](parquet) Define complex type reader contract
suxiaogang223 Jun 9, 2026
3295cbc
[doc](parquet) Clarify struct-only nested predicate contract
suxiaogang223 Jun 9, 2026
324c236
[doc](parquet) Use struct-only nested predicate contract
suxiaogang223 Jun 9, 2026
59a8b1d
[doc](parquet) Clarify complex row-level filter contract
suxiaogang223 Jun 9, 2026
72ad524
[doc](parquet) Center contract on nested shape support
suxiaogang223 Jun 9, 2026
c7c2ecb
[test](be) Add parquet nested predicate safety tests
suxiaogang223 Jun 9, 2026
63932c5
[improvement](be) Add parquet nested shape reader channel
suxiaogang223 Jun 9, 2026
5ed3a84
[fix](be) Fix parquet nested shape null map assignment
suxiaogang223 Jun 9, 2026
89a7622
[improvement](be) Support recursive parquet complex readers
suxiaogang223 Jun 9, 2026
d278819
[test](be) Fix parquet map map test row count
suxiaogang223 Jun 9, 2026
e2c6d21
[fix](be) Preserve parquet complex top-level reads
suxiaogang223 Jun 9, 2026
44dbf0c
[fix](be) Skip parquet nested scalar shape slots
suxiaogang223 Jun 10, 2026
dbae23e
[fix](be) Align parquet nested map scalar values
suxiaogang223 Jun 10, 2026
722c847
[fix](be) Match parquet map value levels by repetition
suxiaogang223 Jun 10, 2026
5d040d2
[fix](be) Consume parquet map value shape levels
suxiaogang223 Jun 10, 2026
9330af6
[fix](be) Align nested parquet map scalar values by slot
suxiaogang223 Jun 10, 2026
b831b80
[fix](be) Build nested parquet map values recursively
suxiaogang223 Jun 10, 2026
bd2ef51
[fix](be) Align nested parquet map scalar value slots
suxiaogang223 Jun 10, 2026
2d2bddd
[fix](be) Advance parquet nested nullable scalar slots
suxiaogang223 Jun 10, 2026
f0cd265
[fix](be) Fix recursive parquet struct child alignment
suxiaogang223 Jun 10, 2026
71fab0b
[refactor](be) Remove legacy parquet complex readers
suxiaogang223 Jun 10, 2026
9748db0
[test](be) Add deep parquet nested shape tests
suxiaogang223 Jun 10, 2026
2156cef
[improvement](be) Support nested parquet schema evolution
suxiaogang223 Jun 10, 2026
8d95f44
[refactor](be) Add parquet struct predicate target
suxiaogang223 Jun 10, 2026
3a87bd7
[improvement](be) Add nested parquet null pruning
suxiaogang223 Jun 10, 2026
b57b439
[doc](be) Remove completed parquet complex type contract
suxiaogang223 Jun 10, 2026
50310be
[fix](be) Align table reader test options after rebase
suxiaogang223 Jun 10, 2026
8c5ca02
[fix](be) Fix parquet column reader unsupported type test
suxiaogang223 Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
445 changes: 380 additions & 65 deletions be/src/format_v2/column_mapper.cpp

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions be/src/format_v2/column_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ struct ColumnMapping {
// and to localize nested filters that reference children not present in the output projection.
DataTypePtr original_file_type;
std::vector<ColumnDefinition> original_file_children;
// File children after applying the scan projection. The order follows the physical file schema,
// not table child order. TableReader uses this to map table-output children back to the
// file-local block layout when projection, predicate-only children, and schema evolution mix.
std::vector<ColumnDefinition> projected_file_children;
// Split/file-local constant entry when this mapping is produced from partition/default/virtual
// expression instead of physical file data.
std::optional<ConstantIndex> constant_index;
Expand Down
94 changes: 92 additions & 2 deletions be/src/format_v2/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
namespace doris::format {
namespace {

std::unique_ptr<FileStructPredicateTarget> clone_struct_predicate_target(
const std::unique_ptr<FileStructPredicateTarget>& target) {
return target == nullptr ? nullptr : std::make_unique<FileStructPredicateTarget>(*target);
}

template <typename T, typename Formatter>
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
std::ostringstream out;
Expand Down Expand Up @@ -53,12 +58,97 @@ std::string int_vector_debug_string(const std::vector<int32_t>& values) {
return out.str();
}

void append_struct_predicate_path(const FileStructPredicateTarget* target,
std::vector<int32_t>* path) {
DORIS_CHECK(path != nullptr);
for (const auto* current = target; current != nullptr; current = current->child.get()) {
path->push_back(current->file_local_id);
}
}

std::string struct_predicate_target_debug_string(const FileStructPredicateTarget* target) {
if (target == nullptr) {
return "null";
}
std::ostringstream out;
out << "{file_local_id=" << target->file_local_id
<< ", file_child_name=" << target->file_child_name
<< ", child=" << struct_predicate_target_debug_string(target->child.get()) << "}";
return out.str();
}

bool struct_predicate_targets_equal(const FileStructPredicateTarget* lhs,
const FileStructPredicateTarget* rhs) {
while (lhs != nullptr && rhs != nullptr) {
if (lhs->file_local_id != rhs->file_local_id) {
return false;
}
lhs = lhs->child.get();
rhs = rhs->child.get();
}
return lhs == nullptr && rhs == nullptr;
}

} // namespace

FileStructPredicateTarget::FileStructPredicateTarget(const FileStructPredicateTarget& other)
: file_local_id(other.file_local_id),
file_child_name(other.file_child_name),
child(clone_struct_predicate_target(other.child)) {}

FileStructPredicateTarget& FileStructPredicateTarget::operator=(
const FileStructPredicateTarget& other) {
if (this == &other) {
return *this;
}
file_local_id = other.file_local_id;
file_child_name = other.file_child_name;
child = clone_struct_predicate_target(other.child);
return *this;
}

FileNestedPredicateTarget::FileNestedPredicateTarget(const FileNestedPredicateTarget& other)
: file_column_id(other.file_column_id),
struct_target(clone_struct_predicate_target(other.struct_target)) {}

FileNestedPredicateTarget& FileNestedPredicateTarget::operator=(
const FileNestedPredicateTarget& other) {
if (this == &other) {
return *this;
}
file_column_id = other.file_column_id;
struct_target = clone_struct_predicate_target(other.struct_target);
return *this;
}

LocalColumnId FileColumnPredicateFilter::effective_file_column_id() const {
return target.is_valid() ? target.file_column_id : file_column_id;
}

std::vector<int32_t> FileColumnPredicateFilter::effective_file_child_id_path() const {
if (!target.is_valid()) {
return file_child_id_path;
}
std::vector<int32_t> path;
append_struct_predicate_path(target.struct_target.get(), &path);
return path;
}

bool FileColumnPredicateFilter::same_target_as(const FileColumnPredicateFilter& other) const {
if (target.is_valid() && other.target.is_valid()) {
return target.file_column_id == other.target.file_column_id &&
struct_predicate_targets_equal(target.struct_target.get(),
other.target.struct_target.get());
}
return effective_file_column_id() == other.effective_file_column_id() &&
effective_file_child_id_path() == other.effective_file_child_id_path();
}

std::string FileColumnPredicateFilter::debug_string() const {
std::ostringstream out;
out << "FileColumnPredicateFilter{file_column_id=" << file_column_id
<< ", file_child_id_path=" << int_vector_debug_string(file_child_id_path)
out << "FileColumnPredicateFilter{target={file_column_id=" << effective_file_column_id()
<< ", struct_target=" << struct_predicate_target_debug_string(target.struct_target.get())
<< "}, file_child_id_path=" << int_vector_debug_string(effective_file_child_id_path())
<< ", predicate_count=" << predicates.size() << "}";
return out.str();
}
Expand Down
55 changes: 49 additions & 6 deletions be/src/format_v2/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,63 @@ struct IOContext;

namespace doris::format {

// Struct-only nested predicate target used by file-layer pruning.
//
// This intentionally models only a STRUCT field chain. LIST/MAP/repeated predicates need explicit
// quantified semantics, so they must not be encoded here.
struct FileStructPredicateTarget {
int32_t file_local_id = -1;
std::string file_child_name;
std::unique_ptr<FileStructPredicateTarget> child;

FileStructPredicateTarget() = default;
FileStructPredicateTarget(int32_t local_id, std::string child_name,
std::unique_ptr<FileStructPredicateTarget> nested_child = nullptr)
: file_local_id(local_id),
file_child_name(std::move(child_name)),
child(std::move(nested_child)) {}
FileStructPredicateTarget(const FileStructPredicateTarget& other);
FileStructPredicateTarget& operator=(const FileStructPredicateTarget& other);
FileStructPredicateTarget(FileStructPredicateTarget&& other) noexcept = default;
FileStructPredicateTarget& operator=(FileStructPredicateTarget&& other) noexcept = default;
};

struct FileNestedPredicateTarget {
LocalColumnId file_column_id = LocalColumnId::invalid();
// Null means the predicate targets the top-level primitive column itself.
std::unique_ptr<FileStructPredicateTarget> struct_target;

FileNestedPredicateTarget() = default;
explicit FileNestedPredicateTarget(LocalColumnId column_id) : file_column_id(column_id) {}
FileNestedPredicateTarget(LocalColumnId column_id,
std::unique_ptr<FileStructPredicateTarget> target)
: file_column_id(column_id), struct_target(std::move(target)) {}
FileNestedPredicateTarget(const FileNestedPredicateTarget& other);
FileNestedPredicateTarget& operator=(const FileNestedPredicateTarget& other);
FileNestedPredicateTarget(FileNestedPredicateTarget&& other) noexcept = default;
FileNestedPredicateTarget& operator=(FileNestedPredicateTarget&& other) noexcept = default;

bool is_valid() const { return file_column_id.is_valid(); }
};

// File-local single-column predicates for file-layer pruning, such as min/max, page index,
// dictionary and bloom filter.
//
// Predicates must all belong to file_column_id. file_child_id_path points to the nested primitive
// leaf under that root; empty means the top-level column itself is the primitive leaf. These
// predicates are pruning hints only and are not row-level conjuncts.
// Predicates must all belong to target.file_column_id. target.struct_target points to the nested
// primitive leaf under that root; null means the top-level column itself is the primitive leaf.
// These predicates are pruning hints only and are not row-level conjuncts.
struct FileColumnPredicateFilter {
FileNestedPredicateTarget target;
// Compatibility fields for call sites and tests that still construct pruning filters directly.
// New mapper code should fill target; file readers consume target first and only fall back to
// these fields while the API migration is in progress.
LocalColumnId file_column_id = LocalColumnId::invalid();
// Reader-local child id path under file_column_id. Empty means top-level scalar.
// Each id is interpreted by the concrete FileReader inside the current parent node. For
// Parquet this is the child ordinal under that parent, not the optional Parquet field_id.
std::vector<int32_t> file_child_id_path;
std::vector<std::shared_ptr<ColumnPredicate>> predicates;

LocalColumnId effective_file_column_id() const;
std::vector<int32_t> effective_file_child_id_path() const;
bool same_target_as(const FileColumnPredicateFilter& other) const;
std::string debug_string() const;
};

Expand Down
12 changes: 11 additions & 1 deletion be/src/format_v2/parquet/parquet_column_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct SchemaBuildContext {
int16_t repetition_level = 0;
int16_t nullable_definition_level = 0;
int16_t repeated_repetition_level = 0;
int16_t repeated_ancestor_definition_level = 0;
};

bool is_list_node(const ::parquet::schema::Node& node) {
Expand Down Expand Up @@ -69,20 +70,25 @@ void inherit_common_schema_state(const ::parquet::schema::Node& node,
column_schema->max_definition_level = context.definition_level;
column_schema->max_repetition_level = context.repetition_level;
column_schema->nullable_definition_level = context.nullable_definition_level;
column_schema->definition_level = context.definition_level;
column_schema->repetition_level = context.repetition_level;
column_schema->repeated_ancestor_definition_level = context.repeated_ancestor_definition_level;
column_schema->repeated_repetition_level = context.repeated_repetition_level;
}

SchemaBuildContext child_context(const SchemaBuildContext& parent,
const ::parquet::schema::Node& child_node, int32_t child_idx) {
SchemaBuildContext result = parent;
result.local_id = child_idx;
if (child_node.repetition() != ::parquet::Repetition::REQUIRED) {
if (child_node.repetition() == ::parquet::Repetition::OPTIONAL) {
result.definition_level++;
result.nullable_definition_level = result.definition_level;
}
if (child_node.is_repeated()) {
result.repetition_level++;
result.definition_level++;
result.repeated_repetition_level = result.repetition_level;
result.repeated_ancestor_definition_level = result.definition_level;
}
return result;
}
Expand Down Expand Up @@ -152,6 +158,8 @@ Status build_node_schema(const ::parquet::SchemaDescriptor& schema,
node.name());
}
auto repeated_context = child_context(context, repeated_node, 0);
column_schema->definition_level = repeated_context.definition_level;
column_schema->repetition_level = repeated_context.repetition_level;
column_schema->repeated_repetition_level = repeated_context.repeated_repetition_level;
std::unique_ptr<ParquetColumnSchema> child;
RETURN_IF_ERROR(build_node_schema(
Expand All @@ -177,6 +185,8 @@ Status build_node_schema(const ::parquet::SchemaDescriptor& schema,
node.name());
}
auto key_value_context = child_context(context, key_value_node, 0);
column_schema->definition_level = key_value_context.definition_level;
column_schema->repetition_level = key_value_context.repetition_level;
column_schema->repeated_repetition_level = key_value_context.repeated_repetition_level;
if (key_value_node.is_primitive()) {
return Status::NotSupported("Unsupported parquet MAP key_value layout for column {}",
Expand Down
6 changes: 6 additions & 0 deletions be/src/format_v2/parquet/parquet_column_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ struct ParquetColumnSchema {
// Complex readers use this to distinguish null containers from empty containers while
// assembling STRUCT/LIST/MAP values.
int16_t nullable_definition_level = 0;
// Definition/repetition levels used to reconstruct this node from a descendant leaf level
// stream. These mirror parquet::internal::LevelInfo but are kept in Doris schema state so
// complex readers do not depend on Arrow's Arrow-array reader internals.
int16_t definition_level = 0;
int16_t repetition_level = 0;
int16_t repeated_ancestor_definition_level = 0;
// Repetition level introduced by this node's repeated container, or the nearest repeated
// container carried from its parent.
int16_t repeated_repetition_level = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/format_v2/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Status ParquetReader::open(std::shared_ptr<format::FileScanRequest> request) {

const int num_fields = static_cast<int>(_state->file_schema.size());
for (const auto& column_filter : request_snapshot->column_predicate_filters) {
const auto file_column_id = column_filter.file_column_id;
const auto file_column_id = column_filter.effective_file_column_id();
if (!file_column_id.is_valid() || file_column_id.value() >= num_fields) {
return Status::InvalidArgument("Invalid parquet filter top-level local id {}",
file_column_id.value());
Expand Down
8 changes: 4 additions & 4 deletions be/src/format_v2/parquet/parquet_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,15 @@ const ParquetColumnSchema* find_child_schema_by_local_id(const ParquetColumnSche
const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema(
const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
const format::FileColumnPredicateFilter& column_filter) {
if (!column_filter.file_column_id.is_valid() ||
column_filter.file_column_id.value() >= static_cast<int>(schema.size())) {
const auto file_column_id = column_filter.effective_file_column_id();
if (!file_column_id.is_valid() || file_column_id.value() >= static_cast<int>(schema.size())) {
return nullptr;
}
const ParquetColumnSchema* column_schema = schema[column_filter.file_column_id.value()].get();
const ParquetColumnSchema* column_schema = schema[file_column_id.value()].get();
if (column_schema == nullptr) {
return nullptr;
}
for (const auto child_local_id : column_filter.file_child_id_path) {
for (const auto child_local_id : column_filter.effective_file_child_id_path()) {
column_schema = find_child_schema_by_local_id(*column_schema, child_local_id);
if (column_schema == nullptr) {
return nullptr;
Expand Down
41 changes: 35 additions & 6 deletions be/src/format_v2/parquet/reader/arrow_leaf_reader_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,40 +354,67 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
}

const int16_t leaf_definition_level = context.descriptor->max_definition_level();
int64_t value_slot_count = 0;
// Arrow's RecordReader may emit value placeholders for null ancestors that are below the
// Doris materialization threshold. Those slots must still advance the payload value index;
// otherwise the next defined child level points at the placeholder instead of its real value.
auto count_value_slots = [&](int16_t slot_definition_level) {
int64_t slot_count = 0;
for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) {
if (batch->def_levels[level_idx] >= slot_definition_level &&
batch->rep_levels[level_idx] <= value_slot_repetition_level) {
++slot_count;
}
}
return slot_count;
};

const int64_t value_slot_count = count_value_slots(value_slot_definition_level);
int16_t payload_slot_definition_level = value_slot_definition_level;
int64_t payload_value_slot_count = value_slot_count;
while (payload_slot_definition_level > 0 &&
payload_value_slot_count < values_written) {
--payload_slot_definition_level;
payload_value_slot_count = count_value_slots(payload_slot_definition_level);
}

int64_t leaf_value_count = 0;
for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) {
if (batch->def_levels[level_idx] < value_slot_definition_level ||
batch->rep_levels[level_idx] > value_slot_repetition_level) {
continue;
}
++value_slot_count;
if (batch->def_levels[level_idx] == leaf_definition_level) {
++leaf_value_count;
}
}

enum class ValueLayout { LEVELS, VALUE_SLOTS, LEAF_VALUES };
enum class ValueLayout { LEVELS, VALUE_SLOTS, LEAF_VALUES, PAYLOAD_VALUE_SLOTS };
ValueLayout value_layout = ValueLayout::LEAF_VALUES;
if (values_written == batch->levels_written) {
value_layout = ValueLayout::LEVELS;
} else if (values_written == value_slot_count) {
value_layout = ValueLayout::VALUE_SLOTS;
} else if (values_written == leaf_value_count) {
value_layout = ValueLayout::LEAF_VALUES;
} else if (values_written == payload_value_slot_count) {
value_layout = ValueLayout::PAYLOAD_VALUE_SLOTS;
} else {
return Status::Corruption(
"Nested parquet reader returned inconsistent value count for column {}: values={}, "
"levels={}, slots={}, leaf_values={}",
"levels={}, slots={}, leaf_values={}, payload_slots={}, "
"payload_slot_definition_level={}",
context.column_name(), values_written, batch->levels_written, value_slot_count,
leaf_value_count);
leaf_value_count, payload_value_slot_count, payload_slot_definition_level);
}

batch->value_indices.resize(static_cast<size_t>(batch->levels_written), -1);
NullMap value_nulls(static_cast<size_t>(values_written), 1);
int64_t value_idx = 0;
const int16_t decoded_slot_definition_level =
value_layout == ValueLayout::PAYLOAD_VALUE_SLOTS ? payload_slot_definition_level
: value_slot_definition_level;
for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) {
if (batch->def_levels[level_idx] < value_slot_definition_level ||
if (batch->def_levels[level_idx] < decoded_slot_definition_level ||
batch->rep_levels[level_idx] > value_slot_repetition_level) {
continue;
}
Expand All @@ -397,6 +424,8 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
decoded_value_idx = level_idx;
} else if (value_layout == ValueLayout::VALUE_SLOTS) {
decoded_value_idx = value_idx++;
} else if (value_layout == ValueLayout::PAYLOAD_VALUE_SLOTS) {
decoded_value_idx = value_idx++;
} else {
if (!has_leaf_value) {
continue;
Expand Down
Loading
Loading