52 changes: 36 additions & 16 deletions build.sbt
@@ -35,6 +35,7 @@ import sbtprotoc.ProtocPlugin.autoImport._
import xsbti.compile.CompileAnalysis

import Checkstyle._
import ShadedIcebergBuild._
import Mima._
import Unidoc._

@@ -256,6 +257,8 @@ lazy val connectCommon = (project in file("spark-connect/common"))
name := "delta-connect-common",
commonSettings,
crossSparkSettings(),
// iceberg-core 1.8.0 brings jackson 2.18.2, so force the matching upgrade
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2",
releaseSettings,
Compile / compile := runTaskOnlyOnSparkMaster(
task = Compile / compile,
@@ -910,37 +913,54 @@ lazy val iceberg = (project in file("iceberg"))
)
// scalastyle:on println

lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

val icebergShadedVersion = "1.8.0"
lazy val icebergShaded = (project in file("icebergShaded"))
.dependsOn(spark % "provided")
.disablePlugins(JavaFormatterPlugin, ScalafmtPlugin)
.settings (
name := "iceberg-shaded",
commonSettings,
skipReleaseSettings,

// Compile, patch, and generate the Iceberg JARs
generateIcebergJarsTask := {
import sys.process._
val scriptPath = baseDirectory.value / "generate_iceberg_jars.py"
// Download the Iceberg source into the `iceberg_src` dir and generate the JARs in the `lib` dir
Seq("python3", scriptPath.getPath)!
},
Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value,
cleanFiles += baseDirectory.value / "iceberg_src",
cleanFiles += baseDirectory.value / "lib",

// Must exclude all Iceberg dependencies that delta-spark already includes
libraryDependencies ++= Seq(
// Fix Iceberg's java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$,
// which is caused by its use of a legacy Scala version.
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided",
"org.apache.iceberg" % "iceberg-core" % icebergShadedVersion excludeAll (
icebergExclusionRules: _*
),
"org.apache.iceberg" % "iceberg-hive-metastore" % icebergShadedVersion excludeAll (
icebergExclusionRules: _*
),
// The hadoop-client and hive-metastore versions come from
// iceberg/gradle/libs.versions.toml in the Iceberg repo at icebergShadedVersion.
"org.apache.hadoop" % "hadoop-client" % "2.7.3" % "provided" excludeAll (
hadoopClientExclusionRules: _*
),
"org.apache.hive" % "hive-metastore" % "2.3.8" % "provided" excludeAll (
hiveMetastoreExclusionRules: _*
)
),
// Generate the shaded Iceberg JARs
Compile / packageBin := assembly.value,
assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
assembly / logLevel := Level.Info,
assembly / test := {},
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll,
ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll
),
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp.filter { jar =>
val doExclude = jar.data.getName.contains("jackson-annotations") ||
jar.data.getName.contains("RoaringBitmap")
doExclude
}
},
// All of the following classes have Delta-customized implementations under icebergShaded/src
// and thus must come 'first' to replace the corresponding classes from the Iceberg jar.
assembly / assemblyMergeStrategy := updateMergeStrategy((assembly / assemblyMergeStrategy).value),
assemblyPackageScala / assembleArtifact := false,
// Make 'compile' invoke the 'assembly' task to generate the uber jar.
)

lazy val hudi = (project in file("hudi"))
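The shade rule above relocates every `org.apache.iceberg` class under the `shadedForDelta` prefix. A minimal sketch (not part of this PR; the object name and field id are illustrative) of what that means for consumers of the shaded jar — the relocated import is the same one used elsewhere in this diff:

```scala
// Upstream Iceberg reference:       import org.apache.iceberg.Schema
// Reference against the shaded jar: import shadedForDelta.org.apache.iceberg.Schema
import java.util.Collections

import shadedForDelta.org.apache.iceberg.{Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.types.Types

object ShadedIcebergUsageSketch {
  // Build a one-column Iceberg schema through the relocated classes, mirroring how
  // IcebergSchemaUtils constructs a Schema from Iceberg struct fields in this PR.
  def singleColumnSchema(): IcebergSchema =
    new IcebergSchema(Collections.singletonList(
      Types.NestedField.required(1 /* field id */, "col0", Types.IntegerType.get())))
}
```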
27 changes: 24 additions & 3 deletions examples/scala/src/main/scala/example/UniForm.scala
@@ -60,16 +60,37 @@ object UniForm {
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()

val schema =
"""
|col0 INT,
|col1 STRUCT<
| col2: MAP<INT, INT>,
| col3: ARRAY<INT>,
| col4: STRUCT<col5: STRING>
|>,
|col6 INT,
|col7 INT
|""".stripMargin

def getRowToInsertStr(id: Int): String = {
s"""
|$id,
|struct(map($id, $id), array($id), struct($id)),
|$id,
|$id
|""".stripMargin
}

deltaSpark.sql(s"DROP TABLE IF EXISTS ${testTableName}")
deltaSpark.sql(
s"""CREATE TABLE `${testTableName}` (col1 INT) using DELTA
s"""CREATE TABLE `${testTableName}` ($schema) using DELTA
|PARTITIONED BY (col0, col6, col7)
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV1' = 'true',
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
deltaSpark.sql(s"INSERT INTO `$testTableName` VALUES (123)")
deltaSpark.sql(s"INSERT INTO $testTableName VALUES (${getRowToInsertStr(1)})")

// Wait for the conversion to be done
Thread.sleep(10000)
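After the sleep gives the asynchronous conversion time to finish, one way to sanity-check the result is to read the table back through an Iceberg-enabled session. A minimal sketch, assuming the Iceberg Spark runtime is on the classpath and the same Hive metastore is used; the session and catalog settings below are illustrative, not part of this example file:

```scala
val icebergSpark = SparkSession.builder()
  .appName("UniFormIcebergRead")
  .master("local[*]")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .getOrCreate()

// The Iceberg metadata produced by UniForm should expose the same rows Delta wrote.
icebergSpark.sql(s"SELECT col0, col6, col7 FROM $testTableName").show()
```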
@@ -44,6 +44,7 @@ class DeltaToIcebergConverter(val snapshot: SnapshotDescriptor, val catalogTable
private val schemaUtils: IcebergSchemaUtils =
IcebergSchemaUtils(snapshot.metadata.columnMappingMode == NoMapping)

def maxFieldId: Int = schemaUtils.maxFieldId(snapshot)

val schema: IcebergSchema = IcebergCompat
.getEnabledVersion(snapshot.metadata)
@@ -34,6 +34,8 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DataFile, DeleteFiles, ExpireSnapshots, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Schema => IcebergSchema, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.MetadataUpdate
import shadedForDelta.org.apache.iceberg.MetadataUpdate.{AddPartitionSpec, AddSchema}
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
import shadedForDelta.org.apache.iceberg.util.LocationUtil
@@ -65,7 +67,9 @@ class IcebergConversionTransaction(
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
protected val lastConvertedDeltaVersion: Option[Long] = None
protected val lastConvertedDeltaVersion: Option[Long] = None,
protected val metadataUpdates: java.util.ArrayList[MetadataUpdate] =
new java.util.ArrayList[MetadataUpdate]()
) extends DeltaLogging {

///////////////////////////
@@ -323,7 +327,7 @@ class IcebergConversionTransaction(
log"Setting new Iceberg schema:\n " +
log"${MDC(DeltaLogKeys.SCHEMA, icebergSchema)}"
)
txn.setSchema(icebergSchema).commit()
metadataUpdates.add(new AddSchema(icebergSchema, convert.maxFieldId))

recordDeltaEvent(
postCommitSnapshot.deltaLog,
@@ -386,13 +390,9 @@ class IcebergConversionTransaction(

val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema))

// hard code dummy delta version as -1 for CREATE_TABLE, which will be later
// set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest
// possible legitimate Delta version which is 0.
val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version

var updateTxn = txn.updateProperties()
updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY,
postCommitSnapshot.version.toString)
.set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString)
.set(IcebergConstants.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)

@@ -426,19 +426,21 @@
)
}
try {
txn.commitTransaction()
// Iceberg CREATE_TABLE reassigns the field id in schema, which
// is overwritten by setting Delta schema with Delta generated field id to ensure
// consistency between field id in Iceberg schema after conversion and field id in
// parquet files written by Delta.
if (tableOp == CREATE_TABLE) {
// Iceberg CREATE_TABLE reassigns the field id in schema, which
// is overwritten by setting Delta schema with Delta generated field id to ensure
// consistency between field id in Iceberg schema after conversion and field id in
// parquet files written by Delta.
val setSchemaTxn = createIcebergTxn(Some(WRITE_TABLE))
setSchemaTxn.setSchema(icebergSchema).commit()
setSchemaTxn.updateProperties()
.set(IcebergConverter.DELTA_VERSION_PROPERTY, postCommitSnapshot.version.toString)
.commit()
setSchemaTxn.commitTransaction()
metadataUpdates.add(
new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt)
)
if (postCommitSnapshot.metadata.partitionColumns.nonEmpty) {
metadataUpdates.add(
new AddPartitionSpec(partitionSpec)
)
}
}
txn.commitTransaction()
recordIcebergCommit()
} catch {
case NonFatal(e) =>
@@ -455,7 +457,7 @@

protected def createIcebergTxn(tableOpOpt: Option[IcebergTableOp] = None):
IcebergTransaction = {
val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf)
val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf, metadataUpdates)
val icebergTableId = IcebergTransactionUtils
.convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)

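The intent behind the IcebergConversionTransaction change above: instead of committing a second transaction just to overwrite the schema, the Delta-generated schema and its max field id are queued as a MetadataUpdate, so Iceberg's CREATE_TABLE path cannot reassign field ids that Delta has already written into parquet files. A minimal sketch of that idea, using the same shaded classes the PR imports (the object and helper names are hypothetical):

```scala
import shadedForDelta.org.apache.iceberg.MetadataUpdate
import shadedForDelta.org.apache.iceberg.MetadataUpdate.AddSchema
import shadedForDelta.org.apache.iceberg.{Schema => IcebergSchema}

object MetadataUpdateSketch {
  // Queue the Delta-derived schema; lastColumnId must cover the largest field id
  // Delta has handed out (columnMappingMaxId) so Iceberg never re-issues a colliding id.
  def queueDeltaSchema(
      updates: java.util.List[MetadataUpdate],
      icebergSchema: IcebergSchema,
      deltaMaxFieldId: Int): Unit = {
    updates.add(new AddSchema(icebergSchema, deltaMaxFieldId))
  }
}
```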
@@ -46,6 +46,8 @@ trait IcebergSchemaUtils extends DeltaLogging {
new IcebergSchema(icebergStruct.fields())
}

def maxFieldId(snapshot: SnapshotDescriptor): Int


////////////////////
// Helper Methods //
@@ -149,6 +151,7 @@ object IcebergSchemaUtils {
// ground of truth and no column Id is available.
private var dummyId: Int = 1

def maxFieldId(snapshot: SnapshotDescriptor): Int = dummyId

def getFieldId(field: Option[StructField]): Int = {
val fieldId = dummyId
@@ -162,6 +165,8 @@

private class IcebergSchemaUtilsIdMapping() extends IcebergSchemaUtils {

def maxFieldId(snapshot: SnapshotDescriptor): Int =
snapshot.metadata.columnMappingMaxId.toInt

def getFieldId(field: Option[StructField]): Int = {
if (!field.exists(f => DeltaColumnMapping.hasColumnId(f))) {
@@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.util.PartitionUtils.{timestampPartitionPattern
import org.apache.spark.sql.delta.util.TimestampFormatter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, MetadataUpdate, PartitionSpec, Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.Metrics
import shadedForDelta.org.apache.iceberg.StructLike
import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier}
@@ -230,10 +230,13 @@ object IcebergTransactionUtils
* @param conf: Hadoop Configuration
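* @param metadataUpdates: Iceberg MetadataUpdates (e.g. AddSchema/AddPartitionSpec) collected
*                         during conversion and forwarded to HiveCatalog.initialize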
* @return
*/
def createHiveCatalog(conf : Configuration) : HiveCatalog = {
def createHiveCatalog(
conf: Configuration,
metadataUpdates: java.util.ArrayList[MetadataUpdate]
= new java.util.ArrayList[MetadataUpdate]()) : HiveCatalog = {
val catalog = new HiveCatalog()
catalog.setConf(conf)
catalog.initialize("spark_catalog", Map.empty[String, String].asJava)
catalog.initialize("spark_catalog", Map.empty[String, String].asJava, metadataUpdates)
catalog
}

@@ -100,8 +100,7 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {

testTable = testTable.copy(properties = Map.empty)
resultTable = UniversalFormat.enforceSupportInCatalog(testTable, testMetadata)
assert(resultTable.nonEmpty)
assert(resultTable.get.properties("table_type") == "iceberg")
assert(resultTable.isEmpty)
}

test("basic test - managed table created with SQL") {