diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index fdd04f752455e..a51c1bcc25022 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -94,7 +94,10 @@ pub use schema_reference::SchemaReference; pub use spans::{Location, Span, Spans}; pub use stats::{ColumnStatistics, Statistics}; pub use table_reference::{ResolvedTableReference, TableReference}; -pub use unnest::{RecursionUnnestOption, UnnestOptions}; +pub use unnest::{ + RecursionUnnestOption, UNNEST_PLACEHOLDER_METADATA_KEY, UnnestOptions, + is_unnest_placeholder_field, unnest_placeholder_field_metadata, +}; pub use utils::project_schema; // These are hidden from docs purely to avoid polluting the public view of what this crate exports. diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index db48edd061605..5fc0a4c07ddf3 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -17,7 +17,33 @@ //! [`UnnestOptions`] for unnesting structured types +use std::collections::BTreeMap; + +use arrow::datatypes::Field; + use crate::Column; +use crate::metadata::FieldMetadata; + +/// Field metadata key used to mark planner-internal unnest placeholder columns. +pub const UNNEST_PLACEHOLDER_METADATA_KEY: &str = "datafusion:unnest_placeholder"; + +const UNNEST_PLACEHOLDER_METADATA_VALUE: &str = "true"; + +/// Returns metadata that marks a field as a planner-internal unnest placeholder. +pub fn unnest_placeholder_field_metadata() -> FieldMetadata { + FieldMetadata::from(BTreeMap::from([( + UNNEST_PLACEHOLDER_METADATA_KEY.to_string(), + UNNEST_PLACEHOLDER_METADATA_VALUE.to_string(), + )])) +} + +/// Returns true if the field is a planner-internal unnest placeholder. +pub fn is_unnest_placeholder_field(field: &Field) -> bool { + field + .metadata() + .get(UNNEST_PLACEHOLDER_METADATA_KEY) + .is_some_and(|value| value == UNNEST_PLACEHOLDER_METADATA_VALUE) +} /// Options for unnesting a column that contains a list type, /// replicating values in the other, non nested rows. diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c94ab10a9e72f..8d566d2cd939a 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -18,6 +18,7 @@ // Include tests in dataframe_functions mod dataframe_functions; mod describe; +mod unnest_chunks; use arrow::array::{ Array, ArrayRef, BooleanArray, DictionaryArray, FixedSizeListArray, diff --git a/datafusion/core/tests/dataframe/unnest_chunks.rs b/datafusion/core/tests/dataframe/unnest_chunks.rs new file mode 100644 index 0000000000000..19510ba391633 --- /dev/null +++ b/datafusion/core/tests/dataframe/unnest_chunks.rs @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; + +const CREATE_RECURSIVE_UNNEST_TABLE: &str = "CREATE TABLE recursive_unnest_table AS VALUES \ + (struct([1], 'a'), [[[1],[2]],[[1,1]]], [struct([1],[[1,2]])]), \ + (struct([2], 'b'), [[[3,4],[5]],[[null,6],null,[7,8]]], [struct([2],[[3],[4]])])"; + +const RECURSIVE_REPEATED_REFS_QUERY: &str = "SELECT \ + unnest(column2), \ + unnest(unnest(column2)), \ + unnest(unnest(unnest(column2))), \ + unnest(unnest(unnest(column2))) + 1 \ + FROM recursive_unnest_table"; + +fn count_physical_plan_nodes_by_name( + plan: &Arc, + node_name: &str, +) -> usize { + let mut count = 0; + let mut stack = vec![Arc::clone(plan)]; + + while let Some(node) = stack.pop() { + if node.name() == node_name { + count += 1; + } + stack.extend(node.children().into_iter().cloned()); + } + + count +} + +fn batch_slicing_ctx(batch_size: usize) -> SessionContext { + batch_slicing_ctx_with_partitions(batch_size, 1) +} + +fn batch_slicing_ctx_with_partitions( + batch_size: usize, + target_partitions: usize, +) -> SessionContext { + SessionContext::new_with_config( + SessionConfig::new() + .with_batch_size(batch_size) + .with_target_partitions(target_partitions), + ) +} + +fn total_rows(results: &[RecordBatch]) -> usize { + results.iter().map(RecordBatch::num_rows).sum() +} + +fn assert_total_and_max_batch_rows( + results: &[RecordBatch], + total_rows: usize, + max_batch_rows: usize, +) { + assert_eq!(self::total_rows(results), total_rows); + assert!( + results + .iter() + .all(|batch| batch.num_rows() <= max_batch_rows) + ); +} + +fn nested_int32_batch(rows: &[&[&[i32]]]) -> Result { + let mut list_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new())); + + for row in rows { + for values in *row { + for &value in *values { + list_builder.values().values().append_value(value); + } + list_builder.values().append(true); + } + list_builder.append(true); + } + + Ok(RecordBatch::try_from_iter(vec![( + "nested", + Arc::new(list_builder.finish()) as ArrayRef, + )])?) +} + +async fn create_recursive_unnest_table(ctx: &SessionContext) -> Result<()> { + ctx.sql(CREATE_RECURSIVE_UNNEST_TABLE).await?; + Ok(()) +} + +#[tokio::test] +async fn unnest_chunks_high_fanout_batches() -> Result<()> { + let ctx = batch_slicing_ctx(4); + + let shape_ids = Arc::new(UInt32Array::from(vec![1, 2, 3])) as ArrayRef; + let tag_ids = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(11), Some(12), Some(13)]), + Some(vec![Some(21), Some(22), Some(23)]), + Some(vec![Some(31), Some(32), Some(33)]), + ])) as ArrayRef; + + let batch = + RecordBatch::try_from_iter(vec![("shape_id", shape_ids), ("tag_id", tag_ids)])?; + + ctx.register_batch("shapes", batch)?; + + let results = ctx + .table("shapes") + .await? + .unnest_columns(&["tag_id"])? + .collect() + .await?; + + assert_eq!( + results + .iter() + .map(RecordBatch::num_rows) + .collect::>(), + vec![3, 3, 3] + ); + assert_total_and_max_batch_rows(&results, 9, 4); + + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +----------+--------+ + | shape_id | tag_id | + +----------+--------+ + | 1 | 11 | + | 1 | 12 | + | 1 | 13 | + | 2 | 21 | + | 2 | 22 | + | 2 | 23 | + | 3 | 31 | + | 3 | 32 | + | 3 | 33 | + +----------+--------+ + " + ); + + Ok(()) +} + +/// Tests that the batch-slicing estimation path respects `batch_size` when +/// multiple unnested columns have different lengths per row. +/// +/// `estimate_row_output_rows` picks `max(len_col_a, len_col_b)` per row to +/// determine how many rows a batch-slice will expand to. With `batch_size=4` +/// and row expansion vectors [3, 2, 3] this should yield three output batches +/// whose sizes never exceed 4. +#[tokio::test] +async fn unnest_chunks_multi_col_different_lengths() -> Result<()> { + let ctx = batch_slicing_ctx(4); + + // id: 1, 2, 3 + // list_a: [1], [2, 3], [4, 5, 6] (lengths 1, 2, 3) + // list_b: [10, 11, 12], [20], [30, 31] (lengths 3, 1, 2) + // max lengths per row: 3, 2, 3 -> total output rows: 8 + let ids = Arc::new(UInt32Array::from(vec![1u32, 2, 3])) as ArrayRef; + let list_a = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1)]), + Some(vec![Some(2), Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + ])) as ArrayRef; + let list_b = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(10), Some(11), Some(12)]), + Some(vec![Some(20)]), + Some(vec![Some(30), Some(31)]), + ])) as ArrayRef; + + let batch = RecordBatch::try_from_iter(vec![ + ("id", ids), + ("list_a", list_a), + ("list_b", list_b), + ])?; + ctx.register_batch("t", batch)?; + + let results = ctx + .table("t") + .await? + .unnest_columns(&["list_a", "list_b"])? + .collect() + .await?; + + assert_total_and_max_batch_rows(&results, 8, 4); + + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +----+--------+--------+ + | id | list_a | list_b | + +----+--------+--------+ + | 1 | | 11 | + | 1 | | 12 | + | 1 | 1 | 10 | + | 2 | 2 | 20 | + | 2 | 3 | | + | 3 | 4 | 30 | + | 3 | 5 | 31 | + | 3 | 6 | | + +----+--------+--------+ + " + ); + + Ok(()) +} + +/// Tests that `preserve_nulls = false` correctly drops null-list rows when the +/// batch-slicing estimation path is active. +/// +/// `estimate_row_output_rows` returns 0 for null rows under +/// `preserve_nulls = false`, so those rows are folded into adjacent slices +/// without contributing output rows. +#[tokio::test] +async fn unnest_chunks_preserve_nulls_false() -> Result<()> { + let ctx = batch_slicing_ctx(2); + + // list: [1, 2], null, [3], null + // id: A, B, C, D + // With preserve_nulls=false: only rows A and C produce output (3 rows total) + let list_array = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2)]), + None, + Some(vec![Some(3)]), + None, + ])) as ArrayRef; + let id_array = Arc::new(StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef; + + let batch = RecordBatch::try_from_iter(vec![("id", id_array), ("list", list_array)])?; + ctx.register_batch("t", batch)?; + + let results = ctx + .table("t") + .await? + .unnest_columns_with_options( + &["list"], + UnnestOptions::new().with_preserve_nulls(false), + )? + .collect() + .await?; + + assert_total_and_max_batch_rows(&results, 3, 2); + + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +----+------+ + | id | list | + +----+------+ + | A | 1 | + | A | 2 | + | C | 3 | + +----+------+ + " + ); + + Ok(()) +} + +/// Tests that recursive unnest (`depth > 1`) preserves correctness under a +/// small `batch_size`. +/// +/// Recursive / staged unnest now bypasses row-slice chunking entirely because +/// changing batch boundaries can leak through downstream repartitioning and +/// reorder parent-row groups. This test verifies that the conservative +/// fallback still produces the complete result. +#[tokio::test] +async fn unnest_chunks_recursive_single_row_fallback() -> Result<()> { + let ctx = batch_slicing_ctx(4); + + let batch = nested_int32_batch(&[&[&[1, 2], &[3]], &[&[4, 5]], &[&[6], &[7, 8, 9]]])?; + ctx.register_batch("t", batch)?; + + let results = ctx + .sql("SELECT unnest(unnest(nested)) AS val FROM t") + .await? + .collect() + .await?; + + assert_eq!(total_rows(&results), 9); + + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+ + | val | + +-----+ + | 1 | + | 2 | + | 3 | + | 4 | + | 5 | + | 6 | + | 7 | + | 8 | + | 9 | + +-----+ + " + ); + + Ok(()) +} + +#[tokio::test] +async fn unnest_chunks_stacked_unnest_preserves_order() -> Result<()> { + let ctx = batch_slicing_ctx(2); + + let batch = nested_int32_batch(&[&[&[1, 2], &[3]], &[&[4], &[5, 6]]])?; + ctx.register_batch("t", batch)?; + + let dataframe = ctx + .sql( + "SELECT unnest(nested1) AS val, original \ + FROM (SELECT unnest(nested) AS nested1, nested AS original FROM t)", + ) + .await?; + let physical_plan = dataframe.clone().create_physical_plan().await?; + assert_eq!( + count_physical_plan_nodes_by_name(&physical_plan, "UnnestExec"), + 2 + ); + + let results = dataframe.collect().await?; + + assert_snapshot!( + batches_to_string(&results), + @r" + +-----+---------------+ + | val | original | + +-----+---------------+ + | 1 | [[1, 2], [3]] | + | 2 | [[1, 2], [3]] | + | 3 | [[1, 2], [3]] | + | 4 | [[4], [5, 6]] | + | 5 | [[4], [5, 6]] | + | 6 | [[4], [5, 6]] | + +-----+---------------+ + " + ); + + Ok(()) +} + +#[tokio::test] +async fn unnest_chunks_recursive_repeated_refs_preserve_order() -> Result<()> { + let ctx = batch_slicing_ctx_with_partitions(2, 4); + + create_recursive_unnest_table(&ctx).await?; + + let dataframe = ctx.sql(RECURSIVE_REPEATED_REFS_QUERY).await?; + + let physical_plan = dataframe.clone().create_physical_plan().await?; + let plan_text = displayable(physical_plan.as_ref()) + .indent(false) + .to_string(); + assert!( + plan_text.contains("RepartitionExec: partitioning=RoundRobinBatch(4)"), + "expected repeated-reference unnest plan to include RoundRobinBatch(4), got:\n{plan_text}" + ); + assert!( + count_physical_plan_nodes_by_name(&physical_plan, "UnnestExec") >= 1, + "expected repeated-reference unnest plan to include UnnestExec, got:\n{plan_text}" + ); + + let results = dataframe.collect().await?; + + assert_snapshot!( + batches_to_string(&results), + @r" + +----------------------------------------+------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------+ + | UNNEST(recursive_unnest_table.column2) | UNNEST(UNNEST(recursive_unnest_table.column2)) | UNNEST(UNNEST(UNNEST(recursive_unnest_table.column2))) | UNNEST(UNNEST(UNNEST(recursive_unnest_table.column2))) + Int64(1) | + +----------------------------------------+------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------+ + | [[1], [2]] | [1] | 1 | 2 | + | [[1, 1]] | [2] | | | + | [[1], [2]] | [1, 1] | 2 | 3 | + | [[1, 1]] | | | | + | [[1], [2]] | [1] | 1 | 2 | + | [[1, 1]] | [2] | 1 | 2 | + | [[1], [2]] | [1, 1] | | | + | [[1, 1]] | | | | + | [[3, 4], [5]] | [3, 4] | 3 | 4 | + | [[, 6], , [7, 8]] | [5] | 4 | 5 | + | [[3, 4], [5]] | [, 6] | 5 | 6 | + | [[, 6], , [7, 8]] | | | | + | | [7, 8] | | | + | [[3, 4], [5]] | [3, 4] | | | + | [[, 6], , [7, 8]] | [5] | 6 | 7 | + | [[3, 4], [5]] | [, 6] | | | + | [[, 6], , [7, 8]] | | | | + | | [7, 8] | | | + | [[3, 4], [5]] | | 7 | 8 | + | [[, 6], , [7, 8]] | | 8 | 9 | + +----------------------------------------+------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------+ + " + ); + + Ok(()) +} diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 48de79b741e06..3f833292278de 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -17,7 +17,6 @@ //! Define a plan for unnesting values in columns that contain a list type. -use std::cmp::{self, Ordering}; use std::task::{Poll, ready}; use std::{any::Any, sync::Arc}; @@ -45,7 +44,7 @@ use async_trait::async_trait; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err, - internal_err, + internal_err, is_unnest_placeholder_field, }; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -128,7 +127,6 @@ impl UnnestExec { .collect(); // Manually build projection mapping from non-unnested input columns to their positions in the output - let input_schema = input.schema(); let projection_mapping: ProjectionMapping = non_unnested_indices .iter() .map(|&input_idx| { @@ -205,6 +203,20 @@ impl UnnestExec { ..Self::clone(self) } } + + fn disable_chunking_for_recursive_or_staged_unnest(&self) -> bool { + // TODO(#20788): Re-enable chunking for recursive / staged UNNEST once + // UnnestExec can preserve parent-row boundaries across downstream + // repartitioning while remaining memory-bounded. Stacked UNNEST still + // needs a parent-row-aware chunking strategy to avoid order regressions. + let input_schema = self.input.schema(); + self.list_column_indices.iter().any(|unnest| { + unnest.depth > 1 + || is_unnest_placeholder_field( + input_schema.field(unnest.index_in_input_schema), + ) + }) + } } impl DisplayAs for UnnestExec { @@ -271,6 +283,7 @@ impl ExecutionPlan for UnnestExec { partition: usize, context: Arc, ) -> Result { + let output_batch_size = context.session_config().batch_size().max(1); let input = self.input.execute(partition, context)?; let metrics = UnnestMetrics::new(partition, &self.metrics); @@ -279,6 +292,10 @@ impl ExecutionPlan for UnnestExec { schema: Arc::clone(&self.schema), list_type_columns: self.list_column_indices.clone(), struct_column_indices: self.struct_column_indices.iter().copied().collect(), + pending: None, + disable_chunking_for_recursive_or_staged_unnest: self + .disable_chunking_for_recursive_or_staged_unnest(), + output_batch_size, options: self.options.clone(), metrics, })) @@ -314,6 +331,28 @@ impl UnnestMetrics { } } +/// State for draining one input batch into size-bounded unnest chunks. +struct PendingBatch { + batch: RecordBatch, + next_row: usize, +} + +impl PendingBatch { + fn new(batch: RecordBatch) -> Self { + Self { batch, next_row: 0 } + } + + fn remaining_rows(&self) -> usize { + self.batch.num_rows().saturating_sub(self.next_row) + } + + fn slice(&mut self, row_count: usize) -> RecordBatch { + let batch = self.batch.slice(self.next_row, row_count); + self.next_row += row_count; + batch + } +} + /// A stream that issues [RecordBatch]es with unnested column data. struct UnnestStream { /// Input stream @@ -325,6 +364,10 @@ struct UnnestStream { /// then list_type_columns = [ListUnnest{1,1},ListUnnest{1,2}] list_type_columns: Vec, struct_column_indices: HashSet, + pending: Option, + disable_chunking_for_recursive_or_staged_unnest: bool, + /// Target maximum number of output rows per emitted batch. + output_batch_size: usize, /// Options options: UnnestOptions, /// Metrics @@ -357,30 +400,21 @@ impl UnnestStream { cx: &mut std::task::Context<'_>, ) -> Poll>> { loop { + if let Some(result_batch) = self.build_next_pending_batch()? { + (&result_batch).record_output(&self.metrics.baseline_metrics); + + // Empty record batches should not be emitted. + // They need to be treated as [`Option`]es and handled separately. + debug_assert!(result_batch.num_rows() > 0); + return Poll::Ready(Some(Ok(result_batch))); + } + return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let elapsed_compute = - self.metrics.baseline_metrics.elapsed_compute().clone(); - let timer = elapsed_compute.timer(); self.metrics.input_batches.add(1); self.metrics.input_rows.add(batch.num_rows()); - let result = build_batch( - &batch, - &self.schema, - &self.list_type_columns, - &self.struct_column_indices, - &self.options, - )?; - timer.done(); - let Some(result_batch) = result else { - continue; - }; - (&result_batch).record_output(&self.metrics.baseline_metrics); - - // Empty record batches should not be emitted. - // They need to be treated as [`Option`]es and handled separately - debug_assert!(result_batch.num_rows() > 0); - Some(Ok(result_batch)) + self.pending = Some(PendingBatch::new(batch)); + continue; } other => { trace!( @@ -397,6 +431,132 @@ impl UnnestStream { }); } } + + fn build_next_pending_batch(&mut self) -> Result> { + loop { + let Some(pending) = self.pending.as_mut() else { + return Ok(None); + }; + + if pending.remaining_rows() == 0 { + self.pending = None; + return Ok(None); + } + + let row_count = if self.disable_chunking_for_recursive_or_staged_unnest { + pending.remaining_rows() + } else { + next_input_slice_row_count( + &pending.batch, + pending.next_row, + &self.list_type_columns, + &self.options, + self.output_batch_size, + )? + }; + let batch_slice = pending.slice(row_count); + if pending.remaining_rows() == 0 { + self.pending = None; + } + + let Some(result_batch) = self.build_batch(&batch_slice)? else { + continue; + }; + + return Ok(Some(result_batch)); + } + } + + fn build_batch(&self, batch: &RecordBatch) -> Result> { + let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let result = build_batch( + batch, + &self.schema, + &self.list_type_columns, + &self.struct_column_indices, + &self.options, + ); + timer.done(); + result + } +} + +fn next_input_slice_row_count( + batch: &RecordBatch, + start_row: usize, + list_type_columns: &[ListUnnest], + options: &UnnestOptions, + output_batch_size: usize, +) -> Result { + let remaining_rows = batch.num_rows().saturating_sub(start_row); + if remaining_rows == 0 { + return Ok(0); + } + + if list_type_columns.is_empty() { + return Ok(remaining_rows); + } + + // Recursive unnest can amplify a row beyond what top-level list lengths reveal. + // Fall back to single-row slices until recursive unnest becomes chunk-aware. + if list_type_columns.iter().any(|unnest| unnest.depth > 1) { + return Ok(1); + } + + let mut rows = 0; + let mut estimated_output_rows = 0usize; + while start_row + rows < batch.num_rows() { + let row_output_rows = estimate_row_output_rows( + batch, + start_row + rows, + list_type_columns, + options, + )?; + if rows > 0 && estimated_output_rows + row_output_rows > output_batch_size { + break; + } + estimated_output_rows += row_output_rows; + rows += 1; + if estimated_output_rows >= output_batch_size { + break; + } + } + + Ok(rows.max(1)) +} + +fn estimate_row_output_rows( + batch: &RecordBatch, + row: usize, + list_type_columns: &[ListUnnest], + options: &UnnestOptions, +) -> Result { + list_type_columns + .iter() + .map(|unnest| { + list_output_length( + batch.column(unnest.index_in_input_schema).as_ref(), + row, + options, + ) + }) + .try_fold(0usize, |max_len, len| len.map(|len| max_len.max(len))) +} + +fn list_output_length( + array: &dyn Array, + row: usize, + options: &UnnestOptions, +) -> Result { + let null_length = usize::from(options.preserve_nulls); + let typed = as_list_array_type(array)?; + Ok(if typed.is_null(row) { + null_length + } else { + let (start, end) = typed.value_offsets(row); + (end - start) as usize + }) } /// Given a set of struct column indices to flatten @@ -418,8 +578,8 @@ fn flatten_struct_cols( let columns_expanded = input_batch .iter() .enumerate() - .map(|(idx, column_data)| match struct_column_indices.get(&idx) { - Some(_) => match column_data.data_type() { + .map(|(idx, column_data)| match struct_column_indices.contains(&idx) { + true => match column_data.data_type() { DataType::Struct(_) => { let struct_arr = column_data.as_any().downcast_ref::().unwrap(); @@ -429,7 +589,7 @@ fn flatten_struct_cols( "expecting column {idx} from input plan to be a struct, got {data_type}" ), }, - None => Ok(vec![Arc::clone(column_data)]), + false => Ok(vec![Arc::clone(column_data)]), }) .collect::>>()? .into_iter() @@ -544,18 +704,13 @@ fn list_unnest_at_level( .iter() .enumerate() .map(|(i, _)| { - // Check if the column is needed in future levels (levels below the current one) - let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| { - unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest - }); - - // Check if the column is involved in unnesting at any level - let is_involved_in_unnesting = list_type_unnests + let mut unnests = list_type_unnests .iter() - .any(|unnesting| unnesting.index_in_input_schema == i); - - // Repeat columns needed in future levels or not unnested. - needed_in_future_levels || !is_involved_in_unnesting + .filter(|unnest| unnest.index_in_input_schema == i); + unnests.any(|unnest| unnest.depth < level_to_unnest) + || !list_type_unnests + .iter() + .any(|unnest| unnest.index_in_input_schema == i) }) .collect(); @@ -565,11 +720,6 @@ fn list_unnest_at_level( Ok(Some(ret)) } -struct UnnestingResult { - arr: ArrayRef, - depth: usize, -} - /// For each row in a `RecordBatch`, some list/struct columns need to be unnested. /// - For list columns: We will expand the values in each list into multiple rows, /// taking the longest length among these lists, and shorter lists are padded with NULLs. @@ -633,121 +783,68 @@ fn build_batch( struct_column_indices: &HashSet, options: &UnnestOptions, ) -> Result> { - let transformed = match list_type_columns.len() { - 0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices), - _ => { - let mut temp_unnested_result = HashMap::new(); - let max_recursion = list_type_columns - .iter() - .fold(0, |highest_depth, ListUnnest { depth, .. }| { - cmp::max(highest_depth, *depth) - }); - - // This arr always has the same column count with the input batch - let mut flatten_arrs = vec![]; - - // Original batch has the same columns - // All unnesting results are written to temp_batch - for depth in (1..=max_recursion).rev() { - let input = match depth == max_recursion { - true => batch.columns(), - false => &flatten_arrs, - }; - let Some(temp_result) = list_unnest_at_level( - input, - list_type_columns, - &mut temp_unnested_result, - depth, - options, - )? - else { - return Ok(None); - }; - flatten_arrs = temp_result; - } - let unnested_array_map: HashMap> = - temp_unnested_result.into_iter().fold( - HashMap::new(), - |mut acc, - ( - ListUnnest { - index_in_input_schema, - depth, - }, - flattened_array, - )| { - acc.entry(index_in_input_schema).or_default().push( - UnnestingResult { - arr: flattened_array, - depth, - }, - ); - acc - }, - ); - let output_order: HashMap = list_type_columns - .iter() - .enumerate() - .map(|(order, unnest_def)| (*unnest_def, order)) - .collect(); - - // One original column may be unnested multiple times into separate columns - let mut multi_unnested_per_original_index = unnested_array_map - .into_iter() - .map( - // Each item in unnested_columns is the result of unnesting the same input column - // we need to sort them to conform with the original expression order - // e.g unnest(unnest(col)) must goes before unnest(col) - |(original_index, mut unnested_columns)| { - unnested_columns.sort_by( - |UnnestingResult { depth: depth1, .. }, - UnnestingResult { depth: depth2, .. }| - -> Ordering { - output_order - .get(&ListUnnest { - depth: *depth1, - index_in_input_schema: original_index, - }) - .unwrap() - .cmp( - output_order - .get(&ListUnnest { - depth: *depth2, - index_in_input_schema: original_index, - }) - .unwrap(), - ) - }, - ); - ( - original_index, - unnested_columns - .into_iter() - .map(|result| result.arr) - .collect::>(), - ) - }, - ) - .collect::>(); - - let ret = flatten_arrs - .into_iter() - .enumerate() - .flat_map(|(col_idx, arr)| { - // Convert original column into its unnested version(s) - // Plural because one column can be unnested with different recursion level - // and into separate output columns - match multi_unnested_per_original_index.remove(&col_idx) { - Some(unnested_arrays) => unnested_arrays, - None => vec![arr], - } - }) - .collect::>(); - - flatten_struct_cols(&ret, schema, struct_column_indices) - } - }?; - Ok(Some(transformed)) + if list_type_columns.is_empty() { + return flatten_struct_cols(batch.columns(), schema, struct_column_indices) + .map(Some); + } + + let mut temp_unnested_result = HashMap::new(); + let max_recursion = list_type_columns + .iter() + .map(|ListUnnest { depth, .. }| *depth) + .max() + .unwrap_or_default(); + let mut flatten_arrs = vec![]; + + for depth in (1..=max_recursion).rev() { + let input = if depth == max_recursion { + batch.columns() + } else { + &flatten_arrs + }; + let Some(temp_result) = list_unnest_at_level( + input, + list_type_columns, + &mut temp_unnested_result, + depth, + options, + )? + else { + return Ok(None); + }; + flatten_arrs = temp_result; + } + + let output_order: HashMap = list_type_columns + .iter() + .enumerate() + .map(|(order, unnest_def)| (*unnest_def, order)) + .collect(); + let mut multi_unnested_per_original_index = HashMap::>::new(); + let mut unnested_columns = temp_unnested_result.into_iter().collect::>(); + unnested_columns + .sort_by_key(|(unnest, _)| output_order.get(unnest).copied().unwrap_or_default()); + for (unnest, arr) in unnested_columns { + multi_unnested_per_original_index + .entry(unnest.index_in_input_schema) + .or_default() + .push(arr); + } + + flatten_struct_cols( + &flatten_arrs + .into_iter() + .enumerate() + .flat_map(|(col_idx, arr)| { + multi_unnested_per_original_index + .remove(&col_idx) + .unwrap_or_else(|| vec![arr]) + }) + .collect::>(), + schema, + struct_column_indices, + ) + .map(Some) } /// Find the longest list length among the given list arrays for each row. @@ -845,6 +942,23 @@ impl ListArrayType for FixedSizeListArray { } } +/// Cast a generic array reference to `&dyn ListArrayType`. +/// +/// This is the single authoritative place for the List / LargeList / +/// FixedSizeList -> `ListArrayType` dispatch. Centralising it here +/// means that adding a new list variant (or changing null-handling) +/// only requires editing one function. +fn as_list_array_type(array: &dyn Array) -> Result<&dyn ListArrayType> { + match array.data_type() { + DataType::List(_) => Ok(array.as_list::() as &dyn ListArrayType), + DataType::LargeList(_) => Ok(array.as_list::() as &dyn ListArrayType), + DataType::FixedSizeList(_, _) => { + Ok(array.as_fixed_size_list() as &dyn ListArrayType) + } + other => exec_err!("Invalid unnest datatype {other}"), + } +} + /// Unnest multiple list arrays according to the length array. fn unnest_list_arrays( list_arrays: &[ArrayRef], @@ -853,16 +967,7 @@ fn unnest_list_arrays( ) -> Result> { let typed_arrays = list_arrays .iter() - .map(|list_array| match list_array.data_type() { - DataType::List(_) => Ok(list_array.as_list::() as &dyn ListArrayType), - DataType::LargeList(_) => { - Ok(list_array.as_list::() as &dyn ListArrayType) - } - DataType::FixedSizeList(_, _) => { - Ok(list_array.as_fixed_size_list() as &dyn ListArrayType) - } - other => exec_err!("Invalid unnest datatype {other }"), - }) + .map(|list_array| as_list_array_type(list_array.as_ref())) .collect::>>()?; typed_arrays @@ -1029,8 +1134,11 @@ mod tests { use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::{Field, Int32Type}; use datafusion_common::test_util::batches_to_string; + use datafusion_common::unnest_placeholder_field_metadata; use insta::assert_snapshot; + use crate::empty::EmptyExec; + // Create a GenericListArray with the following list values: // [A, B, C], [], NULL, [D], NULL, [NULL, F] fn make_generic_array() -> GenericListArray @@ -1079,6 +1187,66 @@ mod tests { ) } + #[test] + fn chunking_is_disabled_for_staged_unnest_inputs() -> Result<()> { + let input_schema = Arc::new(Schema::new(vec![ + Field::new( + "stacked_input", + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), + true, + ) + .with_metadata(unnest_placeholder_field_metadata().to_hashmap()), + ])); + let output_schema = Arc::new(Schema::new(vec![Field::new( + "unnested", + DataType::Int32, + true, + )])); + let exec = UnnestExec::new( + Arc::new(EmptyExec::new(Arc::clone(&input_schema))), + vec![ListUnnest { + index_in_input_schema: 0, + depth: 1, + }], + vec![], + output_schema, + UnnestOptions::default(), + )?; + + assert!(exec.disable_chunking_for_recursive_or_staged_unnest()); + Ok(()) + } + + #[test] + fn chunking_is_disabled_for_recursive_unnest() -> Result<()> { + let input_schema = Arc::new(Schema::new(vec![Field::new( + "recursive_input", + DataType::List(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), + true, + ))), + true, + )])); + let output_schema = Arc::new(Schema::new(vec![Field::new( + "unnested", + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), + true, + )])); + let exec = UnnestExec::new( + Arc::new(EmptyExec::new(Arc::clone(&input_schema))), + vec![ListUnnest { + index_in_input_schema: 0, + depth: 2, + }], + vec![], + output_schema, + UnnestOptions::default(), + )?; + + assert!(exec.disable_chunking_for_recursive_or_staged_unnest()); + Ok(()) + } + // Create a FixedSizeListArray with the following list values: // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] fn make_fixed_list() -> FixedSizeListArray { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 16ac353d4ba9b..5277e05c51359 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -28,6 +28,7 @@ use datafusion_common::tree_node::{ use datafusion_common::{ Column, DFSchemaRef, Diagnostic, HashMap, Result, ScalarValue, assert_or_internal_err, exec_datafusion_err, exec_err, internal_err, plan_err, + unnest_placeholder_field_metadata, }; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{ @@ -455,7 +456,10 @@ impl RecursiveUnnestRewriter<'_> { ); push_projection_dedupl( self.inner_projection_exprs, - expr_in_unnest.clone().alias(placeholder_name.clone()), + expr_in_unnest.clone().alias_with_metadata( + placeholder_name.clone(), + Some(unnest_placeholder_field_metadata()), + ), ); self.columns_unnestings .insert(Column::from_name(placeholder_name.clone()), None); @@ -469,7 +473,10 @@ impl RecursiveUnnestRewriter<'_> { | DataType::LargeList(_) => { push_projection_dedupl( self.inner_projection_exprs, - expr_in_unnest.clone().alias(placeholder_name.clone()), + expr_in_unnest.clone().alias_with_metadata( + placeholder_name.clone(), + Some(unnest_placeholder_field_metadata()), + ), ); let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name); @@ -699,9 +706,13 @@ mod tests { use std::{ops::Add, sync::Arc}; use arrow::datatypes::{DataType as ArrowDataType, Field, Fields, Schema}; - use datafusion_common::{Column, DFSchema, Result}; + use datafusion_common::{ + Column, DFSchema, Result, is_unnest_placeholder_field, + unnest_placeholder_field_metadata, + }; use datafusion_expr::{ - ColumnUnnestList, EmptyRelation, LogicalPlan, col, lit, unnest, + ColumnUnnestList, EmptyRelation, Expr, ExprSchemable, LogicalPlan, col, lit, + unnest, }; use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_functions_aggregate::expr_fn::count; @@ -731,6 +742,13 @@ mod tests { assert_eq!(l_formatted, r_formatted); } + fn placeholder_alias(input_column: &str) -> Expr { + col(input_column).alias_with_metadata( + format!("__unnest_placeholder({input_column})"), + Some(unnest_placeholder_field_metadata()), + ) + } + #[test] fn test_transform_bottom_unnest_recursive() -> Result<()> { let schema = Schema::new(vec![ @@ -792,12 +810,12 @@ mod tests { // Still reference struct_col in original schema but with alias, // to avoid colliding with the projection on the column itself if any + let (_, placeholder_field) = + inner_projection_exprs[0].to_field(input.schema())?; + assert!(is_unnest_placeholder_field(placeholder_field.as_ref())); assert_eq!( inner_projection_exprs, - vec![ - col("3d_col").alias("__unnest_placeholder(3d_col)"), - col("i64_col") - ] + vec![placeholder_alias("3d_col"), col("i64_col")] ); // unnest(3d_col) as 2d_col @@ -826,10 +844,7 @@ mod tests { // to avoid colliding with the projection on the column itself if any assert_eq!( inner_projection_exprs, - vec![ - col("3d_col").alias("__unnest_placeholder(3d_col)"), - col("i64_col") - ] + vec![placeholder_alias("3d_col"), col("i64_col")] ); Ok(()) @@ -890,7 +905,7 @@ mod tests { // to avoid colliding with the projection on the column itself if any assert_eq!( inner_projection_exprs, - vec![col("struct_col").alias("__unnest_placeholder(struct_col)"),] + vec![placeholder_alias("struct_col"),] ); // unnest(array_col) + 1 @@ -924,8 +939,8 @@ mod tests { assert_eq!( inner_projection_exprs, vec![ - col("struct_col").alias("__unnest_placeholder(struct_col)"), - col("array_col").alias("__unnest_placeholder(array_col)") + placeholder_alias("struct_col"), + placeholder_alias("array_col") ] ); @@ -1008,7 +1023,7 @@ mod tests { assert_eq!( inner_projection_exprs, - vec![col("struct_list").alias("__unnest_placeholder(struct_list)")] + vec![placeholder_alias("struct_list")] ); // continue rewrite another expr in select @@ -1040,7 +1055,7 @@ mod tests { assert_eq!( inner_projection_exprs, - vec![col("struct_list").alias("__unnest_placeholder(struct_list)")] + vec![placeholder_alias("struct_list")] ); Ok(())