[SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation #55252
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *                               rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *                      source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
     relation: DataSourceV2Relation,
     scan: Scan,
     output: Seq[AttributeReference],
     keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+    ordering: Option[Seq[SortOrder]] = None,
+    pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {

+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed
+  // filters participate in constraint propagation (InferFiltersFromConstraints, PruneFilters).
+  // This changes which filters InferFiltersFromConstraints adds or removes (e.g., it may
+  // skip adding IsNotNull when the scan already implies it, or infer new filters across
+  // joins), so plan stability testing is needed first.

   override def name: String = relation.name

@@ -197,7 +206,8 @@ case class DataSourceV2ScanRelation(
       ),
       ordering = ordering.map(
         _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
-      )
+      ),
+      pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))
Contributor:
Consider filtering out pushed filters with stale references here as well:

    val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
      .filter(_.references.subsetOf(AttributeSet(output)))

This ensures only filters with valid references participate in canonicalization, and is consistent with the "drop filters with stale references" option already mentioned in the comment.

Author (Contributor):
I debated with myself a bit on this and decided not to include pushed filters whose references are no longer in the pruned output. The reason I included them before is that I was a bit worried a fully pushed filter could be lost.

I was also a bit worried because part of the intention for this field was to record what guarantees the connector can make, and with this change we no longer have that complete information. For now, I think this is acceptable since 1) pushedFilters is informational only; a potential future improvement could be to keep fully pushed filters as post-scan Filter nodes, which would prevent the column from being pruned in the first place, and 2) the Spark plan validator will also catch and reject this case when it finds that some references in expressions are not resolvable.

       )
     }
   }
@@ -23,7 +23,7 @@ import scala.collection.mutable

 import org.apache.spark.SparkException
 import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT}
-import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ScanOperation}
@@ -95,6 +95,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

     val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery

+    // Compute the pushed filter expressions: the normalized filters that were fully pushed
+    // down (i.e., not in postScanFilters). These are stored on the scan relation for
+    // potential future use in constraint propagation.
Contributor:
Consider filtering the result.

Author (Contributor):
Thanks for the info, TIL!
+    val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
+    sHolder.pushedFilterExpressions = normalizedFiltersWithoutSubquery
+      .filterNot(postScanFilterSet.contains)
+      .filter(_.deterministic)

     logInfo(
       log"""
         |Pushing operators to ${MDC(RELATION_NAME, sHolder.relation.name)}
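The membership test above uses ExpressionSet so that semantically equal but syntactically different filters still match. A simplified illustration, with a hypothetical two-case expression type and a canonical function standing in for Catalyst's canonicalization:

```scala
// Hypothetical miniature of the "fully pushed = normalized minus post-scan"
// computation, showing why semantic (canonicalized) equality matters.
sealed trait Expr
final case class Gt(l: String, r: Int) extends Expr // l > r
final case class Lt(l: Int, r: String) extends Expr // l < r

// Canonicalize commuted comparisons to one form: 1 < a becomes a > 1.
def canonical(e: Expr): Expr = e match {
  case Lt(l, r) => Gt(r, l)
  case other    => other
}

// Post-scan filters, stored canonicalized (the role ExpressionSet plays).
val postScanSet: Set[Expr] = Set[Expr](Lt(1, "a")).map(canonical)
val normalized: Seq[Expr] = Seq(Gt("a", 1), Gt("b", 2))

// Fully pushed filters: normalized filters not present post-scan.
// Gt("a", 1) matches Lt(1, "a") semantically, so only Gt("b", 2) remains.
val pushed = normalized.filterNot(e => postScanSet.contains(canonical(e)))
assert(pushed == Seq(Gt("b", 2)))
```

With a plain syntactic Set, `a > 1` would not match `1 < a` and the filter would wrongly be treated as fully pushed; canonicalization closes that gap.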
@@ -698,6 +706,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     assert(realOutput.length == holder.output.length,
       "The data source returns unexpected number of columns")
     val wrappedScan = getWrappedScan(scan, holder)
+    // Note: holder.pushedFilterExpressions is not propagated here because the output schema
+    // changes to aggregate columns. When validConstraints is wired up, this needs revisiting.

Contributor:
Is it hard to fix?

Author (Contributor):
In theory it's not, but the aggregate path replaces the output schema entirely (table columns -> aggregate columns), so the original filter expressions can't be remapped to the new output. So I'd prefer to defer this and revisit if there's a concrete use case.

     val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)
     val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
       // The data source may return columns with arbitrary data types and it's safer to cast them
|
@@ -715,6 +725,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     assert(realOutput.length == holder.output.length,
       "The data source returns unexpected number of columns")
     val wrappedScan = getWrappedScan(scan, holder)
+    // Note: holder.pushedFilterExpressions is not propagated here because the output schema
+    // changes with pushed join. When validConstraints is wired up, this needs revisiting.
     val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)

     // When join is pushed down, the real output is going to be, for example,
@@ -737,6 +749,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     val scan = holder.builder.build()
     val realOutput = toAttributes(scan.readSchema())
     val wrappedScan = getWrappedScan(scan, holder)
+    // Note: holder.pushedFilterExpressions is not propagated here because the output schema
+    // changes with variant extraction. When validConstraints is wired up, this needs revisiting.
     val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)

     // Create projection to map real output to expected output (with transformed types)
@@ -787,14 +801,19 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

     val wrappedScan = getWrappedScan(scan, sHolder)

-    val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)
-
     val projectionOverSchema =
       ProjectionOverSchema(output.toStructType, AttributeSet(output))
     val projectionFunc = (expr: Expression) => expr transformDown {
       case projectionOverSchema(newExpr) => newExpr
     }

+    // Remap pushed filter attributes to the pruned output schema and drop filters
+    // whose references are no longer in the pruned output.
+    val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
+      .filter(_.references.subsetOf(AttributeSet(output)))
+    val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output,
+      pushedFilters = remappedPushedFilters)

     val finalFilters = normalizedFilters.map(projectionFunc)
     // bottom-most filters are put in the left of the list.
     val withFilter = finalFilters.foldLeft[LogicalPlan](scanRelation)((plan, cond) => {
|
|
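The remap-and-drop step can be sketched with plain collections; Pred, prunedOutput, and renames below are hypothetical stand-ins for Catalyst expressions, the pruned scan output, and projectionFunc:

```scala
// Simplified sketch of remapping pushed filters to the pruned output schema
// and dropping any filter whose references no longer resolve.
final case class Pred(references: Set[String])

val prunedOutput = Set("a", "b")              // attributes surviving column pruning
val renames = Map("A" -> "a", "B" -> "b")     // role of projectionFunc: rewrite attrs

def remap(p: Pred): Pred =
  Pred(p.references.map(r => renames.getOrElse(r, r)))

// One filter references only surviving columns; the other references "c",
// which was pruned away and has no mapping into the new output.
val pushedFilterExpressions = Seq(Pred(Set("A")), Pred(Set("B", "c")))

val remapped = pushedFilterExpressions.map(remap)
  .filter(_.references.subsetOf(prunedOutput)) // drop filters with stale references
assert(remapped == Seq(Pred(Set("a"))))
```

As discussed in the review thread above, dropping the stale filter is acceptable here because pushedFilters is informational only.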
@@ -1018,6 +1037,8 @@ case class ScanBuilderHolder(
   var pushedVariantAttributeMap: Map[ExprId, AttributeReference] = Map.empty

   var pushedVariants: Option[VariantInRelation] = None

+  var pushedFilterExpressions: Seq[Expression] = Seq.empty
 }

 // A wrapper for v1 scan to carry the translated filters and the handled ones, along with
|
||||||||||
@cloud-fan, what do you think?

I don't have a strong opinion. We can do it in this PR and update golden files if needed, or defer it to a followup PR.

Thank you both for the review! I'd like to defer that to a later PR, to avoid blocking this one for too long on golden-file adjustments and other changes that might take quite some investigation time.

As discussed offline, let's remove this comment?

Hmm, I think in the long term it's still the right thing to do, just not something we want to do very soon, so from the standpoint of completeness I think there's still value in keeping this comment.