From 30f2cf7666f6c9a53a47eea39e40caa678c393f8 Mon Sep 17 00:00:00 2001 From: Shiyan Xu Date: Sun, 21 Apr 2024 17:31:47 -0500 Subject: [PATCH] [GLUTEN-5471][VL]feat: Support read Hudi COW table --- .github/workflows/velox_docker.yml | 17 +- docs/get-started/Velox.md | 15 ++ docs/get-started/build-guide.md | 1 + gluten-hudi/pom.xml | 165 ++++++++++++++++++ ...xecution.DataSourceScanTransformerRegister | 1 + .../execution/HudiScanTransformer.scala | 97 ++++++++++ .../HudiScanTransformerProvider.scala | 31 ++++ .../gluten/execution/VeloxHudiSuite.scala | 90 ++++++++++ .../gluten/execution/VeloxTPCHHudiSuite.scala | 63 +++++++ pom.xml | 12 ++ 10 files changed, 484 insertions(+), 8 deletions(-) create mode 100755 gluten-hudi/pom.xml create mode 100644 gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister create mode 100644 gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala create mode 100644 gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala create mode 100644 gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala create mode 100644 gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index 644fe62b932e3..e52d76ee4bb82 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -30,6 +30,7 @@ on: - 'gluten-data/**' - 'gluten-delta/**' - 'gluten-iceberg/**' + - 'gluten-hudi/**' - 'gluten-ut/**' - 'shims/**' - 'tools/gluten-it/**' @@ -680,7 +681,7 @@ jobs: cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg \ - -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \ + -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags - name: Upload golden files if: failure() @@ -737,7 +738,7 @@ jobs: - name: Build and run unit test for Spark 3.2.2 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark33: needs: build-native-lib-centos-7 @@ -799,7 +800,7 @@ jobs: run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 - $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \ + $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \ -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags - name: Upload golden files @@ -858,7 +859,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \ + $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox 
-Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -922,7 +923,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -981,7 +982,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -1045,7 +1046,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -1173,6 +1174,6 @@ jobs:
- name: Build and Run unit test for Spark 3.5.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 776736b2fff5b..aadc7c66adcb7 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -333,6 +333,21 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests
 Once built successfully, iceberg features will be included in gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox without scan's fallback.
+
+## Hudi Support
+
+Gluten with the Velox backend supports reading [Hudi](https://hudi.apache.org/) tables. Currently, only Copy-On-Write (COW) tables are supported.
+
+### How to use
+
+First, build the gluten-hudi module by enabling the `hudi` profile:
+
+```
+mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
+```
+
+Once built successfully, Hudi support is included in the gluten-velox-bundle-X jar. You can then query Hudi tables through Gluten/Velox without the scan falling back to vanilla Spark.
+
 # Coverage
 Spark3.3 has 387 functions in total. ~240 are commonly used. To get the support status of all Spark built-in functions, please refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md).
diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md
index 05e6119ecd4f1..d1b1533ad1e39 100644
--- a/docs/get-started/build-guide.md
+++ b/docs/get-started/build-guide.md
@@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn.
| uniffle | Build Gluten with Uniffle. | disabled |
| delta | Build Gluten with Delta Lake support. | disabled |
| iceberg | Build Gluten with Iceberg support. | disabled |
+| hudi | Build Gluten with Hudi support. | disabled |
| spark-3.2 | Build Gluten for Spark 3.2. | enabled |
| spark-3.3 | Build Gluten for Spark 3.3. | disabled |
| spark-3.4 | Build Gluten for Spark 3.4. | disabled |
diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml
new file mode 100755
index 0000000000000..e9aa342d4f1d5
--- /dev/null
+++ b/gluten-hudi/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-parent</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-hudi</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Hudi</name>
+
+  <properties>
+    <resource.dir>${project.basedir}/src/main/resources</resource.dir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark${sparkbundle.version}-bundle_${scala.binary.version}</artifactId>
+      <version>${hudi.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <resources>
+      <resource>
+        <directory>${resource.dir}</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <junitxml>.</junitxml>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>prepare-test-jar</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
new file mode 100644
index 0000000000000..ccfe1ada479e8
--- /dev/null
+++ b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
@@ -0,0 +1 @@
+org.apache.gluten.execution.HudiScanTransformerProvider
\ No newline at end of file
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
new file mode 100644
index 0000000000000..d8fa461ea5897
--- /dev/null
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection.BitSet + +case class HudiScanTransformer( + @transient override val relation: HadoopFsRelation, + override val output: Seq[Attribute], + override val requiredSchema: StructType, + override val partitionFilters: Seq[Expression], + override val optionalBucketSet: Option[BitSet], + override val optionalNumCoalescedBuckets: Option[Int], + override val dataFilters: Seq[Expression], + override val tableIdentifier: Option[TableIdentifier], + override val disableBucketedScan: Boolean = false) + extends FileSourceScanExecTransformerBase( + relation, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + disableBucketedScan + ) { + + override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat + + override protected def doValidateInternal(): ValidationResult = { + if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) { + return ValidationResult.notOk(s"Hudi meta field not present.") + } + + super.doValidateInternal() + } + + override def doCanonicalize(): HudiScanTransformer = { + HudiScanTransformer( + relation, + output.map(QueryPlan.normalizeExpressions(_, output)), + requiredSchema, + QueryPlan.normalizePredicates( + filterUnusedDynamicPruningExpressions(partitionFilters), + output), + optionalBucketSet, + optionalNumCoalescedBuckets, + QueryPlan.normalizePredicates(dataFilters, output), + None, + disableBucketedScan + ) + } +} + +object HudiScanTransformer { + + def apply( + scanExec: FileSourceScanExec, + newPartitionFilters: Seq[Expression]): HudiScanTransformer = { + new HudiScanTransformer( + scanExec.relation, + scanExec.output, + scanExec.requiredSchema, + newPartitionFilters, + scanExec.optionalBucketSet, + scanExec.optionalNumCoalescedBuckets, + scanExec.dataFilters, + scanExec.tableIdentifier, + scanExec.disableBucketedScan + ) + } + +} diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala new file mode 100644 index 0000000000000..2968220dc64eb --- /dev/null +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.FileSourceScanExec + +class HudiScanTransformerProvider extends DataSourceScanTransformerRegister { + + override val scanClassName: String = "org.apache.hudi.HoodieFileIndex" + + override def createDataSourceTransformer( + batchScan: FileSourceScanExec, + newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = { + HudiScanTransformer(batchScan, newPartitionFilters) + } +} diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala new file mode 100644 index 0000000000000..942a94a3edff5 --- /dev/null +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType} + +import scala.collection.JavaConverters._ + +class VeloxHudiSuite extends WholeStageTransformerSuite { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.sources.useV1SourceList", "avro") + .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + } + + testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) { + withTable("hudi_tm") { + spark.sql(s""" + |create table hudi_tm (id int, name string) using hudi + |""".stripMargin) + spark.sql(s""" + |insert into hudi_tm values (1, "v1"), (2, "v2") + |""".stripMargin) + spark.sql(s""" + |insert into hudi_tm values (3, "v3"), (4, "v4") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => } + checkLengthAndPlan(df2, 4) + checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil) + val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") { + _ => + } + checkLengthAndPlan(df3, 1) + checkAnswer(df3, Row("v2") :: Nil) + } + } + + testWithSpecifiedSparkVersion("hudi: partition filters", Some("3.2")) { + withTable("hudi_pf") { + spark.sql(s""" + |create table hudi_pf (id int, name string) using hudi partitioned by (name) + |""".stripMargin) + spark.sql(s""" + |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => } + val hudiScanTransformer = df1.queryExecution.executedPlan.collect { + case f: HudiScanTransformer => f + }.head + // No data filters as only partition filters exist + assert(hudiScanTransformer.filterExprs().size == 0) + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) + } + } + +} diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala new file mode 100644 index 0000000000000..ce6ec9bbcf304 --- /dev/null +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class VeloxTPCHHudiSuite extends VeloxTPCHSuite {
+
+  protected val tpchBasePath: String = new File(
+    "../backends-velox/src/test/resources").getAbsolutePath
+
+  override protected val resourcePath: String =
+    new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath
+
+  override protected val veloxTPCHQueries: String =
+    new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath
+
+  override protected val queriesResults: String =
+    new File(tpchBasePath, "queries-output").getCanonicalPath
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.executor.memory", "4g")
+      .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables
+      .map(_.name)
+      .map {
+        table =>
+          val tablePath = new File(resourcePath, table).getAbsolutePath
+          val tableDF = spark.read.format(fileFormat).load(tablePath)
+          tableDF.write.format("hudi").mode("append").saveAsTable(table)
+          (table, tableDF)
+      }
+      .toMap
+  }
+
+  override protected def afterAll(): Unit = {
+    TPCHTables.map(_.name).foreach(table => spark.sql(s"DROP TABLE IF EXISTS $table"))
+    super.afterAll()
+  }
+}
diff --git a/pom.xml b/pom.xml
index 4fb327f9f7ddd..3d0cc53f1255b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,6 +264,7 @@
         <delta.version>2.0.1</delta.version>
         <delta.binary.version>20</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+        <hudi.version>0.14.1</hudi.version>
       </properties>
     </profile>
@@ -279,6 +280,7 @@
         <delta.version>2.3.0</delta.version>
         <delta.binary.version>23</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+        <hudi.version>0.14.1</hudi.version>
       </properties>
     </profile>
@@ -293,6 +295,7 @@
         <delta.version>2.4.0</delta.version>
         <delta.binary.version>24</delta.binary.version>
         <antlr4.version>4.9.3</antlr4.version>
+        <hudi.version>0.14.1</hudi.version>
       </properties>
     </profile>
@@ -414,6 +417,15 @@
         <module>gluten-iceberg</module>
       </modules>
     </profile>
+    <profile>
+      <id>hudi</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>gluten-hudi</module>
+      </modules>
+    </profile>
     <profile>
       <id>backends-velox</id>
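The new `META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister` entry is what lets Gluten pick up `HudiScanTransformerProvider` without any code change in gluten-core. The sketch below is not part of the patch; it assumes gluten-core resolves registered providers through `java.util.ServiceLoader` (which is what a `META-INF/services` file exists for) and that the gluten-hudi jar is on the classpath. The object name is illustrative.

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.gluten.execution.DataSourceScanTransformerRegister

// Hypothetical helper: list every scan-transformer provider visible on the classpath.
object ListScanTransformerProviders {
  def main(args: Array[String]): Unit = {
    val providers = ServiceLoader
      .load(classOf[DataSourceScanTransformerRegister])
      .asScala
      .toSeq
    // With gluten-hudi present, the output includes "org.apache.hudi.HoodieFileIndex",
    // the scanClassName that HudiScanTransformerProvider registers; Gluten matches it
    // against the scan's file index to decide when to build a HudiScanTransformer.
    providers.foreach(p => println(p.scanClassName))
  }
}
```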
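End to end, the feature is used by building the bundle with `-Phudi` and enabling the same Hudi session settings the test suites above use. The following is a minimal sketch, not part of the patch: it assumes the `gluten-velox-bundle` jar built with `-Phudi` is on the classpath and that the standard Gluten plugin entry point `org.apache.gluten.GlutenPlugin` is used; the table name, warehouse layout, and memory sizes are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object HudiCowReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("gluten-hudi-cow-read")
      .master("local[2]")
      // Gluten entry point; requires the gluten-velox-bundle jar built with -Phudi.
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      // Hudi session setup, mirroring VeloxHudiSuite in this patch.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .getOrCreate()

    // COW is Hudi's default table type, so a plain CREATE TABLE ... USING hudi suffices,
    // exactly as in the VeloxHudiSuite tests.
    spark.sql("CREATE TABLE IF NOT EXISTS hudi_demo (id INT, name STRING) USING hudi")
    spark.sql("INSERT INTO hudi_demo VALUES (1, 'v1'), (2, 'v2')")

    // When validation passes, the physical plan contains HudiScanTransformer instead of
    // FileSourceScanExec, i.e. the scan runs in Velox.
    val df = spark.sql("SELECT * FROM hudi_demo WHERE id = 1")
    df.explain()
    df.show()

    spark.stop()
  }
}
```

Note that `doValidateInternal` in this patch only offloads the scan when `_hoodie_record_key` is part of the required schema, which a `SELECT *` over a Hudi table satisfies; narrower projections that drop the meta columns fall back to vanilla Spark.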