@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
* @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
* rows in the scan across different partitions
* @param ordering if set, the ordering provided by the scan
* @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
* source and do not appear as post-scan filters
*/
case class DataSourceV2ScanRelation(
relation: DataSourceV2Relation,
scan: Scan,
output: Seq[AttributeReference],
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
ordering: Option[Seq[SortOrder]] = None,
pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {

// TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed
Contributor:

@cloud-fan, what do you think?

Contributor:

I don't have a strong opinion. We can do it in this PR and update golden files if needed, or defer it to a followup PR.

Contributor Author:

Thank you both for the review! I'd like to defer that to a later PR, to avoid blocking this one on golden-file adjustments and other changes that might take quite some investigation time.

Contributor:

As discussed offline, let's remove this comment?

Contributor Author:

Hmm, I think it's still the right thing to do in the long term; we just don't want to do it very soon. So from a completeness standpoint, I think there's still value in keeping this comment?
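For context, the deferred override that the TODO describes would plausibly look like the sketch below. This is hypothetical, not part of this PR; the exact visibility modifier and interaction with plan-stability golden files would need verification.

```scala
// Hypothetical future override on DataSourceV2ScanRelation (deferred, not in
// this PR): surface fully pushed filters as constraints so that rules like
// InferFiltersFromConstraints and PruneFilters can reuse them.
override protected lazy val validConstraints: ExpressionSet =
  ExpressionSet(pushedFilters)
```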

// filters participate in constraint propagation (InferFiltersFromConstraints, PruneFilters).
// This changes which filters InferFiltersFromConstraints adds or removes (e.g., it may
// skip adding IsNotNull when the scan already implies it, or infer new filters across
// joins), so plan stability testing is needed first.

override def name: String = relation.name

@@ -197,7 +206,8 @@ case class DataSourceV2ScanRelation(
),
ordering = ordering.map(
_.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
)
),
pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))
Contributor:

The comment in pruneColumns (line 809-814 of V2ScanRelationPushDown.scala) says stale references are "acceptable while pushedFilters is informational only." However, this line normalizes pushedFilters against output via normalizeExpressions, which returns ordinal -1 for attributes not in the output, leaving the original exprId intact. Two equivalent plans would have different exprIds in their canonical form, breaking canonicalized == comparison (used by subquery dedup, plan caching, etc.).

Consider filtering out pushed filters with stale references in pruneColumns:

val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
  .filter(_.references.subsetOf(AttributeSet(output)))

This ensures only filters with valid references participate in canonicalization, and is consistent with the "drop filters with stale references" option already mentioned in the comment.
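The exprId leak described here can be illustrated with a toy standalone Scala model. This is not Spark's actual normalizeExpressions; it only mimics the shape of the problem: attributes found in the output are replaced by their ordinal, while stale attributes keep their run-specific exprId, so two otherwise-equal plans canonicalize differently.

```scala
// Toy model (NOT Spark's classes) of the canonicalization hazard: attributes
// present in `output` normalize to a position-based form, stale ones leak
// their exprId.
final case class AttrRef(name: String, exprId: Long)

def normalize(a: AttrRef, output: Seq[AttrRef]): String = {
  val ordinal = output.indexWhere(_.exprId == a.exprId)
  if (ordinal >= 0) s"ref#$ordinal"  // position-based, exprId erased
  else s"${a.name}#${a.exprId}"      // stale reference: exprId leaks through
}

// Two logically identical plans, analyzed in different runs, so every
// attribute got a different exprId:
val outputA = Seq(AttrRef("j", 1L))
val outputB = Seq(AttrRef("j", 10L))
val staleA = AttrRef("i", 2L)   // pruned column referenced by a pushed filter
val staleB = AttrRef("i", 20L)

// In-output attributes canonicalize identically...
println(normalize(AttrRef("j", 1L), outputA))  // ref#0
println(normalize(AttrRef("j", 10L), outputB)) // ref#0
// ...but the stale ones do not, breaking canonicalized == comparison:
println(normalize(staleA, outputA)) // i#2
println(normalize(staleB, outputB)) // i#20
```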

Contributor Author:

I debated this a bit and decided not to include pushed filters whose references are no longer in the pruned output.

The reason I included them before is that I was worried a fully pushed filter could be lost from pushedFilters when the filtered column is not projected:

SELECT j FROM t WHERE i > 3 with a connector that pushes both GreaterThan and IsNotNull:

  1. After analysis: Project([j], Filter(i > 3, DataSourceV2Relation([i, j])))
  2. InferFiltersFromConstraints (runs before V2ScanRelationPushDown): derives IsNotNull(i) → Project([j], Filter(i > 3 AND IsNotNull(i), DataSourceV2Relation([i, j])))
  3. pushDownFilters: both i > 3 and IsNotNull(i) fully pushed → Filter node removed → Project([j], ScanBuilderHolder), pushedFilterExpressions = [i > 3]
  4. pruneColumns: only j is referenced → output = [j], column i pruned → pushed filter i > 3 references pruned i → dropped

And I was a bit worried because part of the intention for this field was to record what guarantees the connector can make, and now we wouldn't have the complete information.

But for now I think this is acceptable, since 1/ pushedFilters is informational only, and a potential future improvement could be to keep fully pushed filters as post-scan Filter nodes, which would prevent the column from being pruned in the first place; and 2/ the Spark plan validator will also catch and reject this case when it notices that some references in expressions are not resolvable.
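The dropping behavior discussed above can be sketched with a tiny standalone Scala model. The names are illustrative only, not Spark's actual API: after pruning, any pushed filter referencing a column outside the surviving output is discarded.

```scala
// Toy model (hypothetical names, not Spark's API) of dropping pushed filters
// with stale references after column pruning.
final case class Attr(name: String)
final case class PushedFilter(repr: String, references: Set[Attr])

val i = Attr("i")
val j = Attr("j")

// SELECT j FROM t WHERE i > 3: after pruning, only j survives in the output.
val prunedOutput: Set[Attr] = Set(j)
val pushed = Seq(
  PushedFilter("i > 3", Set(i)),       // references the pruned column i
  PushedFilter("IsNotNull(j)", Set(j)) // references only surviving columns
)

// Keep only filters whose references are all within the pruned output:
val kept = pushed.filter(_.references.subsetOf(prunedOutput))
println(kept.map(_.repr)) // List(IsNotNull(j)) — "i > 3" is dropped
```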

)
}
}
@@ -23,7 +23,7 @@ import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT}
import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ScanOperation}
@@ -95,6 +95,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery

// Compute the pushed filter expressions: the normalized filters that were fully pushed
// down (i.e., not in postScanFilters). These are stored on the scan relation for
// potential future use in constraint propagation.
Contributor:

ExpressionSet.contains only checks its internal baseSet, which excludes non-deterministic expressions (they go to originals only — see ExpressionSet.add at ExpressionSet.scala:97-103). So if a non-deterministic filter like rand() > 0.5 is untranslatable and ends up in postScanFiltersWithoutSubquery, postScanFilterSet.contains will return false for it, and it will be incorrectly included in pushedFilterExpressions.

Trace: SELECT * FROM t WHERE i > 3 AND rand() > 0.5:

  1. normalizedFiltersWithoutSubquery = [i > 3, rand() > 0.5]
  2. PushDownUtils.pushFilters can't translate rand() > 0.5 → it lands in untranslatableExprs → postScanFiltersWithoutSubquery
  3. ExpressionSet(postScanFiltersWithoutSubquery) → rand() > 0.5 goes into originals only
  4. .filterNot(postScanFilterSet.contains) → contains(rand() > 0.5) returns false → NOT filtered out
  5. Result: pushedFilterExpressions incorrectly includes rand() > 0.5

Consider filtering the result:

Suggested change
// potential future use in constraint propagation.
val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
sHolder.pushedFilterExpressions =
normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)
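The baseSet-vs-originals behavior can be mimicked with a minimal standalone Scala sketch. It models, but does not use, Spark's ExpressionSet: deterministic expressions are indexed for membership checks, while non-deterministic ones are only kept verbatim, so `contains` never matches them.

```scala
// Minimal pure-Scala model (NOT Spark's real classes) of why ExpressionSet's
// contains misses non-deterministic expressions.
final case class Expr(repr: String, deterministic: Boolean)

final class MiniExpressionSet(exprs: Seq[Expr]) {
  // Only deterministic expressions are indexed (Spark's baseSet);
  // non-deterministic ones live in `originals` and are invisible to contains.
  private val baseSet: Set[String] =
    exprs.filter(_.deterministic).map(_.repr).toSet
  def contains(e: Expr): Boolean = e.deterministic && baseSet.contains(e.repr)
}

val gt  = Expr("i > 3", deterministic = true)
val rnd = Expr("rand() > 0.5", deterministic = false)

// rand() > 0.5 was untranslatable, so it stayed as a post-scan filter:
val postScanFilterSet = new MiniExpressionSet(Seq(rnd))
val normalized = Seq(gt, rnd)

// Without the determinism guard, rand() > 0.5 slips into pushed filters:
val pushedWrong = normalized.filterNot(postScanFilterSet.contains)
// With it, only the truly pushed, deterministic filter remains:
val pushedRight = pushedWrong.filter(_.deterministic)

println(pushedWrong.map(_.repr)) // List(i > 3, rand() > 0.5)
println(pushedRight.map(_.repr)) // List(i > 3)
```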

Contributor Author:

Thanks for the info, TIL!

val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
sHolder.pushedFilterExpressions = normalizedFiltersWithoutSubquery
.filterNot(postScanFilterSet.contains)
.filter(_.deterministic)

logInfo(
log"""
|Pushing operators to ${MDC(RELATION_NAME, sHolder.relation.name)}
@@ -698,6 +706,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
assert(realOutput.length == holder.output.length,
"The data source returns unexpected number of columns")
val wrappedScan = getWrappedScan(scan, holder)
// Note: holder.pushedFilterExpressions is not propagated here because the output schema
Contributor:

Is it hard to fix?

Contributor Author:

In theory it's not, but the aggregate path replaces the output schema entirely (table columns -> aggregate columns), so the original filter expressions can't be remapped to the new output. So I'd prefer to defer this and revisit if there's a concrete use case.

// changes to aggregate columns. When validConstraints is wired up, this needs revisiting.
val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)
val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
// The data source may return columns with arbitrary data types and it's safer to cast them
@@ -715,6 +725,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
assert(realOutput.length == holder.output.length,
"The data source returns unexpected number of columns")
val wrappedScan = getWrappedScan(scan, holder)
// Note: holder.pushedFilterExpressions is not propagated here because the output schema
// changes with pushed join. When validConstraints is wired up, this needs revisiting.
val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)

// When join is pushed down, the real output is going to be, for example,
@@ -737,6 +749,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
val scan = holder.builder.build()
val realOutput = toAttributes(scan.readSchema())
val wrappedScan = getWrappedScan(scan, holder)
// Note: holder.pushedFilterExpressions is not propagated here because the output schema
// changes with variant extraction. When validConstraints is wired up, this needs revisiting.
val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)

// Create projection to map real output to expected output (with transformed types)
@@ -787,14 +801,19 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

val wrappedScan = getWrappedScan(scan, sHolder)

val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)

val projectionOverSchema =
ProjectionOverSchema(output.toStructType, AttributeSet(output))
val projectionFunc = (expr: Expression) => expr transformDown {
case projectionOverSchema(newExpr) => newExpr
}

// Remap pushed filter attributes to the pruned output schema and drop filters
// whose references are no longer in the pruned output.
val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
.filter(_.references.subsetOf(AttributeSet(output)))
val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output,
pushedFilters = remappedPushedFilters)

val finalFilters = normalizedFilters.map(projectionFunc)
// bottom-most filters are put in the left of the list.
val withFilter = finalFilters.foldLeft[LogicalPlan](scanRelation)((plan, cond) => {
@@ -1018,6 +1037,8 @@ case class ScanBuilderHolder(
var pushedVariantAttributeMap: Map[ExprId, AttributeReference] = Map.empty

var pushedVariants: Option[VariantInRelation] = None

var pushedFilterExpressions: Seq[Expression] = Seq.empty
}

// A wrapper for v1 scan to carry the translated filters and the handled ones, along with