Refine code and fix UTs
yma11 committed Aug 21, 2024
1 parent 30f2cf7 commit eea4b77
Showing 9 changed files with 90 additions and 47 deletions.
5 changes: 2 additions & 3 deletions docs/get-started/Velox.md
@@ -321,7 +321,7 @@ About column mapping, see more [here](https://docs.delta.io/latest/delta-column-

## Iceberg Support

Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, both reading COW (Copy-On-Write) and MOR (Merge-On-Read) tables are supported.

### How to use

@@ -333,7 +333,6 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests

Once built successfully, iceberg features will be included in gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox without scan's fallback.
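A quick way to verify the offload is to run a query against an Iceberg table and inspect the plan. The sketch below is illustrative rather than part of the Gluten docs: the table name `iceberg_db.sample` is a placeholder, the plugin class and off-heap settings assume a recent Gluten build, and an Iceberg catalog is assumed to be configured already.

```scala
import org.apache.spark.sql.SparkSession

// Spark session with Gluten enabled (settings assumed; adjust to your deployment).
val spark = SparkSession.builder()
  .appName("gluten-iceberg-smoke-test")
  .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()

// With the iceberg profile built into gluten-velox-bundle-X, the physical plan should
// show a native scan transformer for the Iceberg table instead of a fallback scan.
spark.sql("select count(*) from iceberg_db.sample").explain()
```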


## Hudi Support

Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
@@ -346,7 +345,7 @@ First of all, compile gluten-hudi module by a `hudi` profile, as follows:
mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
```

Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi table by gluten/velox without scan's fallback.
Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi **COW** table by gluten/velox without scan's fallback.
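For an end-to-end check, a sketch along the lines of the unit tests in this commit creates a small Hudi COW table and reads it back. It assumes a session like the one above with the Hudi Spark extensions and the Kryo serializer configured; table and column names are placeholders.

```scala
// Create and populate a Hudi COW table (COW is the default table type for Spark SQL).
spark.sql("create table hudi_cow (id int, name string) using hudi")
spark.sql("""insert into hudi_cow values (1, "v1"), (2, "v2")""")

// Selecting only data columns keeps the scan offloaded to Velox; selecting Hudi meta
// columns such as _hoodie_commit_time causes a fallback (see HudiScanTransformer below).
spark.sql("select id, name from hudi_cow where name = 'v1'").explain()
```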

# Coverage

@@ -29,13 +29,13 @@ trait DataSourceScanTransformerRegister {
/**
* The class name that used to identify what kind of datasource this is。
*
* For DataSource V1, it should be the child class name of
* [[org.apache.spark.sql.execution.datasources.FileIndex]].
* For DataSource V1, it should be relation.fileFormat like
* {{{
* override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
* }}}
*
* For DataSource V2, it should be the child class name of
* [[org.apache.spark.sql.connector.read.Scan]].
*
* For example:
* [[org.apache.spark.sql.connector.read.Scan]]. For example:
* {{{
* override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
* }}}
@@ -127,7 +127,7 @@ object ScanTransformerFactory {
.getOrElse(getClass.getClassLoader)
val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
serviceLoader.asScala
.filter(_.scanClassName.equalsIgnoreCase(scanClassName))
.filter(service => scanClassName.contains(service.scanClassName))
.toList match {
case head :: Nil =>
// there is exactly one registered alias
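The switch from an exact, case-insensitive comparison to `contains` is what lets a registrar use a simple class name as its key, while the name derived from the scan (for V1, from `relation.fileFormat`) is fully qualified. A small illustration; the fully qualified name below is invented purely for the example.

```scala
// Name the factory derives from the scan; the package prefix is made up for illustration.
val scanClassName = "org.apache.hudi.example.HoodieParquetFileFormat"

// Keys registered via DataSourceScanTransformerRegister implementations.
val registeredKeys = Seq(
  "HoodieParquetFileFormat",                              // gluten-hudi (V1, simple name)
  "org.apache.iceberg.spark.source.SparkBatchQueryScan")  // gluten-iceberg (V2, full name)

// equalsIgnoreCase would reject the simple-name key; substring matching accepts it.
val matched = registeredKeys.filter(key => scanClassName.contains(key))
println(matched) // List(HoodieParquetFileFormat)
```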
5 changes: 1 addition & 4 deletions gluten-hudi/pom.xml
@@ -63,14 +63,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -52,10 +52,9 @@ case class HudiScanTransformer(
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat

override protected def doValidateInternal(): ValidationResult = {
if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) {
return ValidationResult.notOk(s"Hudi meta field not present.")
if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) {
return ValidationResult.failed(s"Hudi meta field not supported.")
}

super.doValidateInternal()
}

@@ -78,20 +77,17 @@

object HudiScanTransformer {

def apply(
scanExec: FileSourceScanExec,
newPartitionFilters: Seq[Expression]): HudiScanTransformer = {
def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
new HudiScanTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
newPartitionFilters,
scanExec.partitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
scanExec.tableIdentifier,
scanExec.disableBucketedScan
)
}

}
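The validation change above inverts the old check: instead of requiring the Hudi record-key meta column to be present, the scan now falls back whenever any `_hoodie` meta column is requested. A standalone sketch of that guard, using plain Spark types and illustrative column names:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Required schema as it would look when a query selects a Hudi meta column.
val requiredSchema = StructType(Seq(
  StructField("_hoodie_commit_time", StringType),
  StructField("id", IntegerType),
  StructField("name", StringType)))

// Same predicate as doValidateInternal: any "_hoodie*" column triggers a fallback,
// i.e. ValidationResult.failed("Hudi meta field not supported.").
val hasHudiMetaField = requiredSchema.fields.exists(_.name.startsWith("_hoodie"))
println(hasHudiMetaField) // true -> Gluten falls back to vanilla Spark for this scan
```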
@@ -16,16 +16,14 @@
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec

class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {

override val scanClassName: String = "org.apache.hudi.HoodieFileIndex"
override val scanClassName: String = "HoodieParquetFileFormat"

override def createDataSourceTransformer(
batchScan: FileSourceScanExec,
newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
HudiScanTransformer(batchScan, newPartitionFilters)
batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
HudiScanTransformer(batchScan)
}
}
@@ -18,9 +18,6 @@ package org.apache.gluten.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}

import scala.collection.JavaConverters._

class VeloxHudiSuite extends WholeStageTransformerSuite {

@@ -31,8 +28,6 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
@@ -43,7 +38,7 @@
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}

testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) {
testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) {
withTable("hudi_tm") {
spark.sql(s"""
|create table hudi_tm (id int, name string) using hudi
@@ -54,17 +49,65 @@
spark.sql(s"""
|insert into hudi_tm values (3, "v3"), (4, "v4")
|""".stripMargin)
val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => }
val df = spark.sql(" select _hoodie_commit_time from hudi_tm;")
val value = df.collectAsList().get(0).getAs[String](0)
val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS OF " + value) {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => }
checkLengthAndPlan(df2, 4)
checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil)
val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") {
_ =>
val df2 =
runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value + " where id = 2") {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df2, 1)
checkAnswer(df2, Row("v2") :: Nil)
}
}

testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) {
withTable("hudi_pf") {
spark.sql(s"""
|create table hudi_pf (id int, name string) using hudi
|""".stripMargin)
spark.sql(s"""
|insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
|""".stripMargin)
spark.sql(s"""
|delete from hudi_pf where name = "v2"
|""".stripMargin)
val df1 = runQueryAndCompare("select id, name from hudi_pf") {
checkGlutenOperatorMatch[HudiScanTransformer]
}
checkLengthAndPlan(df3, 1)
checkAnswer(df3, Row("v2") :: Nil)
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
}
}

// FIXME: flaky leaked file systems issue
ignore("hudi: mor", Some("3.2")) {
withTable("hudi_mor") {
spark.sql(s"""
|create table hudi_mor (id int, name string, ts bigint)
|using hudi
|tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|""".stripMargin)
spark.sql(s"""
|insert into hudi_mor values (1, "v1", 1000), (2, "v2", 2000),
| (3, "v1", 3000), (4, "v2", 4000)
|""".stripMargin)
spark.sql(s"""
|delete from hudi_mor where id = 1
|""".stripMargin)
val df1 =
runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", true, false, false) {
_ =>
}
checkAnswer(df1, Row(3, "v1") :: Nil)
}
}

@@ -76,7 +119,7 @@
spark.sql(s"""
|insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
|""".stripMargin)
val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => }
val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 'v1'") { _ => }
val hudiScanTransformer = df1.queryExecution.executedPlan.collect {
case f: HudiScanTransformer => f
}.head
@@ -86,5 +129,4 @@
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
}
}

}
12 changes: 11 additions & 1 deletion package/pom.xml
@@ -98,7 +98,17 @@
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-delta</artifactId>
<artifactId>gluten-delta</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hudi</id>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-hudi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
9 changes: 5 additions & 4 deletions pom.xml
@@ -264,7 +264,7 @@
<delta.version>2.0.1</delta.version>
<delta.binary.version>20</delta.binary.version>
<antlr4.version>4.8</antlr4.version>
<hudi.version>0.14.1</hudi.version>
<hudi.version>0.15.0</hudi.version>
</properties>
</profile>
<profile>
@@ -280,7 +280,7 @@
<delta.version>2.3.0</delta.version>
<delta.binary.version>23</delta.binary.version>
<antlr4.version>4.8</antlr4.version>
<hudi.version>0.14.1</hudi.version>
<hudi.version>0.15.0</hudi.version>
</properties>
</profile>
<profile>
@@ -295,7 +295,7 @@
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
<antlr4.version>4.9.3</antlr4.version>
<hudi.version>0.14.1</hudi.version>
<hudi.version>0.15.0</hudi.version>
</properties>
</profile>
<profile>
@@ -308,7 +308,8 @@
<iceberg.version>1.5.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>3.2.0</delta.version>
<delta.binary.version>32</delta.binary.version>
<delta.binary.version>32</delta.binary.version>
<hudi.version>0.15.0</hudi.version>
<fasterxml.version>2.15.1</fasterxml.version>
<hadoop.version>3.3.4</hadoop.version>
<antlr4.version>4.9.3</antlr4.version>
