Refine code and fix UTs
yma11 committed Jun 12, 2024
1 parent db6f75a commit 1ac829c
Showing 8 changed files with 82 additions and 39 deletions.
9 changes: 2 additions & 7 deletions docs/get-started/Velox.md
@@ -293,7 +293,7 @@ About column mapping, see more [here](https://docs.delta.io/latest/delta-column-

## Iceberg Support

Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
Gluten with the Velox backend supports [Iceberg](https://iceberg.apache.org/) tables. Currently, reading both COW (Copy-On-Write) and MOR (Merge-On-Read) tables is supported.

### How to use

@@ -305,8 +305,6 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests

Once built successfully, Iceberg features will be included in the gluten-velox-bundle-X jar. Then you can query Iceberg tables with Gluten/Velox without scan fallback.

After the two steps, you can query iceberg table by gluten/velox without scan's fallback.
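
For a quick end-to-end check, here is a minimal sketch of a Gluten-enabled session reading an Iceberg table. The plugin class, catalog name, warehouse path and table name are illustrative assumptions; adjust them to your environment.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: configuration values below are examples, not documented defaults.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.plugins", "org.apache.gluten.GlutenPlugin") // assumed plugin class
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()

spark.sql("CREATE NAMESPACE IF NOT EXISTS local.db")
spark.sql("CREATE TABLE local.db.sample (id INT, name STRING) USING iceberg")
spark.sql("INSERT INTO local.db.sample VALUES (1, 'v1'), (2, 'v2')")

// If the scan is offloaded, the physical plan shows an Iceberg scan transformer
// instead of a vanilla BatchScan, i.e. no scan fallback.
val df = spark.sql("SELECT id, name FROM local.db.sample WHERE id = 1")
df.explain()
df.show()
```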

## Hudi Support

Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
@@ -319,10 +317,7 @@ First of all, compile the gluten-hudi module with the `hudi` profile, as follows:
mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
```

Then, put the additional `gluten-hudi-XX-SNAPSHOT.jar` to the class path (usually it's `$SPARK_HOME/jars`).
The gluten-hudi jar is in `gluten-hudi/target` directory.

After the two steps, you can query Hudi table by gluten/velox without scan's fallback.
Once built successfully, Hudi features will be included in the gluten-velox-bundle-X jar. Then you can query Hudi **COW** tables with Gluten/Velox without scan fallback.
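
For example, a minimal sketch that reuses a Gluten-enabled session like the one shown for Iceberg above and assumes the Hudi Spark bundle is on the classpath (the table name is illustrative):

```scala
// Create and query a Hudi COW table; the read should be offloaded, which you can
// verify by looking for HudiScanTransformer (rather than FileSourceScanExec) in the plan.
spark.sql("CREATE TABLE hudi_cow (id INT, name STRING) USING hudi")
spark.sql("INSERT INTO hudi_cow VALUES (1, 'v1'), (2, 'v2'), (3, 'v1')")

val df = spark.sql("SELECT id, name FROM hudi_cow WHERE name = 'v1'")
df.explain()
df.show()
```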

# Coverage

DataSourceScanTransformerRegister.scala
@@ -30,13 +30,13 @@ trait DataSourceScanTransformerRegister {
/**
* The class name that is used to identify what kind of datasource this is.
*
* For DataSource V1, it should be the child class name of
* [[org.apache.spark.sql.execution.datasources.FileIndex]].
* For DataSource V1, it should be the class name of relation.fileFormat, for example:
* {{{
* override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
* }}}
*
* For DataSource V2, it should be the child class name of
* [[org.apache.spark.sql.connector.read.Scan]].
*
* For example:
* [[org.apache.spark.sql.connector.read.Scan]]. For example:
* {{{
* override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
* }}}
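
In other words, for a DataSource V1 source the lookup key is derived from the class name of the scan's file format, so any unique fragment of that name can be registered. A minimal sketch of how to read the value off an existing plan (the helper name is illustrative):

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

// For DataSource V1, the factory compares registered scanClassName values against
// the class name of relation.fileFormat (see ScanTransformerFactory below).
def v1ScanKey(scan: FileSourceScanExec): String =
  scan.relation.fileFormat.getClass.getName
// e.g. "org.apache.spark.sql.delta.DeltaParquetFileFormat"
//   -> register scanClassName = "DeltaParquetFileFormat" (or the full name)
```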
ScanTransformerFactory.scala
@@ -46,7 +46,8 @@ object ScanTransformerFactory {
ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters)
}
val fileFormat = scanExec.relation.fileFormat
lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
val name = fileFormat.getClass.getName
lookupDataSourceScanTransformer(name) match {
case Some(clz) =>
clz
.getDeclaredConstructor()
@@ -147,7 +148,7 @@ object ScanTransformerFactory {
.getOrElse(getClass.getClassLoader)
val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
serviceLoader.asScala
.filter(_.scanClassName.equalsIgnoreCase(scanClassName))
.filter(service => scanClassName.contains(service.scanClassName))
.toList match {
case head :: Nil =>
// there is exactly one registered alias
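
The lookup filter changes from exact, case-insensitive equality to substring containment, so a short registered key such as "HoodieParquetFileFormat" still matches the fully qualified class name observed at runtime. A minimal sketch of the difference (the runtime class name is illustrative):

```scala
val registered   = "HoodieParquetFileFormat"
// Illustrative fully qualified name as it might appear at runtime.
val runtimeClass = "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat"

val oldMatch = runtimeClass.equalsIgnoreCase(registered) // false: names are not equal
val newMatch = runtimeClass.contains(registered)         // true: substring match succeeds
```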
5 changes: 1 addition & 4 deletions gluten-hudi/pom.xml
@@ -63,14 +63,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
HudiScanTransformer.scala
@@ -52,10 +52,9 @@ case class HudiScanTransformer(
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat

override protected def doValidateInternal(): ValidationResult = {
if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) {
return ValidationResult.notOk(s"Hudi meta field not present.")
if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) {
return ValidationResult.notOk(s"Hudi meta field not supported.")
}

super.doValidateInternal()
}

@@ -93,5 +92,4 @@ object HudiScanTransformer {
scanExec.disableBucketedScan
)
}

}
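
The reworked validation falls back as soon as any Hudi metadata column (any field starting with `_hoodie`) appears in the required schema, rather than requiring `_hoodie_record_key` to be present. A small illustration against the `hudi_tm` table used in the suite below (sketch only, assuming a Gluten-enabled session):

```scala
// Offloaded: only data columns are required, so doValidateInternal succeeds.
spark.sql("SELECT id, name FROM hudi_tm WHERE id = 2").explain()

// Falls back: _hoodie_commit_time starts with "_hoodie", so validation returns
// ValidationResult.notOk and the scan stays on vanilla Spark.
spark.sql("SELECT _hoodie_commit_time FROM hudi_tm").explain()
```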
HudiScanTransformerProvider.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.FileSourceScanExec

class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {

override val scanClassName: String = "org.apache.hudi.HoodieFileIndex"
override val scanClassName: String = "HoodieParquetFileFormat"

override def createDataSourceTransformer(
batchScan: FileSourceScanExec,
VeloxHudiSuite.scala
@@ -18,9 +18,6 @@ package org.apache.gluten.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}

import scala.collection.JavaConverters._

class VeloxHudiSuite extends WholeStageTransformerSuite {

@@ -31,8 +28,6 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
@@ -43,7 +38,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}

testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) {
testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) {
withTable("hudi_tm") {
spark.sql(s"""
|create table hudi_tm (id int, name string) using hudi
@@ -54,17 +49,65 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
spark.sql(s"""
|insert into hudi_tm values (3, "v3"), (4, "v4")
|""".stripMargin)
val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => }
val df = spark.sql(" select _hoodie_commit_time from hudi_tm;")
val value = df.collectAsList().get(0).getAs[String](0)
val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS OF " + value) {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => }
checkLengthAndPlan(df2, 4)
checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil)
val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") {
_ =>
val df2 =
runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value + " where id = 2") {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df2, 1)
checkAnswer(df2, Row("v2") :: Nil)
}
}

testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) {
withTable("hudi_pf") {
spark.sql(s"""
|create table hudi_pf (id int, name string) using hudi
|""".stripMargin)
spark.sql(s"""
|insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
|""".stripMargin)
spark.sql(s"""
|delete from hudi_pf where name = "v2"
|""".stripMargin)
val df1 = runQueryAndCompare("select id, name from hudi_pf") {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df3, 1)
checkAnswer(df3, Row("v2") :: Nil)
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
}
}

// FIXME: flaky leaked file systems issue
ignore("hudi: mor", Some("3.2")) {
withTable("hudi_mor") {
spark.sql(s"""
|create table hudi_mor (id int, name string, ts bigint)
|using hudi
|tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|""".stripMargin)
spark.sql(s"""
|insert into hudi_mor values (1, "v1", 1000), (2, "v2", 2000),
| (3, "v1", 3000), (4, "v2", 4000)
|""".stripMargin)
spark.sql(s"""
|delete from hudi_mor where id = 1
|""".stripMargin)
val df1 =
runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", true, false, false) {
_ =>
}
checkAnswer(df1, Row(3, "v1") :: Nil)
}
}

@@ -76,7 +119,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
spark.sql(s"""
|insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
|""".stripMargin)
val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => }
val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 'v1'") { _ => }
val hudiScanTransformer = df1.queryExecution.executedPlan.collect {
case f: HudiScanTransformer => f
}.head
Expand All @@ -86,5 +129,4 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
}
}

}
10 changes: 10 additions & 0 deletions package/pom.xml
@@ -93,6 +93,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hudi</id>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-hudi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
