Skip to content

Commit 7f5c8f4

Browse files
committed
revert change recompute_schema for LogicalPlan::Values
1 parent a2e454b commit 7f5c8f4

File tree

2 files changed

+32
-15
lines changed

2 files changed

+32
-15
lines changed

datafusion/core/tests/execution/logical_plan.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,45 @@ use arrow::array::Int64Array;
2222
use arrow::datatypes::{DataType, Field, Schema};
2323
use datafusion::datasource::{provider_as_source, ViewTable};
2424
use datafusion::execution::session_state::SessionStateBuilder;
25-
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
25+
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, ScalarValue, Spans};
2626
use datafusion_execution::TaskContext;
2727
use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams};
28-
use datafusion_expr::logical_plan::LogicalPlan;
28+
use datafusion_expr::logical_plan::{LogicalPlan, Values};
2929
use datafusion_expr::{
30-
col, Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE,
30+
Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE,
3131
};
3232
use datafusion_functions_aggregate::count::Count;
3333
use datafusion_physical_plan::collect;
3434
use insta::assert_snapshot;
35+
use std::collections::HashMap;
3536
use std::fmt::Debug;
3637
use std::ops::Deref;
3738
use std::sync::Arc;
3839

3940
#[tokio::test]
4041
async fn count_only_nulls() -> Result<()> {
4142
// Input: VALUES (NULL), (NULL), (NULL) AS _(col)
42-
let input = LogicalPlanBuilder::values(vec![
43-
vec![Expr::Literal(ScalarValue::Null, None)],
44-
vec![Expr::Literal(ScalarValue::Null, None)],
45-
vec![Expr::Literal(ScalarValue::Null, None)],
46-
])?
47-
.project(vec![col("column1").alias("col")])?
48-
.build()?;
49-
let input_col_ref = col("col");
43+
let input_schema = Arc::new(DFSchema::from_unqualified_fields(
44+
vec![Field::new("col", DataType::Null, true)].into(),
45+
HashMap::new(),
46+
)?);
47+
let input = Arc::new(LogicalPlan::Values(Values {
48+
schema: input_schema,
49+
values: vec![
50+
vec![Expr::Literal(ScalarValue::Null, None)],
51+
vec![Expr::Literal(ScalarValue::Null, None)],
52+
vec![Expr::Literal(ScalarValue::Null, None)],
53+
],
54+
}));
55+
let input_col_ref = Expr::Column(Column {
56+
relation: None,
57+
name: "col".to_string(),
58+
spans: Spans::new(),
59+
});
5060

5161
// Aggregation: count(col) AS count
5262
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
53-
input.into(),
63+
input,
5464
vec![],
5565
vec![Expr::AggregateFunction(AggregateFunction {
5666
func: Arc::new(AggregateUDF::new_from_impl(Count::new())),

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,9 @@ impl LogicalPlan {
632632
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
633633
LogicalPlan::Dml(_) => Ok(self),
634634
LogicalPlan::Copy(_) => Ok(self),
635-
LogicalPlan::Values(Values { schema: _, values }) => {
636-
LogicalPlanBuilder::values(values)?.build()
635+
LogicalPlan::Values(Values { schema, values }) => {
636+
// todo it isn't clear why the schema is not recomputed here
637+
Ok(LogicalPlan::Values(Values { schema, values }))
637638
}
638639
LogicalPlan::Filter(Filter { predicate, input }) => {
639640
Filter::try_new(predicate, input).map(LogicalPlan::Filter)
@@ -1473,7 +1474,13 @@ impl LogicalPlan {
14731474
})?
14741475
// always recompute the schema to ensure the changed in the schema's field should be
14751476
// poplulated to the plan's parent
1476-
.map_data(|plan| plan.recompute_schema())
1477+
.map_data(|plan| plan.recompute_schema())?
1478+
.map_data(|plan| match plan {
1479+
LogicalPlan::Values(Values { values, schema: _ }) => {
1480+
LogicalPlanBuilder::values(values)?.build()
1481+
}
1482+
_ => Ok(plan),
1483+
})
14771484
})
14781485
.map(|res| res.data)
14791486
}

0 commit comments

Comments
 (0)