Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions crates/iceberg/src/arrow/reader/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ impl FileScanTaskReader {
.next()
.is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

// Position-based fallback applies only when the file has no embedded field IDs
// AND no name mapping is available. With a name mapping, field IDs are assigned
// to the Arrow schema below, and projection/predicate planning must use them
// (see #2403).
let use_position_fallback = missing_field_ids && task.name_mapping.is_none();

// Three-branch schema resolution strategy matching Java's ReadConf constructor
//
// Per Iceberg spec Column Projection rules:
Expand Down Expand Up @@ -214,15 +220,16 @@ impl FileScanTaskReader {
.collect();

// Create projection mask based on field IDs
// - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
// - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
// - If fallback IDs: position-based projection (missing_field_ids=true)
// - If file has embedded IDs: field-ID-based projection
// - If name mapping applied: field-ID-based projection using the IDs the name
// mapping assigned to the Arrow schema
// - Otherwise: position-based fallback projection
let projection_mask = ArrowReader::get_arrow_projection_mask(
&project_field_ids_without_metadata,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
use_position_fallback, // Whether to use position-based (true) or field-ID-based (false) projection
)?;

record_batch_stream_builder =
Expand Down Expand Up @@ -302,7 +309,9 @@ impl FileScanTaskReader {
if let Some(predicate) = final_predicate {
let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
&predicate,
use_position_fallback,
)?;

let row_filter = ArrowReader::get_row_filter(
Expand Down
247 changes: 242 additions & 5 deletions crates/iceberg/src/arrow/reader/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ use crate::{Error, ErrorKind};
impl ArrowReader {
pub(super) fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
predicate: &BoundPredicate,
use_position_fallback: bool,
) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
// Collects all Iceberg field IDs referenced in the filter predicate
let mut collector = CollectFieldIdVisitor {
Expand All @@ -48,10 +50,13 @@ impl ArrowReader {

let iceberg_field_ids = collector.field_ids();

// Without embedded field IDs, we fall back to position-based mapping for compatibility
let field_id_map = match build_field_id_map(parquet_schema)? {
Some(map) => map,
None => build_fallback_field_id_map(parquet_schema),
// No embedded field IDs and no name mapping: position-based fallback
None if use_position_fallback => build_fallback_field_id_map(parquet_schema),
// No embedded field IDs, but a name mapping assigned them to the Arrow
// schema: resolve columns through the mapped Arrow field-id metadata
None => build_field_id_map_from_arrow_schema(arrow_schema),
};

Ok((iceberg_field_ids, field_id_map))
Expand Down Expand Up @@ -84,7 +89,7 @@ impl ArrowReader {
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
use_fallback: bool, // Position-based fallback: file lacks embedded field IDs and no name mapping assigned any
) -> Result<ProjectionMask> {
fn type_promotion_is_valid(
file_type: Option<&PrimitiveType>,
Expand Down Expand Up @@ -308,6 +313,30 @@ pub(super) fn build_fallback_field_id_map(
column_map
}

/// Derives a field-ID → leaf-column-index lookup from the field-id metadata attached
/// to the leaves of an Arrow schema.
///
/// This is the resolution path for Parquet files that carry no embedded field IDs but
/// whose Arrow schema has been given IDs by a name mapping (see
/// [`apply_name_mapping_to_arrow_schema`]). The Parquet schema descriptor still has no
/// IDs in that situation; the map relies on Arrow leaves being flattened in the same
/// depth-first order as Parquet leaf columns, so an Arrow leaf index doubles as the
/// Parquet column index.
///
/// Leaves without field-id metadata (columns the mapping did not match), or whose
/// metadata value does not parse as an `i32`, contribute no entry to the map.
fn build_field_id_map_from_arrow_schema(arrow_schema: &ArrowSchemaRef) -> HashMap<i32, usize> {
    let mut leaf_index_by_field_id = HashMap::new();

    // `filter_leaves` serves purely as a leaf visitor here: the closure always answers
    // `false` and the filtered result is discarded.
    arrow_schema.fields().filter_leaves(|leaf_idx, leaf| {
        let parsed_id = leaf
            .metadata()
            .get(PARQUET_FIELD_ID_META_KEY)
            .and_then(|raw| i32::from_str(raw).ok());

        match parsed_id {
            Some(field_id) => {
                leaf_index_by_field_id.insert(field_id, leaf_idx);
            }
            None => {}
        }

        false
    });

    leaf_index_by_field_id
}

/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
///
/// Assigns Iceberg field IDs based on column names using the name mapping,
Expand Down Expand Up @@ -421,7 +450,7 @@ mod tests {
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_array::{Array, ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use futures::TryStreamExt;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY, ProjectionMask};
Expand All @@ -435,7 +464,9 @@ mod tests {
use crate::expr::{Bind, Reference};
use crate::io::FileIO;
use crate::scan::{FileScanTask, FileScanTaskStream};
use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type};
use crate::spec::{
DataFileFormat, Datum, MappedField, NameMapping, NestedField, PrimitiveType, Schema, Type,
};
use crate::{ErrorKind, Runtime};

#[test]
Expand Down Expand Up @@ -721,6 +752,212 @@ message schema {
assert_eq!(age_array.value(2), 35);
}

/// Regression test for #2403 (projection side).
///
/// A Parquet file written without embedded field IDs is read with a name mapping
/// attached to the scan task. Projection has to honour the IDs handed out by the
/// mapping rather than the position-based fallback (field_id N → column N-1).
///
/// The file is laid out so that physical position and Iceberg field ID disagree:
/// the physical columns are `[name, subdept]`, while the mapping assigns
/// `name → 2` and `subdept → 4`. Under the position fallback, field 2 would resolve
/// to physical column 1 (`subdept`) and field 4 to column 3, which does not exist,
/// so the read would silently return wrong data.
#[tokio::test]
async fn test_read_parquet_with_name_mapping_uses_mapped_field_ids() {
    // Table schema with four fields; the data file only contains fields 2 and 4.
    let schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap(),
    );

    // Schema of the "migrated" file: plain Arrow fields, no field-id metadata.
    let file_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("subdept", DataType::Utf8, true),
    ]));

    // The name mapping is the only source of field IDs for this file.
    let name_mapping = Arc::new(NameMapping::new(vec![
        MappedField::new(Some(2), vec!["name".to_string()], vec![]),
        MappedField::new(Some(4), vec!["subdept".to_string()], vec![]),
    ]));

    let tmp_dir = TempDir::new().unwrap();
    let data_file_path = format!("{}/1.parquet", tmp_dir.path().to_str().unwrap());
    let file_io = FileIO::new_with_fs();

    // Write a single three-row batch to the data file.
    let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef;
    let subdepts = Arc::new(StringArray::from(vec!["comms", "tax", "audit"])) as ArrayRef;
    let batch_to_write = RecordBatch::try_new(file_schema.clone(), vec![names, subdepts]).unwrap();

    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let out_file = File::create(&data_file_path).unwrap();
    let mut parquet_writer =
        ArrowWriter::try_new(out_file, batch_to_write.schema(), Some(writer_props)).unwrap();
    parquet_writer.write(&batch_to_write).expect("Writing batch");
    parquet_writer.close().unwrap();

    let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();

    // Scan task projecting exactly the two mapped fields.
    let file_size = std::fs::metadata(&data_file_path).unwrap().len();
    let scan_task = FileScanTask::builder()
        .with_file_size_in_bytes(file_size)
        .with_start(0)
        .with_length(0)
        .with_data_file_path(data_file_path.clone())
        .with_data_file_format(DataFileFormat::Parquet)
        .with_schema(schema.clone())
        .with_project_field_ids(vec![2, 4])
        .with_case_sensitive(false)
        .with_name_mapping(Some(name_mapping))
        .build();
    let tasks = Box::pin(futures::stream::iter(vec![Ok(scan_task)])) as FileScanTaskStream;

    let batches = reader
        .read(tasks)
        .unwrap()
        .stream()
        .try_collect::<Vec<RecordBatch>>()
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 2);

    // Field 2 (`name`) has to be served from physical column 0 and must not be NULL-filled.
    let name_array = batch.column(0).as_string::<i32>();
    assert_eq!(
        name_array.null_count(),
        0,
        "`name` was NULL-filled: name mapping was ignored and position fallback was used"
    );
    for (row, expected) in ["Alice", "Bob", "Charlie"].into_iter().enumerate() {
        assert_eq!(name_array.value(row), expected);
    }

    // Field 4 (`subdept`) has to be served from physical column 1.
    let subdept_array = batch.column(1).as_string::<i32>();
    assert_eq!(subdept_array.null_count(), 0);
    for (row, expected) in ["comms", "tax", "audit"].into_iter().enumerate() {
        assert_eq!(subdept_array.value(row), expected);
    }
}

/// Regression test for #2403 (predicate side).
///
/// When a name mapping is attached to the scan task, predicate pushdown has to
/// translate field IDs to columns through that mapping. Falling back to positions
/// would run the filter against a different physical column and return the wrong rows.
#[tokio::test]
async fn test_predicate_on_name_mapped_file_uses_mapped_field_ids() {
    // Table schema with four fields; the data file only contains fields 2 and 4.
    let schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap(),
    );

    // Schema of the data file: plain Arrow fields, no field-id metadata.
    let file_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("subdept", DataType::Utf8, true),
    ]));

    let name_mapping = Arc::new(NameMapping::new(vec![
        MappedField::new(Some(2), vec!["name".to_string()], vec![]),
        MappedField::new(Some(4), vec!["subdept".to_string()], vec![]),
    ]));

    let tmp_dir = TempDir::new().unwrap();
    let data_file_path = format!("{}/1.parquet", tmp_dir.path().to_str().unwrap());
    let file_io = FileIO::new_with_fs();

    // The data is chosen so a filter on the wrong column is detectable: "Alice" appears
    // once in `name` (row 0) and twice in `subdept` (rows 1 and 2).
    let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Sue"])) as ArrayRef;
    let subdepts = Arc::new(StringArray::from(vec!["Bob", "Alice", "Alice"])) as ArrayRef;
    let batch_to_write = RecordBatch::try_new(file_schema.clone(), vec![names, subdepts]).unwrap();

    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let out_file = File::create(&data_file_path).unwrap();
    let mut parquet_writer =
        ArrowWriter::try_new(out_file, batch_to_write.schema(), Some(writer_props)).unwrap();
    parquet_writer.write(&batch_to_write).expect("Writing batch");
    parquet_writer.close().unwrap();

    // `name = "Alice"`, bound against the table schema (case-sensitive binding).
    let bound_predicate = Reference::new("name")
        .equal_to(Datum::string("Alice"))
        .bind(schema.clone(), true)
        .unwrap();

    let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
        .with_row_group_filtering_enabled(true)
        .with_row_selection_enabled(true)
        .build();

    let file_size = std::fs::metadata(&data_file_path).unwrap().len();
    let scan_task = FileScanTask::builder()
        .with_file_size_in_bytes(file_size)
        .with_start(0)
        .with_length(0)
        .with_data_file_path(data_file_path.clone())
        .with_data_file_format(DataFileFormat::Parquet)
        .with_schema(schema)
        .with_project_field_ids(vec![2, 4])
        .with_case_sensitive(false)
        .with_name_mapping(Some(name_mapping))
        .with_predicate(Some(bound_predicate))
        .build();
    let tasks = Box::pin(futures::stream::iter(vec![Ok(scan_task)])) as FileScanTaskStream;

    let batches = reader
        .read(tasks)
        .unwrap()
        .stream()
        .try_collect::<Vec<RecordBatch>>()
        .await
        .unwrap();

    // Exactly one row may survive the filter across all returned batches.
    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
    assert_eq!(
        total_rows, 1,
        "filter `name = \"Alice\"` matched the wrong rows: predicate was evaluated \
        against the wrong physical column"
    );

    // The surviving row is row 0 of the file: name "Alice", subdept "Bob".
    let batch = &batches[0];
    let name_array = batch.column(0).as_string::<i32>();
    assert_eq!(name_array.value(0), "Alice");
    let subdept_array = batch.column(1).as_string::<i32>();
    assert_eq!(subdept_array.value(0), "Bob");
}

/// Test reading Parquet files without field IDs with partial projection.
/// Only a subset of columns are requested, verifying position-based fallback
/// handles column selection correctly.
Expand Down
Loading