diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 57c614575a..ef38bc8b7c 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -133,6 +133,12 @@ impl FileScanTaskReader { .next() .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); + // Position-based fallback applies only when the file has no embedded field IDs + // AND no name mapping is available. With a name mapping, field IDs are assigned + // to the Arrow schema below, and projection/predicate planning must use them + // (see #2403). + let use_position_fallback = missing_field_ids && task.name_mapping.is_none(); + // Three-branch schema resolution strategy matching Java's ReadConf constructor // // Per Iceberg spec Column Projection rules: @@ -214,15 +220,16 @@ impl FileScanTaskReader { .collect(); // Create projection mask based on field IDs - // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) - // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) - // - If fallback IDs: position-based projection (missing_field_ids=true) + // - If file has embedded IDs: field-ID-based projection + // - If name mapping applied: field-ID-based projection using the IDs the name + // mapping assigned to the Arrow schema + // - Otherwise: position-based fallback projection let projection_mask = ArrowReader::get_arrow_projection_mask( &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), - missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection + use_position_fallback, // Whether to use position-based (true) or field-ID-based (false) projection )?; record_batch_stream_builder = @@ -302,7 +309,9 @@ impl FileScanTaskReader { if let Some(predicate) = final_predicate { let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map( record_batch_stream_builder.parquet_schema(), + record_batch_stream_builder.schema(), &predicate, + use_position_fallback, )?; let row_filter = ArrowReader::get_row_filter( diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs index 450affd450..aad3123f0a 100644 --- a/crates/iceberg/src/arrow/reader/projection.rs +++ b/crates/iceberg/src/arrow/reader/projection.rs @@ -38,7 +38,9 @@ use crate::{Error, ErrorKind}; impl ArrowReader { pub(super) fn build_field_id_set_and_map( parquet_schema: &SchemaDescriptor, + arrow_schema: &ArrowSchemaRef, predicate: &BoundPredicate, + use_position_fallback: bool, ) -> Result<(HashSet, HashMap)> { // Collects all Iceberg field IDs referenced in the filter predicate let mut collector = CollectFieldIdVisitor { @@ -48,10 +50,13 @@ impl ArrowReader { let iceberg_field_ids = collector.field_ids(); - // Without embedded field IDs, we fall back to position-based mapping for compatibility let field_id_map = match build_field_id_map(parquet_schema)? { Some(map) => map, - None => build_fallback_field_id_map(parquet_schema), + // No embedded field IDs and no name mapping: position-based fallback + None if use_position_fallback => build_fallback_field_id_map(parquet_schema), + // No embedded field IDs, but a name mapping assigned them to the Arrow + // schema: resolve columns through the mapped Arrow field-id metadata + None => build_field_id_map_from_arrow_schema(arrow_schema), }; Ok((iceberg_field_ids, field_id_map)) @@ -84,7 +89,7 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark) + use_fallback: bool, // Position-based fallback: file lacks embedded field IDs and no name mapping assigned any ) -> Result { fn type_promotion_is_valid( file_type: Option<&PrimitiveType>, @@ -308,6 +313,30 @@ pub(super) fn build_fallback_field_id_map( column_map } +/// Builds a mapping from field IDs to leaf column indices using the field-id metadata +/// carried by the Arrow schema. +/// +/// Used for Parquet files without embedded field IDs when a name mapping has assigned +/// IDs to the Arrow schema (see [`apply_name_mapping_to_arrow_schema`]): the Parquet +/// schema descriptor itself still has no IDs, but the Arrow leaves are flattened in the +/// same depth-first order as Parquet leaf columns, so the Arrow leaf index lines up with +/// the Parquet column index. Columns the mapping did not match carry no field-id +/// metadata and are simply absent from the map. +fn build_field_id_map_from_arrow_schema(arrow_schema: &ArrowSchemaRef) -> HashMap { + let mut column_map = HashMap::new(); + arrow_schema.fields().filter_leaves(|idx, field| { + if let Some(field_id) = field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|value| i32::from_str(value).ok()) + { + column_map.insert(field_id, idx); + } + false + }); + column_map +} + /// Apply name mapping to Arrow schema for Parquet files lacking field IDs. /// /// Assigns Iceberg field IDs based on column names using the name mapping, @@ -421,7 +450,7 @@ mod tests { use std::sync::Arc; use arrow_array::cast::AsArray; - use arrow_array::{ArrayRef, RecordBatch, StringArray}; + use arrow_array::{Array, ArrayRef, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use futures::TryStreamExt; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY, ProjectionMask}; @@ -435,7 +464,9 @@ mod tests { use crate::expr::{Bind, Reference}; use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskStream}; - use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; + use crate::spec::{ + DataFileFormat, Datum, MappedField, NameMapping, NestedField, PrimitiveType, Schema, Type, + }; use crate::{ErrorKind, Runtime}; #[test] @@ -721,6 +752,212 @@ message schema { assert_eq!(age_array.value(2), 35); } + /// Regression test for #2403: when a Parquet file lacks embedded field IDs but a + /// name mapping is present, projection must use the field IDs assigned by the name + /// mapping — not the position-based fallback (field_id N → column N-1). + /// + /// The scenario uses a file whose physical column order does not line up with the + /// Iceberg field IDs: physical columns are `[name, subdept]` while the mapping + /// assigns `name → 2` and `subdept → 4`. The position fallback would project + /// field 2 from physical column 1 (`subdept`) and drop field 4 (column 3 is out of + /// range), silently misreading data. + #[tokio::test] + async fn test_read_parquet_with_name_mapping_uses_mapped_field_ids() { + // Iceberg schema: physical file order does NOT match field-id order. + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + // Migrated Parquet file: no field-id metadata, physical columns [name, subdept]. + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, true), + Field::new("subdept", DataType::Utf8, true), + ])); + + let name_mapping = Arc::new(NameMapping::new(vec![ + MappedField::new(Some(2), vec!["name".to_string()], vec![]), + MappedField::new(Some(4), vec!["subdept".to_string()], vec![]), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + let name_col = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef; + let subdept_col = Arc::new(StringArray::from(vec!["comms", "tax", "audit"])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![name_col, subdept_col]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(format!("{table_location}/1.parquet")).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask::builder() + .with_file_size_in_bytes( + std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), + ) + .with_start(0) + .with_length(0) + .with_data_file_path(format!("{table_location}/1.parquet")) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![2, 4]) + .with_case_sensitive(false) + .with_name_mapping(Some(name_mapping)) + .build())] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + + // field 2 (`name`) must come from physical column 0, not be NULL-filled. + let name_array = batch.column(0).as_string::(); + assert_eq!( + name_array.null_count(), + 0, + "`name` was NULL-filled: name mapping was ignored and position fallback was used" + ); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + assert_eq!(name_array.value(2), "Charlie"); + + // field 4 (`subdept`) must come from physical column 1. + let subdept_array = batch.column(1).as_string::(); + assert_eq!(subdept_array.null_count(), 0); + assert_eq!(subdept_array.value(0), "comms"); + assert_eq!(subdept_array.value(1), "tax"); + assert_eq!(subdept_array.value(2), "audit"); + } + + /// Regression test for #2403, predicate side: with a name mapping present, predicate + /// pushdown must resolve field IDs via the mapping rather than the position fallback, + /// which would evaluate the filter against the wrong physical column. + #[tokio::test] + async fn test_predicate_on_name_mapped_file_uses_mapped_field_ids() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, true), + Field::new("subdept", DataType::Utf8, true), + ])); + + let name_mapping = Arc::new(NameMapping::new(vec![ + MappedField::new(Some(2), vec!["name".to_string()], vec![]), + MappedField::new(Some(4), vec!["subdept".to_string()], vec![]), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + // Engineered so that filtering the wrong physical column yields different rows: + // `name` and `subdept` both contain the value "Alice", on different rows. + let name_col = Arc::new(StringArray::from(vec!["Alice", "Bob", "Sue"])) as ArrayRef; + let subdept_col = Arc::new(StringArray::from(vec!["Bob", "Alice", "Alice"])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![name_col, subdept_col]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(format!("{table_location}/1.parquet")).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let predicate = Reference::new("name").equal_to(Datum::string("Alice")); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask::builder() + .with_file_size_in_bytes( + std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), + ) + .with_start(0) + .with_length(0) + .with_data_file_path(format!("{table_location}/1.parquet")) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![2, 4]) + .with_case_sensitive(false) + .with_name_mapping(Some(name_mapping)) + .with_predicate(Some(predicate.bind(schema, true).unwrap())) + .build())] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 1, + "filter `name = \"Alice\"` matched the wrong rows: predicate was evaluated \ + against the wrong physical column" + ); + + let batch = &result[0]; + let name_array = batch.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + let subdept_array = batch.column(1).as_string::(); + assert_eq!(subdept_array.value(0), "Bob"); + } + /// Test reading Parquet files without field IDs with partial projection. /// Only a subset of columns are requested, verifying position-based fallback /// handles column selection correctly.