
[SPARK-50873][SQL] Prune column after RewriteSubquery rule for DSV2 #50399


Closed
wants to merge 2 commits

Conversation


@Akeron-Zhu commented Mar 26, 2025

What changes were proposed in this pull request?

This PR adds an optimizer rule to SparkOptimizer that prunes unnecessary columns for DataSource V2 (DSV2) after the RewriteSubquery rule.
Spark 3 uses the V2ScanRelationPushDown rule to prune columns for DSV2. However, if there are subqueries in the query SQL, the RewriteSubquery rule generates new predicates after V2ScanRelationPushDown has already executed. These predicates could be used to prune more columns, but Spark does not prune columns again, which causes lower performance.
See the issue for a more detailed description: SPARK-50873
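
As an illustration (with hypothetical table names, not taken from the PR), RewriteSubquery turns an EXISTS predicate into a left-semi join whose condition references only a few of the subquery's columns, so a scan that was planned while the subquery still read "SELECT *" could be narrowed afterwards:

    // Illustrative only: a query shape where RewriteSubquery creates a pruning
    // opportunity that V2ScanRelationPushDown (which runs earlier) misses.
    spark.sql(
      """
        |SELECT o.id
        |FROM orders o
        |WHERE EXISTS (SELECT * FROM items i WHERE i.order_id = o.id)
        |""".stripMargin)
    // After RewriteSubquery, the EXISTS becomes a LEFT SEMI JOIN on
    // i.order_id = o.id, so only items.order_id is needed, but the DSV2
    // scan for items may already have been planned to read every column.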

Why are the changes needed?

Better performance for Spark DSV2.
For example, in a 10 TB TPC-DS test, the execution time of query16 was reduced by about 50%, from 2.5 min to 1.3 min, on my cluster.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

GitHub Actions.

Was this patch authored or co-authored using generative AI tooling?

No.

@cloud-fan
Contributor

Can you add a test to show an example query that benefits from this change?

@jackylee-ch
Contributor

It seems that we have generated a new ScanBuilder. Does this affect the filters that have been pushed down through V2ScanRelationPushDown?

@Akeron-Zhu
Author

It seems that we have generated a new ScanBuilder. Does this affect the filters that have been pushed down through V2ScanRelationPushDown?

Thanks for your question. It will not affect them, because this rule checks the Scan by comparing the Scan's columns with the columns required by the Project and Filter. If there are no needless columns, it does nothing; otherwise, it generates a new Scan and, if necessary, adds the Project and Filter back on top of it. In summary, it only changes the scan.
I learned from the pruneColumns function of V2ScanRelationPushDown to write this rule; the only difference is the logic for pruning the unnecessary columns generated by the RewriteSubquery rule.
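
A minimal sketch of the rule's shape (hypothetical names, not the PR's actual code; a real rule must create a new Scan through the ScanBuilder, as V2ScanRelationPushDown's pruneColumns does, rather than merely shrinking the relation's output):

    import org.apache.spark.sql.catalyst.expressions.AttributeSet
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

    object PruneColumnsAfterRewriteSubquery extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
        case p @ Project(projectList, f @ Filter(cond, scan: DataSourceV2ScanRelation)) =>
          // Columns actually required by the Project and Filter above the scan.
          val required: AttributeSet = p.references ++ f.references
          if (scan.output.forall(required.contains)) {
            p // no needless columns: do nothing, pushed-down filters stay intact
          } else {
            // Narrow the scan to the required columns, then re-add the Filter
            // and Project on top (sketch only; see the caveat above).
            val pruned = scan.copy(output = scan.output.filter(required.contains))
            Project(projectList, Filter(cond, pruned))
          }
      }
    }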

@LuciferYang
Contributor

It seems that it could also resolve the issue described in SPARK-51831. Can the case described in SPARK-51831 be modified into a test case?

@Akeron-Zhu
Author

It seems that it could also resolve the issue described in SPARK-51831. Can the case described in SPARK-51831 be modified into a test case?

Thank you, @LuciferYang, that really helps.
Yes, I wrote this PR because I encountered the same problem as SPARK-51831. After checking the column pruning of DSV1 in the code, I found that DSV1 computes the required columns and performs column pruning when generating the scan at the end. So this PR follows the same approach as DSV1: it checks the plan and prunes the unnecessary columns.
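
A hedged sketch of that DSV1 behavior (simplified, not copied from Spark's source): at physical planning time the Project/Filter stack above a relation is collapsed, so the required columns are derived once, at the very end:

    import org.apache.spark.sql.catalyst.expressions.AttributeSet
    import org.apache.spark.sql.catalyst.planning.PhysicalOperation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Compute the columns a relation must produce for the operators above it.
    def requiredColumns(plan: LogicalPlan): Option[AttributeSet] = plan match {
      // PhysicalOperation collapses the whole Project/Filter stack above a
      // relation, which is why DSV1 sees the final set of needed columns.
      case PhysicalOperation(projects, filters, _) =>
        Some(AttributeSet(projects.flatMap(_.references)) ++
          AttributeSet(filters.flatMap(_.references)))
      case _ => None
    }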

@jackylee-ch
Contributor

After running the following test case, I found that no filter is pushed down to the Scan, although columns are pruned. @Akeron-Zhu Could you check it?

test("Test exist join with v2 source plan") {
    import org.apache.spark.sql.functions._
    withTempPath { dir =>
      spark.range(100)
        .withColumn("col1", col("id") + 1)
        .withColumn("col2", col("id") + 2)
        .withColumn("col3", col("id") + 3)
        .withColumn("col4", col("id") + 4)
        .withColumn("col5", col("id") + 5)
        .withColumn("col6", col("id") + 6)
        .withColumn("col7", col("id") + 7)
        .withColumn("col8", col("id") + 8)
        .withColumn("col9", col("id") + 9)
        .write
        .mode("overwrite")
        .parquet(dir.getCanonicalPath + "/t1")
      spark.range(10).write.mode("overwrite").parquet(dir.getCanonicalPath + "/t2")
      Seq("parquet", "").foreach { v1SourceList =>
        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1SourceList) {
          spark.read.parquet(dir.getCanonicalPath + "/t1").createOrReplaceTempView("t1")
          spark.read.parquet(dir.getCanonicalPath + "/t2").createOrReplaceTempView("t2")
          spark.sql(
            """
              |select sum(t1.id) as sum_id
              |from t1, t2
              |where t1.id == t2.id
              |      and exists(select * from t1 where t1.id == t2.id  and t1.col1>5)
              |""".stripMargin).explain()
        }
      }
    }
  } 

Before this PR:
DataSource V1:

FileScan parquet [id#32L,col1#33L] Batched: true, DataFilters: [isnotnull(col1#33L), (col1#33L > 5)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-40..., PartitionFilters: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], ReadSchema: struct<id:bigint,col1:bigint>

DataSource V2:

BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-c0b5b1ad-138c-4a87-a39b-12c50600f061/t1[id#58L, col1#59L, col2#60L, col3#61L, col4#62L, col5#63L, col6#64L, col7#65L, col8#66L, col9#67L] ParquetScan DataFilters: [isnotnull(col1#59L), (col1#59L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-c0..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint,col2:bigint,col3:bigint,col4:bigint,col5:bigint,col6:bigint,col7:big... RuntimeFilters: []

After this PR:
DataSource V2:

BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-4030df5b-b0de-423d-b548-07b85390bade/t1[id#58L, col1#59L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-40..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint> RuntimeFilters: []

@Akeron-Zhu
Author

After running the following test case, I found that no filter is pushed down to the Scan, although columns are pruned. @Akeron-Zhu Could you check it? [quoted test case and plans omitted; see the comment above]

Oh, I see. It is my mistake; I thought the pushed filters were the filters in the plan. Thanks for pointing that out. I will check it later.

@Akeron-Zhu
Author

Akeron-Zhu commented May 16, 2025

Hello teachers @cloud-fan @jackylee-ch @LuciferYang, today I submitted and tested another solution, SPARK-50873-2. That solution only addresses the problem in SPARK-51831 caused by EXISTS subqueries: it rewrites "SELECT *" as "SELECT 1" inside WHERE EXISTS during the optimization phase, because all of these problems in TPC-DS are caused by using "SELECT *" in EXISTS. I have submitted the PR, but it currently cannot pass the TPCDSV1_4-PlanStability and TPCDSV1_4-PlanStabilityWithStats tests: after this rule, some columns may be swapped and IDs may change, so the plan differs from the golden plan files in the source tree, although the plan and the answers are correct. As the picture shows:
[Image: Plan Compare]
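
A minimal sketch of that rewrite's idea (hypothetical rule name and deliberately simplified matching, not the actual SPARK-50873-2 code):

    import org.apache.spark.sql.catalyst.expressions.{Alias, Exists, Literal}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule

    object RewriteExistsStarAsLiteral extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
        case e: Exists =>
          e.plan match {
            // Crude proxy for "SELECT *": a projection of more than one column.
            case Project(projectList, child) if projectList.size > 1 =>
              // EXISTS only tests whether a row exists, so its select list is
              // irrelevant; project a constant instead of the expanded star.
              e.withNewPlan(Project(Seq(Alias(Literal(1), "one")()), child))
            case _ => e
          }
      }
    }

The size check is only for illustration; a real rule would have to recognize a genuinely expanded star and skip subqueries whose projection matters.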

After @jackylee-ch's reminder, I found that my first solution (this PR) cannot push down operators such as filters and aggregates. To push down all operators, I would need to rewrite the V2ScanRelationPushDown rule completely, because a Scan cannot be modified after it is generated; the Scan interface in Spark does not provide a way to modify it.
I wrote the first solution hoping to compute the required columns at the end, like FileScan, which can effectively avoid unnecessary columns in any situation. So I have an idea: could V2ScanRelationPushDown be executed last in SparkOptimizer? As this involves many parts and I am a new learner of Spark, I leave it to all teachers.

@Akeron-Zhu closed this May 16, 2025
@Akeron-Zhu
Author

After running the following test case, I found that no filter is pushed down to the Scan, although columns are pruned. @Akeron-Zhu Could you check it? [quoted test case and plans omitted; see the comment above]

Hello Jackylee, I found that my first solution cannot push down operators such as filters and aggregates. To push down all operators, I would need to rewrite the V2ScanRelationPushDown rule completely, because a Scan cannot be modified after it is generated; the Scan interface in Spark does not provide a way to modify it. So I submitted other solutions: SPARK-50873-2 & SPARK-52186.
Thanks again for your reminder.
