diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index bf2558f313069..d4c6a45f86c72 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -15,10 +15,17 @@ // specific language governing permissions and limitations // under the License. -use crate::error::{_plan_err, Result}; +use crate::{ + cast::as_struct_array, + error::{_plan_err, DataFusionError, Result}, +}; use arrow::{ - array::{Array, ArrayRef, StructArray, new_null_array}, - compute::{CastOptions, cast_with_options}, + array::OffsetSizeTrait, + array::{ + Array, ArrayRef, FixedSizeListArray, GenericListArray, MapArray, StructArray, + new_null_array, + }, + compute::{CastOptions, can_cast_types, cast_with_options}, datatypes::{DataType, DataType::Struct, Field, FieldRef}, }; use std::{collections::HashSet, sync::Arc}; @@ -55,50 +62,38 @@ fn cast_struct_column( target_fields: &[Arc], cast_options: &CastOptions, ) -> Result { - if source_col.data_type() == &DataType::Null - || (!source_col.is_empty() && source_col.null_count() == source_col.len()) - { - return Ok(new_null_array( - &Struct(target_fields.to_vec().into()), - source_col.len(), - )); - } - if let Some(source_struct) = source_col.as_any().downcast_ref::() { - let source_fields = source_struct.fields(); - validate_struct_compatibility(source_fields, target_fields)?; - let mut fields: Vec> = Vec::with_capacity(target_fields.len()); - let mut arrays: Vec = Vec::with_capacity(target_fields.len()); + validate_struct_compatibility(source_struct.fields(), target_fields)?; let num_rows = source_col.len(); - - // Iterate target fields and pick source child by name when present. - for target_child_field in target_fields.iter() { - fields.push(Arc::clone(target_child_field)); - - let source_child_opt = - source_struct.column_by_name(target_child_field.name()); - - match source_child_opt { - Some(source_child_col) => { - let adapted_child = - cast_column(source_child_col, target_child_field, cast_options) + let arrays = target_fields + .iter() + .map(|target_child_field| { + source_struct + .column_by_name(target_child_field.name()) + .map_or_else( + || Ok(new_null_array(target_child_field.data_type(), num_rows)), + |source_child_col| { + cast_column( + source_child_col, + target_child_field, + cast_options, + ) .map_err(|e| { - e.context(format!( - "While casting struct field '{}'", - target_child_field.name() - )) - })?; - arrays.push(adapted_child); - } - None => { - arrays.push(new_null_array(target_child_field.data_type(), num_rows)); - } - } - } - - let struct_array = - StructArray::new(fields.into(), arrays, source_struct.nulls().cloned()); - Ok(Arc::new(struct_array)) + e.context(format!( + "While casting struct field '{}'", + target_child_field.name() + )) + }) + }, + ) + }) + .collect::>>()?; + + Ok(Arc::new(StructArray::new( + target_fields.to_vec().into(), + arrays, + source_struct.nulls().cloned(), + ))) } else { // Return error if source is not a struct type _plan_err!( @@ -108,6 +103,84 @@ fn cast_struct_column( } } +fn downcast_source_column<'a, T: 'static>( + source_col: &'a ArrayRef, + target_type: &DataType, + kind: &str, +) -> Result<&'a T> { + source_col.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Plan(format!( + "Cannot cast column of type {} to {kind} type {}", + source_col.data_type(), + target_type + )) + }) +} + +fn cast_list_column( + source_col: &ArrayRef, + target_field: &FieldRef, + cast_options: &CastOptions, +) -> Result { + let list = downcast_source_column::>( + source_col, + target_field.data_type(), + "list", + )?; + let values = cast_column(list.values(), target_field.as_ref(), cast_options)?; + Ok(Arc::new(GenericListArray::::try_new( + Arc::clone(target_field), + list.offsets().clone(), + values, + list.nulls().cloned(), + )?)) +} + +fn cast_fixed_size_list_column( + source_col: &ArrayRef, + target_field: &FieldRef, + target_size: i32, + cast_options: &CastOptions, +) -> Result { + let list = downcast_source_column::( + source_col, + target_field.data_type(), + "fixed-size list", + )?; + let values = cast_column(list.values(), target_field.as_ref(), cast_options)?; + Ok(Arc::new(FixedSizeListArray::try_new( + Arc::clone(target_field), + target_size, + values, + list.nulls().cloned(), + )?)) +} + +fn cast_map_column( + source_col: &ArrayRef, + target_entries_field: &FieldRef, + sorted: bool, + cast_options: &CastOptions, +) -> Result { + let map = downcast_source_column::( + source_col, + &DataType::Map(Arc::clone(target_entries_field), sorted), + "map", + )?; + let values = cast_column( + &(Arc::new(map.entries().clone()) as ArrayRef), + target_entries_field.as_ref(), + cast_options, + )?; + Ok(Arc::new(MapArray::try_new( + Arc::clone(target_entries_field), + map.offsets().clone(), + as_struct_array(values.as_ref())?.clone(), + map.nulls().cloned(), + sorted, + )?)) +} + /// Cast a column to match the target field type, with special handling for nested structs. /// /// This function serves as the main entry point for column casting operations. For struct @@ -168,10 +241,33 @@ pub fn cast_column( target_field: &Field, cast_options: &CastOptions, ) -> Result { + if source_col.data_type() == &DataType::Null + || (!source_col.is_empty() && source_col.null_count() == source_col.len()) + { + return Ok(new_null_array(target_field.data_type(), source_col.len())); + } + match target_field.data_type() { Struct(target_fields) => { cast_struct_column(source_col, target_fields, cast_options) } + DataType::List(target_field) => { + cast_list_column::(source_col, target_field, cast_options) + } + DataType::LargeList(target_field) => { + cast_list_column::(source_col, target_field, cast_options) + } + DataType::FixedSizeList(target_field, target_size) => { + cast_fixed_size_list_column( + source_col, + target_field, + *target_size, + cast_options, + ) + } + DataType::Map(target_entries_field, sorted) => { + cast_map_column(source_col, target_entries_field, *sorted, cast_options) + } _ => Ok(cast_with_options( source_col, target_field.data_type(), @@ -220,8 +316,7 @@ pub fn validate_struct_compatibility( source_fields: &[FieldRef], target_fields: &[FieldRef], ) -> Result<()> { - let has_overlap = has_one_of_more_common_fields(source_fields, target_fields); - if !has_overlap { + if !has_one_of_more_common_fields(source_fields, target_fields) { return _plan_err!( "Cannot cast struct with {} fields to {} fields because there is no field name overlap", source_fields.len(), @@ -229,29 +324,25 @@ pub fn validate_struct_compatibility( ); } - // Check compatibility for each target field - for target_field in target_fields { - // Look for matching field in source by name - if let Some(source_field) = source_fields + target_fields.iter().try_for_each(|target_field| { + source_fields .iter() .find(|f| f.name() == target_field.name()) - { - validate_field_compatibility(source_field, target_field)?; - } else { - // Target field is missing from source - // If it's non-nullable, we cannot fill it with NULL - if !target_field.is_nullable() { - return _plan_err!( - "Cannot cast struct: target field '{}' is non-nullable but missing from source. \ - Cannot fill with NULL.", - target_field.name() - ); - } - } - } - - // Extra fields in source are OK - they'll be ignored - Ok(()) + .map_or_else( + || { + if target_field.is_nullable() { + Ok(()) + } else { + _plan_err!( + "Cannot cast struct: target field '{}' is non-nullable but missing from source. \ + Cannot fill with NULL.", + target_field.name() + ) + } + }, + |source_field| validate_field_compatibility(source_field, target_field), + ) + }) } fn validate_field_compatibility( @@ -281,29 +372,71 @@ fn validate_field_compatibility( ); } - // Check if the matching field types are compatible - match (source_field.data_type(), target_field.data_type()) { - // Recursively validate nested structs - (Struct(source_nested), Struct(target_nested)) => { - validate_struct_compatibility(source_nested, target_nested)?; + validate_data_type_compatibility( + source_field.data_type(), + target_field.data_type(), + target_field.name(), + ) +} + +/// Validates compatibility between source and target data types for schema evolution. +/// +/// This extends struct-aware compatibility checks through container types that can +/// wrap evolved structs, such as `List`, `LargeList`, +/// `FixedSizeList`, and `Map<_, Struct>`. +pub fn validate_data_type_compatibility( + source_type: &DataType, + target_type: &DataType, + field_name: &str, +) -> Result<()> { + match (source_type, target_type) { + (Struct(source_fields), Struct(target_fields)) => { + validate_struct_compatibility(source_fields, target_fields) + } + (DataType::List(source_field), DataType::List(target_field)) + | (DataType::LargeList(source_field), DataType::LargeList(target_field)) => { + validate_field_compatibility(source_field, target_field) } - // For non-struct types, use the existing castability check - _ => { - if !arrow::compute::can_cast_types( - source_field.data_type(), - target_field.data_type(), - ) { - return _plan_err!( - "Cannot cast struct field '{}' from type {} to type {}", - target_field.name(), - source_field.data_type(), - target_field.data_type() - ); - } + ( + DataType::FixedSizeList(source_field, source_size), + DataType::FixedSizeList(target_field, target_size), + ) if source_size == target_size => { + validate_field_compatibility(source_field, target_field) } + ( + DataType::Map(source_entries, source_sorted), + DataType::Map(target_entries, target_sorted), + ) if source_sorted == target_sorted => { + validate_field_compatibility(source_entries, target_entries) + } + _ if !requires_recursive_compatibility_validation(source_type, target_type) + && can_cast_types(source_type, target_type) => + { + Ok(()) + } + _ => _plan_err!( + "Cannot cast field '{}' from type {} to type {}", + field_name, + source_type, + target_type + ), } +} - Ok(()) +/// Returns `true` when schema-evolution compatibility should recurse through the +/// datatype rather than relying on Arrow's generic cast compatibility. +pub fn requires_recursive_compatibility_validation( + source_type: &DataType, + target_type: &DataType, +) -> bool { + matches!( + (source_type, target_type), + (Struct(_), Struct(_)) + | (DataType::List(_), DataType::List(_)) + | (DataType::LargeList(_), DataType::LargeList(_)) + | (DataType::FixedSizeList(_, _), DataType::FixedSizeList(_, _)) + | (DataType::Map(_, _), DataType::Map(_, _)) + ) } /// Check if two field lists have at least one common field by name. @@ -330,10 +463,10 @@ mod tests { use crate::{assert_contains, format::DEFAULT_CAST_OPTIONS}; use arrow::{ array::{ - BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray, - MapBuilder, NullArray, StringArray, StringBuilder, + BinaryArray, BooleanArray, Int32Array, Int32Builder, Int64Array, ListArray, + MapArray, MapBuilder, NullArray, StringArray, StringBuilder, }, - buffer::NullBuffer, + buffer::{NullBuffer, OffsetBuffer}, datatypes::{DataType, Field, FieldRef, Int32Type}, }; /// Macro to extract and downcast a column from a StructArray @@ -463,7 +596,7 @@ mod tests { let result = cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS); assert!(result.is_err()); let error_msg = result.unwrap_err().to_string(); - assert!(error_msg.contains("Cannot cast struct field 'a'")); + assert!(error_msg.contains("Cannot cast field 'a'")); } #[test] @@ -480,7 +613,7 @@ mod tests { let result = validate_struct_compatibility(&source_fields, &target_fields); assert!(result.is_err()); let error_msg = result.unwrap_err().to_string(); - assert!(error_msg.contains("Cannot cast struct field 'field1'")); + assert!(error_msg.contains("Cannot cast field 'field1'")); assert!(error_msg.contains("Binary")); assert!(error_msg.contains("Int32")); } @@ -647,7 +780,7 @@ mod tests { let error_msg = result.unwrap_err().to_string(); assert_contains!( error_msg, - "Cannot cast struct field 'field1' from type Binary to type Int32" + "Cannot cast field 'field1' from type Binary to type Int32" ); } @@ -727,6 +860,150 @@ mod tests { assert!(result.is_ok()); } + #[test] + fn test_validate_data_type_compatibility_list_of_struct_with_missing_nullable_field() + { + let source_type = DataType::List(Arc::new(field( + "item", + struct_type(vec![ + field("type", DataType::Utf8), + field("token", DataType::Utf8), + field("amount", DataType::Int64), + ]), + ))); + let target_type = DataType::List(Arc::new(field( + "item", + struct_type(vec![ + field("type", DataType::Utf8), + field("token", DataType::Utf8), + field("amount", DataType::Int64), + field("chain", DataType::Utf8), + ]), + ))); + + let result = + validate_data_type_compatibility(&source_type, &target_type, "messages"); + assert!(result.is_ok()); + } + + #[test] + fn test_validate_data_type_compatibility_fixed_size_list_size_mismatch() { + let source_type = + DataType::FixedSizeList(Arc::new(field("item", DataType::Int32)), 2); + let target_type = + DataType::FixedSizeList(Arc::new(field("item", DataType::Int32)), 3); + + let result = + validate_data_type_compatibility(&source_type, &target_type, "items"); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert_contains!( + error_msg, + "Cannot cast field 'items' from type FixedSizeList" + ); + } + + #[test] + fn test_validate_data_type_compatibility_map_entries_nested_struct() { + let source_type = DataType::Map( + Arc::new(non_null_field( + "entries", + struct_type(vec![ + non_null_field("keys", DataType::Utf8), + field( + "values", + struct_type(vec![ + field("type", DataType::Utf8), + field("token", DataType::Utf8), + ]), + ), + ]), + )), + false, + ); + let target_type = DataType::Map( + Arc::new(non_null_field( + "entries", + struct_type(vec![ + non_null_field("keys", DataType::Utf8), + field( + "values", + struct_type(vec![ + field("type", DataType::Utf8), + field("token", DataType::Utf8), + field("chain", DataType::Utf8), + ]), + ), + ]), + )), + false, + ); + + let result = validate_data_type_compatibility(&source_type, &target_type, "map"); + assert!(result.is_ok()); + } + + #[test] + fn test_cast_list_of_struct_with_missing_nullable_field() { + let list_values = Arc::new(StructArray::from(vec![ + ( + arc_field("id", DataType::Int32), + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef, + ), + ( + arc_field("name", DataType::Utf8), + Arc::new(StringArray::from(vec![ + Some("alpha"), + Some("beta"), + Some("gamma"), + ])) as ArrayRef, + ), + ])) as ArrayRef; + let source_col = Arc::new(ListArray::new( + Arc::new(field( + "item", + struct_type(vec![ + field("id", DataType::Int32), + field("name", DataType::Utf8), + ]), + )), + OffsetBuffer::new(vec![0, 2, 3].into()), + list_values, + None, + )) as ArrayRef; + + let target_field = field( + "messages", + DataType::List(Arc::new(field( + "item", + struct_type(vec![ + field("id", DataType::Int64), + field("name", DataType::Utf8), + field("extra", DataType::Boolean), + ]), + ))), + ); + + let result = + cast_column(&source_col, &target_field, &DEFAULT_CAST_OPTIONS).unwrap(); + let list_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(list_array.len(), 2); + + let values = list_array.values(); + let values = values.as_any().downcast_ref::().unwrap(); + let id_values = get_column_as!(values, "id", Int64Array); + assert_eq!( + id_values.iter().collect::>(), + vec![Some(1), Some(2), Some(3)] + ); + + let extra_values = get_column_as!(values, "extra", BooleanArray); + assert_eq!( + extra_values.iter().collect::>(), + vec![None, None, None] + ); + } + #[test] fn test_cast_nested_struct_with_extra_and_missing_fields() { // Source inner struct has fields a, b, extra diff --git a/datafusion/core/tests/parquet/expr_adapter.rs b/datafusion/core/tests/parquet/expr_adapter.rs index f412cdf9bd7a6..1729dbacdd062 100644 --- a/datafusion/core/tests/parquet/expr_adapter.rs +++ b/datafusion/core/tests/parquet/expr_adapter.rs @@ -18,9 +18,10 @@ use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray, - StructArray, record_batch, + Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, + StringArray, StructArray, record_batch, }; +use arrow::buffer::OffsetBuffer; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::assert_batches_eq; @@ -462,6 +463,135 @@ async fn test_struct_schema_evolution_projection_and_filter() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_list_of_struct_schema_evolution_projection() -> Result<()> { + let list_values = Arc::new(StructArray::new( + vec![ + Arc::new(Field::new("id", DataType::Int32, false)), + Arc::new(Field::new("name", DataType::Utf8, true)), + ] + .into(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + Arc::new(StringArray::from(vec![ + Some("alpha"), + Some("beta"), + Some("gamma"), + ])) as ArrayRef, + ], + None, + )) as ArrayRef; + + let list_array = ListArray::new( + Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ] + .into(), + ), + true, + )), + OffsetBuffer::new(vec![0, 2, 3].into()), + list_values, + None, + ); + + let physical_schema = Arc::new(Schema::new(vec![Field::new( + "messages", + list_array.data_type().clone(), + true, + )])); + + let batch = + RecordBatch::try_new(Arc::clone(&physical_schema), vec![Arc::new(list_array)])?; + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + write_parquet(batch, store.clone(), "list_struct_evolution.parquet").await; + + let logical_schema = Arc::new(Schema::new(vec![Field::new( + "messages", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("extra", DataType::Boolean, true), + ] + .into(), + ), + true, + ))), + true, + )])); + + let mut cfg = SessionConfig::new() + .with_collect_statistics(false) + .with_parquet_pruning(false) + .with_parquet_page_index_pruning(false); + cfg.options_mut().execution.parquet.pushdown_filters = true; + + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + let listing_table_config = + ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(Arc::clone(&logical_schema)) + .with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory)); + + let table = ListingTable::try_new(listing_table_config).unwrap(); + ctx.register_table("t_list_struct", Arc::new(table)) + .unwrap(); + + let batches = ctx + .sql("SELECT messages FROM t_list_struct") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + + let messages = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .expect("expected list array"); + let values = messages.values(); + let values = values + .as_any() + .downcast_ref::() + .expect("expected struct values"); + + let ids = values + .column_by_name("id") + .expect("id column") + .as_any() + .downcast_ref::() + .expect("id should be cast to Int64"); + assert_eq!( + ids.iter().collect::>(), + vec![Some(1), Some(2), Some(3)] + ); + + let extra = values.column_by_name("extra").expect("extra column"); + let extra = extra + .as_any() + .downcast_ref::() + .expect("extra should be a boolean array"); + assert_eq!(extra.iter().collect::>(), vec![None, None, None]); + + Ok(()) +} + /// Test demonstrating that a single PhysicalExprAdapterFactory instance can be /// reused across multiple ListingTable instances. /// diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index a2a45cbdfe7aa..2a7a341cf204d 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -30,7 +30,9 @@ use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef}; use datafusion_common::{ Result, ScalarValue, exec_err, metadata::FieldMetadata, - nested_struct::validate_struct_compatibility, + nested_struct::{ + requires_recursive_compatibility_validation, validate_data_type_compatibility, + }, tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_functions::core::getfield::GetFieldFunc; @@ -487,40 +489,39 @@ impl DefaultPhysicalExprAdapterRewriter { physical_field: FieldRef, logical_field: &Field, ) -> Result>> { - // For struct types, use validate_struct_compatibility which handles: - // - Missing fields in source (filled with nulls) - // - Extra fields in source (ignored) - // - Recursive validation of nested structs - // For non-struct types, use Arrow's can_cast_types - match (physical_field.data_type(), logical_field.data_type()) { - (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => { - validate_struct_compatibility( - physical_fields.as_ref(), - logical_fields.as_ref(), - )?; - } - _ => { - let is_compatible = - can_cast_types(physical_field.data_type(), logical_field.data_type()); - if !is_compatible { - return exec_err!( - "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", - column.name(), - physical_field.data_type(), - logical_field.data_type() - ); - } - } + let physical_type = physical_field.data_type(); + let logical_type = logical_field.data_type(); + + if requires_recursive_compatibility_validation(physical_type, logical_type) { + validate_data_type_compatibility( + physical_type, + logical_type, + column.name(), + ) + .map_err(|err| { + datafusion_common::DataFusionError::Execution(format!( + "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type): {}", + column.name(), + physical_type, + logical_type, + err + )) + })?; + } else if !can_cast_types(physical_type, logical_type) { + return exec_err!( + "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", + column.name(), + physical_type, + logical_type + ); } - let cast_expr = Arc::new(CastColumnExpr::new( + Ok(Transformed::yes(Arc::new(CastColumnExpr::new( Arc::new(column), physical_field, Arc::new(logical_field.clone()), None, - )); - - Ok(Transformed::yes(cast_expr)) + )))) } } @@ -822,7 +823,7 @@ mod tests { // validate_struct_compatibility provides more specific error about which field can't be cast assert_contains!( error_msg, - "Cannot cast struct field 'field1' from type Binary to type Int32" + "Cannot cast field 'field1' from type Binary to type Int32" ); } @@ -892,6 +893,89 @@ mod tests { assert_eq!(result.to_string(), expected.to_string()); } + #[test] + fn test_rewrite_list_of_struct_compatible_cast() { + let physical_schema = Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ] + .into(), + ), + true, + ))), + true, + )]); + + let logical_schema = Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("extra", DataType::Boolean, true), + ] + .into(), + ), + true, + ))), + true, + )]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory + .create(Arc::new(logical_schema), Arc::new(physical_schema)) + .unwrap(); + let column_expr = Arc::new(Column::new("data", 0)); + + let result = adapter.rewrite(column_expr).unwrap(); + + assert!(result.as_any().downcast_ref::().is_some()); + } + + #[test] + fn test_rewrite_list_of_struct_incompatible_nested_field() { + let physical_schema = Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![Field::new("field1", DataType::Binary, true)].into(), + ), + true, + ))), + true, + )]); + + let logical_schema = Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![Field::new("field1", DataType::Int32, true)].into(), + ), + true, + ))), + true, + )]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory + .create(Arc::new(logical_schema), Arc::new(physical_schema)) + .unwrap(); + let column_expr = Arc::new(Column::new("data", 0)); + + let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string(); + assert_contains!(&error_msg, "Cannot cast column 'data'"); + assert_contains!(&error_msg, "field1"); + } + #[test] fn test_rewrite_missing_column() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema();