Commit 30f2cf7

[GLUTEN-5471][VL]feat: Support read Hudi COW table

xushiyan authored and yma11 committed Aug 21, 2024
1 parent aab550f commit 30f2cf7
Showing 10 changed files with 484 additions and 8 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/velox_docker.yml
@@ -30,6 +30,7 @@ on:
- 'gluten-data/**'
- 'gluten-delta/**'
- 'gluten-iceberg/**'
- 'gluten-hudi/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
@@ -680,7 +681,7 @@ jobs:
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg \
-Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
-Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
if: failure()
@@ -737,7 +738,7 @@ jobs:
- name: Build and run unit test for Spark 3.2.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
$MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark33:
needs: build-native-lib-centos-7
@@ -799,7 +800,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -858,7 +859,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -922,7 +923,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -981,7 +982,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -1045,7 +1046,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -1173,6 +1174,6 @@ jobs:
- name: Build and Run unit test for Spark 3.5.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
$MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
15 changes: 15 additions & 0 deletions docs/get-started/Velox.md
@@ -333,6 +333,21 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests

Once built successfully, Iceberg features will be included in the gluten-velox-bundle-X jar. Then you can query Iceberg tables through Gluten/Velox without the scan falling back.


## Hudi Support

Gluten with the Velox backend supports [Hudi](https://hudi.apache.org/) tables. Currently, only reading COW (Copy-On-Write) tables is supported.

### How to use

First, compile the gluten-hudi module with the `hudi` profile:

```
mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
```

Once built successfully, Hudi features will be included in the gluten-velox-bundle-X jar. Then you can query Hudi tables through Gluten/Velox without the scan falling back.
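
As a quick smoke test, the snippet below queries a COW table from a Gluten-enabled Spark session. This is a minimal sketch: the table path and view name are hypothetical, and the Hudi Spark bundle must be on the classpath. Note that this commit only offloads scans whose required schema includes the `_hoodie_record_key` meta column.

```
// Minimal sketch, assuming a Gluten-enabled SparkSession (`spark`) and an
// existing Hudi COW table; the path and view name are hypothetical.
val df = spark.read.format("hudi").load("/tmp/hudi_trips_cow")
df.createOrReplaceTempView("hudi_trips_cow")

// Selecting the _hoodie_record_key meta column makes the scan eligible for
// native offload under this commit's validation rule.
spark.sql("SELECT _hoodie_record_key FROM hudi_trips_cow LIMIT 10").show()
```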

# Coverage

Spark 3.3 has 387 built-in functions in total, of which ~240 are commonly used. To get the support status of all Spark built-in functions, please refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md).
1 change: 1 addition & 0 deletions docs/get-started/build-guide.md
@@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn.
| uniffle | Build Gluten with Uniffle. | disabled |
| delta | Build Gluten with Delta Lake support. | disabled |
| iceberg | Build Gluten with Iceberg support. | disabled |
| hudi | Build Gluten with Hudi support. | disabled |
| spark-3.2 | Build Gluten for Spark 3.2. | enabled |
| spark-3.3 | Build Gluten for Spark 3.3. | disabled |
| spark-3.4 | Build Gluten for Spark 3.4. | disabled |
165 changes: 165 additions & 0 deletions gluten-hudi/pom.xml
@@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gluten-parent</artifactId>
<groupId>org.apache.gluten</groupId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gluten-hudi</artifactId>
<packaging>jar</packaging>
<name>Gluten Hudi</name>

<properties>
<resource.dir>${project.basedir}/src/main/resources</resource.dir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
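    <!-- The Hudi Spark bundle is provided-scope: it is not packaged into the
         Gluten jar and must be supplied on the classpath at runtime. -->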
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark${sparkbundle.version}-bundle_${scala.binary.version}</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<!-- For test -->
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>backends-velox</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>backends-velox</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${resource.dir}</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<junitxml>.</junitxml>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
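        <!-- Package test classes as a test-jar so other modules can reuse
             these test utilities. -->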
<executions>
<execution>
<id>prepare-test-jar</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1 @@
org.apache.gluten.execution.HudiScanTransformerProvider
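
The one-line file above (its full `META-INF/services/...` path is truncated in this view) is a standard `java.util.ServiceLoader` registration: Gluten discovers the Hudi provider reflectively at runtime, so the core module needs no compile-time dependency on gluten-hudi. Below is a generic sketch of how such a registration is consumed; `ScanTransformerProvider` is an illustrative stand-in, not the actual Gluten SPI name:

```
import java.util.ServiceLoader

// Illustrative stand-in for the real SPI interface (name is hypothetical).
trait ScanTransformerProvider

// ServiceLoader reads META-INF/services/<fully.qualified.SpiName> from the
// classpath and reflectively instantiates every implementation listed there.
val providers = ServiceLoader.load(classOf[ScanTransformerProvider])
providers.forEach(p => println(p.getClass.getName))
```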
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet

case class HudiScanTransformer(
@transient override val relation: HadoopFsRelation,
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
override val optionalBucketSet: Option[BitSet],
override val optionalNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier],
override val disableBucketedScan: Boolean = false)
extends FileSourceScanExecTransformerBase(
relation,
output,
requiredSchema,
partitionFilters,
optionalBucketSet,
optionalNumCoalescedBuckets,
dataFilters,
tableIdentifier,
disableBucketedScan
) {

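  // Hudi COW base files are plain Parquet, so the native reader treats this
  // scan as an ordinary Parquet read.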
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat

override protected def doValidateInternal(): ValidationResult = {
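    // Offload only when the Hudi meta column is materialized in the scan's
    // required schema; otherwise validation fails and the scan falls back.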
if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) {
      return ValidationResult.notOk("Hudi meta field not present.")
}

super.doValidateInternal()
}

override def doCanonicalize(): HudiScanTransformer = {
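    // Normalize expression IDs and drop the table identifier so canonical
    // plan comparisons (e.g. exchange/subquery reuse) are stable.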
HudiScanTransformer(
relation,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
filterUnusedDynamicPruningExpressions(partitionFilters),
output),
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
disableBucketedScan
)
}
}

object HudiScanTransformer {

def apply(
scanExec: FileSourceScanExec,
newPartitionFilters: Seq[Expression]): HudiScanTransformer = {
new HudiScanTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
newPartitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
scanExec.tableIdentifier,
scanExec.disableBucketedScan
)
}

}
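
For context, a transformer like this is substituted for the vanilla scan during Gluten's columnar planning, via the provider registered in the services file above. The sketch below shows the shape of that substitution; the hook itself is hypothetical, and `doValidate().isValid` is assumed from the transformer base-class contract rather than shown in this diff:

```
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical planner hook (not part of this commit): offload the Hudi scan
// only when native validation passes; otherwise keep the vanilla Spark scan.
def maybeOffload(scan: FileSourceScanExec): SparkPlan = {
  val transformer = HudiScanTransformer(scan, scan.partitionFilters)
  if (transformer.doValidate().isValid) transformer else scan
}
```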