From f49d6a850aee7472df74c04285784f69517fcefe Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Fri, 6 Sep 2024 09:32:03 +0300 Subject: [PATCH] Support encoding and decoding UnnestExec (#267) * Support encoding and decoding UnnestExec * test: check record count and types in parquet window test (#12277) * test: check record count and types in paqruet window test * Review suggestions Co-authored-by: Andrew Lamb --------- Co-authored-by: Andrew Lamb --------- Co-authored-by: Eduard Karacharov Co-authored-by: Andrew Lamb --- datafusion/physical-expr/Cargo.toml | 2 +- datafusion/physical-plan/src/unnest.rs | 19 ++ datafusion/proto/proto/datafusion.proto | 11 +- datafusion/proto/src/generated/pbjson.rs | 180 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 18 +- datafusion/proto/src/physical_plan/mod.rs | 46 ++++- .../tests/cases/roundtrip_physical_plan.rs | 31 ++- .../sqllogictest/test_files/parquet.slt | 32 ++-- 8 files changed, 316 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b1b13991d974..bbae102d574e 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -81,4 +81,4 @@ name = "in_list" [[bench]] harness = false -name = "is_null" \ No newline at end of file +name = "is_null" diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index e072b214fd36..37a9644db195 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -109,6 +109,25 @@ impl UnnestExec { input.execution_mode(), ) } + + /// Input execution plan + pub fn input(&self) -> &Arc { + &self.input + } + + /// indices of the list-typed columns in the input schema + pub fn list_column_indices(&self) -> &[usize] { + &self.list_column_indices + } + + /// indices of the struct-typed columns in the input schema + pub fn struct_column_indices(&self) -> &[usize] { + &self.struct_column_indices + } + + pub fn options(&self) -> &UnnestOptions { + &self.options + } } impl DisplayAs for UnnestExec { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8497c21a3568..83b70dfc43f7 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -742,10 +742,11 @@ message PhysicalPlanNode { AnalyzeExecNode analyze = 23; JsonSinkExecNode json_sink = 24; SymmetricHashJoinExecNode symmetric_hash_join = 25; - InterleaveExecNode interleave = 26; + InterleaveExecNode interleave = 26; PlaceholderRowExecNode placeholder_row = 27; CsvSinkExecNode csv_sink = 28; ParquetSinkExecNode parquet_sink = 29; + UnnestExecNode unnest = 30; } } @@ -802,6 +803,14 @@ message ParquetSinkExecNode { PhysicalSortExprNodeCollection sort_order = 4; } +message UnnestExecNode { + PhysicalPlanNode input = 1; + datafusion_common.Schema schema = 2; + repeated uint64 list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + UnnestOptions options = 5; +} + message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 15fe00201e6f..820ff6e64e5e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -14931,6 +14931,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::ParquetSink(v) => { struct_ser.serialize_field("parquetSink", v)?; } + physical_plan_node::PhysicalPlanType::Unnest(v) => { + struct_ser.serialize_field("unnest", v)?; + } } } struct_ser.end() @@ -14986,6 +14989,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "csvSink", "parquet_sink", "parquetSink", + "unnest", ]; #[allow(clippy::enum_variant_names)] @@ -15018,6 +15022,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { PlaceholderRow, CsvSink, ParquetSink, + Unnest, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -15067,6 +15072,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "placeholderRow" | "placeholder_row" => Ok(GeneratedField::PlaceholderRow), "csvSink" | "csv_sink" => Ok(GeneratedField::CsvSink), "parquetSink" | "parquet_sink" => Ok(GeneratedField::ParquetSink), + "unnest" => Ok(GeneratedField::Unnest), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -15283,6 +15289,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("parquetSink")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::ParquetSink) +; + } + GeneratedField::Unnest => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("unnest")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Unnest) ; } } @@ -19446,6 +19459,173 @@ impl<'de> serde::Deserialize<'de> for Unnest { deserializer.deserialize_struct("datafusion.Unnest", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for UnnestExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input.is_some() { + len += 1; + } + if self.schema.is_some() { + len += 1; + } + if !self.list_type_columns.is_empty() { + len += 1; + } + if !self.struct_type_columns.is_empty() { + len += 1; + } + if self.options.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.UnnestExecNode", len)?; + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if !self.list_type_columns.is_empty() { + struct_ser.serialize_field("listTypeColumns", &self.list_type_columns.iter().map(ToString::to_string).collect::>())?; + } + if !self.struct_type_columns.is_empty() { + struct_ser.serialize_field("structTypeColumns", &self.struct_type_columns.iter().map(ToString::to_string).collect::>())?; + } + if let Some(v) = self.options.as_ref() { + struct_ser.serialize_field("options", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for UnnestExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input", + "schema", + "list_type_columns", + "listTypeColumns", + "struct_type_columns", + "structTypeColumns", + "options", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Input, + Schema, + ListTypeColumns, + StructTypeColumns, + Options, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "input" => Ok(GeneratedField::Input), + "schema" => Ok(GeneratedField::Schema), + "listTypeColumns" | "list_type_columns" => Ok(GeneratedField::ListTypeColumns), + "structTypeColumns" | "struct_type_columns" => Ok(GeneratedField::StructTypeColumns), + "options" => Ok(GeneratedField::Options), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = UnnestExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.UnnestExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut input__ = None; + let mut schema__ = None; + let mut list_type_columns__ = None; + let mut struct_type_columns__ = None; + let mut options__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::ListTypeColumns => { + if list_type_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("listTypeColumns")); + } + list_type_columns__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::StructTypeColumns => { + if struct_type_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("structTypeColumns")); + } + struct_type_columns__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::Options => { + if options__.is_some() { + return Err(serde::de::Error::duplicate_field("options")); + } + options__ = map_.next_value()?; + } + } + } + Ok(UnnestExecNode { + input: input__, + schema: schema__, + list_type_columns: list_type_columns__.unwrap_or_default(), + struct_type_columns: struct_type_columns__.unwrap_or_default(), + options: options__, + }) + } + } + deserializer.deserialize_struct("datafusion.UnnestExecNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for UnnestNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 7a608b24a4ac..2527e66e429f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1061,7 +1061,7 @@ pub mod table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30" )] pub physical_plan_type: ::core::option::Option, } @@ -1128,6 +1128,8 @@ pub mod physical_plan_node { CsvSink(::prost::alloc::boxed::Box), #[prost(message, tag = "29")] ParquetSink(::prost::alloc::boxed::Box), + #[prost(message, tag = "30")] + Unnest(::prost::alloc::boxed::Box), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1222,6 +1224,20 @@ pub struct ParquetSinkExecNode { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UnnestExecNode { + #[prost(message, optional, boxed, tag = "1")] + pub input: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(message, optional, tag = "2")] + pub schema: ::core::option::Option, + #[prost(uint64, repeated, tag = "3")] + pub list_type_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, repeated, tag = "4")] + pub struct_type_columns: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "5")] + pub options: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExtensionNode { #[prost(bytes = "vec", tag = "1")] pub node: ::prost::alloc::vec::Vec, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index c3d411afca46..649f98dd34c9 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -56,6 +56,7 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; +use datafusion::physical_plan::unnest::UnnestExec; use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ udaf, AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, @@ -64,7 +65,6 @@ use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF}; use crate::common::{byte_to_string, str_to_byte}; -use crate::convert_required; use crate::physical_plan::from_proto::{ parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs, parse_physical_window_expr, parse_protobuf_file_scan_config, @@ -77,6 +77,7 @@ use crate::protobuf::physical_aggregate_expr_node::AggregateFunction; use crate::protobuf::physical_expr_node::ExprType; use crate::protobuf::physical_plan_node::PhysicalPlanType; use crate::protobuf::{self, proto_error, window_agg_exec_node}; +use crate::{convert_required, into_required}; use self::from_proto::parse_protobuf_partitioning; use self::to_proto::{serialize_partitioning, serialize_physical_expr}; @@ -1083,6 +1084,22 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { sort_order, ))) } + PhysicalPlanType::Unnest(unnest) => { + let input = into_physical_plan( + &unnest.input, + registry, + runtime, + extension_codec, + )?; + + Ok(Arc::new(UnnestExec::new( + input, + unnest.list_type_columns.iter().map(|c| *c as _).collect(), + unnest.struct_type_columns.iter().map(|c| *c as _).collect(), + Arc::new(convert_required!(unnest.schema)?), + into_required!(unnest.options)?, + ))) + } } } @@ -1933,6 +1950,33 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { // If unknown DataSink then let extension handle it } + if let Some(exec) = plan.downcast_ref::() { + let input = protobuf::PhysicalPlanNode::try_from_physical_plan( + exec.input().to_owned(), + extension_codec, + )?; + + return Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new( + protobuf::UnnestExecNode { + input: Some(Box::new(input)), + schema: Some(exec.schema().try_into()?), + list_type_columns: exec + .list_column_indices() + .iter() + .map(|c| *c as _) + .collect(), + struct_type_columns: exec + .struct_column_indices() + .iter() + .map(|c| *c as _) + .collect(), + options: Some(exec.options().into()), + }, + ))), + }); + } + let mut buf: Vec = vec![]; match extension_codec.try_encode(plan_clone.clone(), &mut buf) { Ok(_) => { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 87f57e11345e..97545045bb89 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::vec; use arrow::csv::WriterBuilder; +use arrow::datatypes::{Fields, TimeUnit}; use prost::Message; use crate::cases::{MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode}; @@ -63,6 +64,7 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::udaf::create_aggregate_expr; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; +use datafusion::physical_plan::unnest::UnnestExec; use datafusion::physical_plan::windows::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec, }; @@ -76,7 +78,7 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{not_impl_err, DataFusionError, Result, UnnestOptions}; use datafusion_expr::{ Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF, Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound, @@ -1150,3 +1152,30 @@ fn roundtrip_interleave() -> Result<()> { let interleave = InterleaveExec::try_new(inputs)?; roundtrip_test(Arc::new(interleave)) } + +#[test] +fn roundtrip_unnest() -> Result<()> { + let fa = Field::new("a", DataType::Int64, true); + let fb0 = Field::new_list_field(DataType::Utf8, true); + let fb = Field::new_list("b", fb0.clone(), false); + let fc1 = Field::new("c1", DataType::Boolean, false); + let fc2 = Field::new("c2", DataType::Date64, true); + let fc = Field::new_struct("c", Fields::from(vec![fc1.clone(), fc2.clone()]), true); + let fd0 = Field::new_list_field(DataType::Float32, false); + let fd = Field::new_list("d", fd0.clone(), true); + let fe1 = Field::new("e1", DataType::UInt16, false); + let fe2 = Field::new("e2", DataType::Duration(TimeUnit::Millisecond), true); + let fe3 = Field::new("e3", DataType::Timestamp(TimeUnit::Millisecond, None), true); + let fe_fields = Fields::from(vec![fe1.clone(), fe2.clone(), fe3.clone()]); + let fe = Field::new_struct("e", fe_fields, false); + + let fb0 = fb0.with_name("b"); + let fd0 = fd0.with_name("d"); + let input_schema = Arc::new(Schema::new(vec![fa.clone(), fb, fc, fd, fe])); + let output_schema = + Arc::new(Schema::new(vec![fa, fb0, fc1, fc2, fd0, fe1, fe2, fe3])); + let input = Arc::new(EmptyExec::new(input_schema)); + let options = UnnestOptions::default(); + let unnest = UnnestExec::new(input, vec![1, 3], vec![2, 4], output_schema, options); + roundtrip_test(Arc::new(unnest)) +} diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index e70f800bde74..e9ca0df81a19 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -251,25 +251,21 @@ SELECT COUNT(*) FROM timestamp_with_tz; ---- 131072 -# Perform the query: -query IPT -SELECT - count, - LAG(timestamp, 1) OVER (ORDER BY timestamp), - arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp)) -FROM timestamp_with_tz -LIMIT 10; +# Ensure that output timestamp columns preserve the timezone from the input +# and output record count will match input file record count +query TPI +SELECT arrow_typeof(lag_timestamp), + MIN(lag_timestamp), + COUNT(1) +FROM ( + SELECT + count, + LAG(timestamp, 1) OVER (ORDER BY timestamp) AS lag_timestamp + FROM timestamp_with_tz +) t +GROUP BY 1 ---- -0 NULL Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +Timestamp(Millisecond, Some("UTC")) 2014-08-27T14:00:00Z 131072 # Test config listing_table_ignore_subdirectory: