From 8df23b6972130b84c8fc79d38b06aa27df11a9f3 Mon Sep 17 00:00:00 2001 From: Yan Ma Date: Tue, 11 Jun 2024 14:08:04 +0000 Subject: [PATCH] Refine code and fix UTs --- docs/get-started/Velox.md | 9 +-- .../DataSourceScanTransformerRegister.scala | 10 +-- .../execution/ScanTransformerFactory.scala | 2 +- gluten-hudi/pom.xml | 5 +- .../execution/HudiScanTransformer.scala | 12 +-- .../HudiScanTransformerProvider.scala | 7 +- .../gluten/execution/VeloxHudiSuite.scala | 74 +++++++++++++++---- package/pom.xml | 10 +++ pom.xml | 9 ++- 9 files changed, 89 insertions(+), 49 deletions(-) diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index c956592d70284..3f1eeeef890da 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -324,7 +324,7 @@ About column mapping, see more [here](https://docs.delta.io/latest/delta-column- ## Iceberg Support -Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. +Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, both reading COW (Copy-On-Write) and MOR (Merge-On-Read) tables are supported. ### How to use @@ -336,8 +336,6 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests Once built successfully, iceberg features will be included in gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox without scan's fallback. -After the two steps, you can query iceberg table by gluten/velox without scan's fallback. - ## Hudi Support Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. @@ -350,10 +348,7 @@ First of all, compile gluten-hudi module by a `hudi` profile, as follows: mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests ``` -Then, put the additional `gluten-hudi-XX-SNAPSHOT.jar` to the class path (usually it's `$SPARK_HOME/jars`). -The gluten-hudi jar is in `gluten-hudi/target` directory. - -After the two steps, you can query Hudi table by gluten/velox without scan's fallback. +Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi **COW** table by gluten/velox without scan's fallback. # Coverage diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala index 5b46c23857d3d..ac6811f0ba2f7 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala @@ -29,13 +29,13 @@ trait DataSourceScanTransformerRegister { /** * The class name that used to identify what kind of datasource this is。 * - * For DataSource V1, it should be the child class name of - * [[org.apache.spark.sql.execution.datasources.FileIndex]]. + * For DataSource V1, it should be relation.fileFormat like + * {{{ + * override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat" + * }}} * * For DataSource V2, it should be the child class name of - * [[org.apache.spark.sql.connector.read.Scan]]. - * - * For example: + * [[org.apache.spark.sql.connector.read.Scan]]. For example: * {{{ * override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" * }}} diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index a05a5e72bfe1d..7d8c5aab6b87b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -127,7 +127,7 @@ object ScanTransformerFactory { .getOrElse(getClass.getClassLoader) val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader) serviceLoader.asScala - .filter(_.scanClassName.equalsIgnoreCase(scanClassName)) + .filter(service => scanClassName.contains(service.scanClassName)) .toList match { case head :: Nil => // there is exactly one registered alias diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml index e9aa342d4f1d5..be3d84b0167e2 100755 --- a/gluten-hudi/pom.xml +++ b/gluten-hudi/pom.xml @@ -63,14 +63,11 @@ test-jar test - - org.apache.spark - spark-core_${scala.binary.version} - org.apache.spark spark-core_${scala.binary.version} test-jar + test org.apache.spark diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala index d8fa461ea5897..76a818c96e37d 100644 --- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala @@ -52,10 +52,9 @@ case class HudiScanTransformer( override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat override protected def doValidateInternal(): ValidationResult = { - if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) { - return ValidationResult.notOk(s"Hudi meta field not present.") + if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) { + return ValidationResult.failed(s"Hudi meta field not supported.") } - super.doValidateInternal() } @@ -78,14 +77,12 @@ case class HudiScanTransformer( object HudiScanTransformer { - def apply( - scanExec: FileSourceScanExec, - newPartitionFilters: Seq[Expression]): HudiScanTransformer = { + def apply(scanExec: FileSourceScanExec): HudiScanTransformer = { new HudiScanTransformer( scanExec.relation, scanExec.output, scanExec.requiredSchema, - newPartitionFilters, + scanExec.partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, scanExec.dataFilters, @@ -93,5 +90,4 @@ object HudiScanTransformer { scanExec.disableBucketedScan ) } - } diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala index 2968220dc64eb..17c9375da3f2a 100644 --- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala @@ -21,11 +21,10 @@ import org.apache.spark.sql.execution.FileSourceScanExec class HudiScanTransformerProvider extends DataSourceScanTransformerRegister { - override val scanClassName: String = "org.apache.hudi.HoodieFileIndex" + override val scanClassName: String = "HoodieParquetFileFormat" override def createDataSourceTransformer( - batchScan: FileSourceScanExec, - newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = { - HudiScanTransformer(batchScan, newPartitionFilters) + batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = { + HudiScanTransformer(batchScan) } } diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala index 942a94a3edff5..4f9bd896c2522 100644 --- a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala @@ -18,9 +18,6 @@ package org.apache.gluten.execution import org.apache.spark.SparkConf import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType} - -import scala.collection.JavaConverters._ class VeloxHudiSuite extends WholeStageTransformerSuite { @@ -31,8 +28,6 @@ class VeloxHudiSuite extends WholeStageTransformerSuite { override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.sql.files.maxPartitionBytes", "1g") - .set("spark.sql.shuffle.partitions", "1") .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") @@ -43,7 +38,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite { .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") } - testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) { + testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) { withTable("hudi_tm") { spark.sql(s""" |create table hudi_tm (id int, name string) using hudi @@ -54,17 +49,65 @@ class VeloxHudiSuite extends WholeStageTransformerSuite { spark.sql(s""" |insert into hudi_tm values (3, "v3"), (4, "v4") |""".stripMargin) - val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => } + val df = spark.sql(" select _hoodie_commit_time from hudi_tm;") + val value = df.collectAsList().get(0).getAs[String](0) + val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS OF " + value) { + checkGlutenOperatorMatch[HudiScanTransformer] + } checkLengthAndPlan(df1, 2) checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) - val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => } - checkLengthAndPlan(df2, 4) - checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil) - val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") { - _ => + val df2 = + runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value + " where id = 2") { + checkGlutenOperatorMatch[HudiScanTransformer] + } + checkLengthAndPlan(df2, 1) + checkAnswer(df2, Row("v2") :: Nil) + } + } + + testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) { + withTable("hudi_pf") { + spark.sql(s""" + |create table hudi_pf (id int, name string) using hudi + |""".stripMargin) + spark.sql(s""" + |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") + |""".stripMargin) + spark.sql(s""" + |delete from hudi_pf where name = "v2" + |""".stripMargin) + val df1 = runQueryAndCompare("select id, name from hudi_pf") { + checkGlutenOperatorMatch[HudiScanTransformer] } - checkLengthAndPlan(df3, 1) - checkAnswer(df3, Row("v2") :: Nil) + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) + } + } + + // FIXME: flaky leaked file systems issue + ignore("hudi: mor", Some("3.2")) { + withTable("hudi_mor") { + spark.sql(s""" + |create table hudi_mor (id int, name string, ts bigint) + |using hudi + |tblproperties ( + | type = 'mor', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |""".stripMargin) + spark.sql(s""" + |insert into hudi_mor values (1, "v1", 1000), (2, "v2", 2000), + | (3, "v1", 3000), (4, "v2", 4000) + |""".stripMargin) + spark.sql(s""" + |delete from hudi_mor where id = 1 + |""".stripMargin) + val df1 = + runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", true, false, false) { + _ => + } + checkAnswer(df1, Row(3, "v1") :: Nil) } } @@ -76,7 +119,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite { spark.sql(s""" |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") |""".stripMargin) - val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => } + val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 'v1'") { _ => } val hudiScanTransformer = df1.queryExecution.executedPlan.collect { case f: HudiScanTransformer => f }.head @@ -86,5 +129,4 @@ class VeloxHudiSuite extends WholeStageTransformerSuite { checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) } } - } diff --git a/package/pom.xml b/package/pom.xml index ab87e14805ff5..488fd1b9e78b9 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -93,6 +93,16 @@ + + hudi + + + org.apache.gluten + gluten-hudi + ${project.version} + + + diff --git a/pom.xml b/pom.xml index 659bcf27ccfc1..bc1c3f4a79a6c 100644 --- a/pom.xml +++ b/pom.xml @@ -261,7 +261,7 @@ delta-core 2.0.1 20 - 0.14.1 + 0.15.0 @@ -276,7 +276,7 @@ delta-core 2.3.0 23 - 0.14.1 + 0.15.0 @@ -290,7 +290,7 @@ delta-core 2.4.0 24 - 0.14.1 + 0.15.0 @@ -303,7 +303,8 @@ 1.5.0 delta-spark 3.2.0 - 32 + 32 + 0.15.0 2.15.1 3.3.4