diff --git a/.github/workflows/connectors_test.yaml b/.github/workflows/connectors_test.yaml index 6594b30fb12..2c75c50701d 100644 --- a/.github/workflows/connectors_test.yaml +++ b/.github/workflows/connectors_test.yaml @@ -14,7 +14,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Scala, SBT uses: actions/cache@v4 with: diff --git a/.github/workflows/iceberg_test.yaml b/.github/workflows/iceberg_test.yaml index aecc6e3241b..cd91814efdc 100644 --- a/.github/workflows/iceberg_test.yaml +++ b/.github/workflows/iceberg_test.yaml @@ -25,7 +25,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Cache Scala, SBT uses: actions/cache@v3 with: diff --git a/.github/workflows/kernel_docs.yaml b/.github/workflows/kernel_docs.yaml index 27e397df56f..885c2c3047c 100644 --- a/.github/workflows/kernel_docs.yaml +++ b/.github/workflows/kernel_docs.yaml @@ -31,7 +31,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Generate docs run: | build/sbt kernelGroup/unidoc diff --git a/.github/workflows/kernel_test.yaml b/.github/workflows/kernel_test.yaml index f48a2af68f4..2b8c040a0bf 100644 --- a/.github/workflows/kernel_test.yaml +++ b/.github/workflows/kernel_test.yaml @@ -41,7 +41,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Cache SBT and dependencies id: cache-sbt uses: actions/cache@v4 diff --git a/.github/workflows/spark_examples_test.yaml b/.github/workflows/spark_examples_test.yaml index 6337e3e36ef..f4cc29117bd 100644 --- a/.github/workflows/spark_examples_test.yaml +++ b/.github/workflows/spark_examples_test.yaml @@ -24,7 +24,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Cache Scala, SBT uses: actions/cache@v3 with: diff --git a/.github/workflows/spark_python_test.yaml b/.github/workflows/spark_python_test.yaml index 55c54741175..a9fc9f2acf8 100644 --- a/.github/workflows/spark_python_test.yaml +++ b/.github/workflows/spark_python_test.yaml @@ -25,7 +25,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Cache Scala, SBT uses: actions/cache@v3 with: diff --git a/.github/workflows/spark_test.yaml b/.github/workflows/spark_test.yaml index c82a8be5a4f..d8faa5c8b88 100644 --- a/.github/workflows/spark_test.yaml +++ b/.github/workflows/spark_test.yaml @@ -29,7 +29,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - name: Cache Scala, SBT uses: actions/cache@v3 with: diff --git a/.github/workflows/unidoc.yaml b/.github/workflows/unidoc.yaml index f7a04572d67..3854684017f 100644 --- a/.github/workflows/unidoc.yaml +++ b/.github/workflows/unidoc.yaml @@ -13,7 +13,7 @@ uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" - uses: actions/checkout@v3 - name: generate unidoc run: build/sbt "++ ${{ matrix.scala }}" unidoc diff --git a/.github/workflows/unity_test.yaml b/.github/workflows/unity_test.yaml index a03cf75ef46..65eb22890ab 100644 --- a/.github/workflows/unity_test.yaml +++ b/.github/workflows/unity_test.yaml @@ -22,7 +22,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "11" if: steps.git-diff.outputs.diff - name: Run Unity tests with coverage run: | diff --git 
a/build.sbt b/build.sbt index 685204f3b4e..0c97f0ca788 100644 --- a/build.sbt +++ b/build.sbt @@ -35,6 +35,7 @@ import sbtprotoc.ProtocPlugin.autoImport._ import xsbti.compile.CompileAnalysis import Checkstyle._ +import ShadedIcebergBuild._ import Mima._ import Unidoc._ @@ -91,7 +92,7 @@ crossScalaVersions := Nil // For Java 11 use the following on command line // sbt 'set targetJvm := "11"' [commands] val targetJvm = settingKey[String]("Target JVM version") -Global / targetJvm := "8" +Global / targetJvm := "11" lazy val javaVersion = sys.props.getOrElse("java.version", "Unknown") lazy val javaVersionInt = javaVersion.split("\\.")(0).toInt @@ -267,6 +268,8 @@ lazy val connectCommon = (project in file("spark-connect/common")) name := "delta-connect-common", commonSettings, crossSparkSettings(), + // iceberg-core 1.8.0 brings jackson 2.18.2 thus force upgrade + dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2", releaseSettings, Compile / compile := runTaskOnlyOnSparkMaster( task = Compile / compile, @@ -921,8 +924,7 @@ lazy val iceberg = (project in file("iceberg")) ) // scalastyle:on println -lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs") - +val icebergShadedVersion = "1.8.0" lazy val icebergShaded = (project in file("icebergShaded")) .dependsOn(spark % "provided") .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin) @@ -930,28 +932,46 @@ lazy val icebergShaded = (project in file("icebergShaded")) name := "iceberg-shaded", commonSettings, skipReleaseSettings, - - // Compile, patch and generated Iceberg JARs - generateIcebergJarsTask := { - import sys.process._ - val scriptPath = baseDirectory.value / "generate_iceberg_jars.py" - // Download iceberg code in `iceberg_src` dir and generate the JARs in `lib` dir - Seq("python3", scriptPath.getPath)! - }, - Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value, - cleanFiles += baseDirectory.value / "iceberg_src", - cleanFiles += baseDirectory.value / "lib", - + // must exclude all dependencies from Iceberg that delta-spark includes + libraryDependencies ++= Seq( + // Fix Iceberg's legacy java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$ error + // due to legacy scala. 
+ "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided", + "org.apache.iceberg" % "iceberg-core" % icebergShadedVersion excludeAll ( + icebergExclusionRules: _* + ), + "org.apache.iceberg" % "iceberg-hive-metastore" % icebergShadedVersion excludeAll ( + icebergExclusionRules: _* + ), + // the hadoop client and hive metastore versions come from this file in the + // iceberg repo of icebergShadedVersion: iceberg/gradle/libs.versions.toml + "org.apache.hadoop" % "hadoop-client" % "2.7.3" % "provided" excludeAll ( + hadoopClientExclusionRules: _* + ), + "org.apache.hive" % "hive-metastore" % "2.3.8" % "provided" excludeAll ( + hiveMetastoreExclusionRules: _* + ) + ), // Generated shaded Iceberg JARs Compile / packageBin := assembly.value, assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar", assembly / logLevel := Level.Info, assembly / test := {}, assembly / assemblyShadeRules := Seq( - ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll, + ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll ), + assembly / assemblyExcludedJars := { + val cp = (fullClasspath in assembly).value + cp.filter { jar => + val doExclude = jar.data.getName.contains("jackson-annotations") || + jar.data.getName.contains("RoaringBitmap") + doExclude + } + }, + // all following clases have Delta customized implementation under icebergShaded/src and thus + // require them to be 'first' to replace the class from iceberg jar + assembly / assemblyMergeStrategy := updateMergeStrategy((assembly / assemblyMergeStrategy).value), assemblyPackageScala / assembleArtifact := false, - // Make the 'compile' invoke the 'assembly' task to generate the uber jar. ) lazy val hudi = (project in file("hudi")) diff --git a/examples/scala/src/main/scala/example/UniForm.scala b/examples/scala/src/main/scala/example/UniForm.scala index 291f9101bcf..7f5d0cc947a 100644 --- a/examples/scala/src/main/scala/example/UniForm.scala +++ b/examples/scala/src/main/scala/example/UniForm.scala @@ -60,16 +60,37 @@ object UniForm { .config("spark.sql.catalogImplementation", "hive") .getOrCreate() + val schema = + """ + |col0 INT, + |col1 STRUCT< + | col2: MAP, + | col3: ARRAY, + | col4: STRUCT + |>, + |col6 INT, + |col7 INT + |""".stripMargin + + def getRowToInsertStr(id: Int): String = { + s""" + |$id, + |struct(map($id, $id), array($id), struct($id)), + |$id, + |$id + |""".stripMargin + } deltaSpark.sql(s"DROP TABLE IF EXISTS ${testTableName}") deltaSpark.sql( - s"""CREATE TABLE `${testTableName}` (col1 INT) using DELTA + s"""CREATE TABLE `${testTableName}` ($schema) using DELTA + |PARTITIONED BY (col0, col6, col7) |TBLPROPERTIES ( | 'delta.columnMapping.mode' = 'name', - | 'delta.enableIcebergCompatV1' = 'true', + | 'delta.enableIcebergCompatV2' = 'true', | 'delta.universalFormat.enabledFormats' = 'iceberg' |)""".stripMargin) - deltaSpark.sql(s"INSERT INTO `$testTableName` VALUES (123)") + deltaSpark.sql(s"INSERT INTO $testTableName VALUES (${getRowToInsertStr(1)})") // Wait for the conversion to be done Thread.sleep(10000) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala index 9392c687a95..5e3a4310080 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala @@ 
-44,6 +44,7 @@ class DeltaToIcebergConverter(val snapshot: SnapshotDescriptor, val catalogTable private val schemaUtils: IcebergSchemaUtils = IcebergSchemaUtils(snapshot.metadata.columnMappingMode == NoMapping) + def maxFieldId: Int = schemaUtils.maxFieldId(snapshot) val schema: IcebergSchema = IcebergCompat .getEnabledVersion(snapshot.metadata) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index d1bef91b939..9699e635e5c 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import shadedForDelta.org.apache.iceberg.{AppendFiles, DataFile, DeleteFiles, ExpireSnapshots, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Schema => IcebergSchema, Transaction => IcebergTransaction} +import shadedForDelta.org.apache.iceberg.MetadataUpdate +import shadedForDelta.org.apache.iceberg.MetadataUpdate.{AddPartitionSpec, AddSchema} import shadedForDelta.org.apache.iceberg.mapping.MappingUtil import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser import shadedForDelta.org.apache.iceberg.util.LocationUtil @@ -65,7 +67,9 @@ class IcebergConversionTransaction( protected val postCommitSnapshot: Snapshot, protected val tableOp: IcebergTableOp = WRITE_TABLE, protected val lastConvertedIcebergSnapshotId: Option[Long] = None, - protected val lastConvertedDeltaVersion: Option[Long] = None + protected val lastConvertedDeltaVersion: Option[Long] = None, + protected val metadataUpdates: java.util.ArrayList[MetadataUpdate] = + new java.util.ArrayList[MetadataUpdate]() ) extends DeltaLogging { /////////////////////////// @@ -323,7 +327,7 @@ class IcebergConversionTransaction( log"Setting new Iceberg schema:\n " + log"${MDC(DeltaLogKeys.SCHEMA, icebergSchema)}" ) - txn.setSchema(icebergSchema).commit() + metadataUpdates.add(new AddSchema(icebergSchema, convert.maxFieldId)) recordDeltaEvent( postCommitSnapshot.deltaLog, @@ -386,13 +390,9 @@ class IcebergConversionTransaction( val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema)) - // hard code dummy delta version as -1 for CREATE_TABLE, which will be later - // set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest - // possible legitimate Delta version which is 0. 
- val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version - var updateTxn = txn.updateProperties() - updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString) + updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, + postCommitSnapshot.version.toString) .set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString) .set(IcebergConstants.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping) @@ -426,19 +426,17 @@ class IcebergConversionTransaction( ) } try { - txn.commitTransaction() if (tableOp == CREATE_TABLE) { - // Iceberg CREATE_TABLE reassigns the field id in schema, which - // is overwritten by setting Delta schema with Delta generated field id to ensure - // consistency between field id in Iceberg schema after conversion and field id in - // parquet files written by Delta. - val setSchemaTxn = createIcebergTxn(Some(WRITE_TABLE)) - setSchemaTxn.setSchema(icebergSchema).commit() - setSchemaTxn.updateProperties() - .set(IcebergConverter.DELTA_VERSION_PROPERTY, postCommitSnapshot.version.toString) - .commit() - setSchemaTxn.commitTransaction() + metadataUpdates.add( + new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt) + ) + if (postCommitSnapshot.metadata.partitionColumns.nonEmpty) { + metadataUpdates.add( + new AddPartitionSpec(partitionSpec) + ) + } } + txn.commitTransaction() recordIcebergCommit() } catch { case NonFatal(e) => @@ -455,7 +453,7 @@ class IcebergConversionTransaction( protected def createIcebergTxn(tableOpOpt: Option[IcebergTableOp] = None): IcebergTransaction = { - val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf) + val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf, metadataUpdates) val icebergTableId = IcebergTransactionUtils .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala index 3a657bb22fb..f885dd74d74 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala @@ -46,6 +46,8 @@ trait IcebergSchemaUtils extends DeltaLogging { new IcebergSchema(icebergStruct.fields()) } + def maxFieldId(snapshot: SnapshotDescriptor): Int + //////////////////// // Helper Methods // @@ -149,6 +151,7 @@ object IcebergSchemaUtils { // ground of truth and no column Id is available. 
private var dummyId: Int = 1 + def maxFieldId(snapshot: SnapshotDescriptor): Int = dummyId def getFieldId(field: Option[StructField]): Int = { val fieldId = dummyId @@ -162,6 +165,8 @@ object IcebergSchemaUtils { private class IcebergSchemaUtilsIdMapping() extends IcebergSchemaUtils { + def maxFieldId(snapshot: SnapshotDescriptor): Int = + snapshot.metadata.columnMappingMaxId.toInt def getFieldId(field: Option[StructField]): Int = { if (!field.exists(f => DeltaColumnMapping.hasColumnId(f))) { diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala index 8cbc6a9df07..b88c14aa3b0 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.util.PartitionUtils.{timestampPartitionPattern import org.apache.spark.sql.delta.util.TimestampFormatter import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema} +import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, MetadataUpdate, PartitionSpec, Schema => IcebergSchema} import shadedForDelta.org.apache.iceberg.Metrics import shadedForDelta.org.apache.iceberg.StructLike import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier} @@ -230,10 +230,13 @@ object IcebergTransactionUtils * @param conf: Hadoop Configuration * @return */ - def createHiveCatalog(conf : Configuration) : HiveCatalog = { + def createHiveCatalog( + conf: Configuration, + metadataUpdates: java.util.ArrayList[MetadataUpdate] + = new java.util.ArrayList[MetadataUpdate]()) : HiveCatalog = { val catalog = new HiveCatalog() catalog.setConf(conf) - catalog.initialize("spark_catalog", Map.empty[String, String].asJava) + catalog.initialize("spark_catalog", Map.empty[String, String].asJava, metadataUpdates) catalog } diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala index 08871cf8d70..c713f00e603 100644 --- a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala @@ -100,8 +100,7 @@ class ConvertToIcebergSuite extends QueryTest with Eventually { testTable = testTable.copy(properties = Map.empty) resultTable = UniversalFormat.enforceSupportInCatalog(testTable, testMetadata) - assert(resultTable.nonEmpty) - assert(resultTable.get.properties("table_type") == "iceberg") + assert(resultTable.isEmpty) } test("basic test - managed table created with SQL") { diff --git a/icebergShaded/generate_iceberg_jars.py b/icebergShaded/generate_iceberg_jars.py deleted file mode 100644 index 4b3302962d5..00000000000 --- a/icebergShaded/generate_iceberg_jars.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 - -# -# Copyright (2021) The Delta Lake Project Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import argparse -import os -import glob -import re -import subprocess -import shlex -import shutil -from os import path - -iceberg_lib_dir_name = "lib" -iceberg_src_dir_name = "iceberg_src" # this is a git dir -iceberg_patches_dir_name = "iceberg_src_patches" - -iceberg_src_commit_hash = "ede085d0f7529f24acd0c81dd0a43f7bb969b763" -iceberg_src_branch = "main" # only this branch will be downloaded - -# Relative to iceberg_src directory. -# We use * because after applying the patches, a random git hash will be appended to each jar name. -# This, for all usages below, we must search for these jar files using `glob.glob(pattern)` -iceberg_src_compiled_jar_rel_glob_patterns = [ - "bundled-guava/build/libs/iceberg-bundled-guava-*.jar", - "common/build/libs/iceberg-common-*.jar", - "api/build/libs/iceberg-api-*.jar", - "core/build/libs/iceberg-core-*.jar", - "parquet/build/libs/iceberg-parquet-*.jar", - "hive-metastore/build/libs/iceberg-hive-*.jar", - "data/build/libs/iceberg-data-*.jar" -] - -iceberg_root_dir = path.abspath(path.dirname(__file__)) # this is NOT a git dir -iceberg_src_dir = path.join(iceberg_root_dir, iceberg_src_dir_name) -iceberg_patches_dir = path.join(iceberg_root_dir, iceberg_patches_dir_name) -iceberg_lib_dir = path.join(iceberg_root_dir, iceberg_lib_dir_name) - - -def iceberg_jars_exists(): - for compiled_jar_rel_glob_pattern in iceberg_src_compiled_jar_rel_glob_patterns: - jar_file_name_pattern = path.basename(path.normpath(compiled_jar_rel_glob_pattern)) - lib_jar_abs_pattern = path.join(iceberg_lib_dir, jar_file_name_pattern) - results = glob.glob(lib_jar_abs_pattern) - - if len(results) > 1: - raise Exception("More jars than expected: " + str(results)) - - if len(results) == 0: - return False - - return True - - -def add_google_maven_repo_to_gradle_config(): - with WorkingDirectory(iceberg_src_dir): - file_path = 'build.gradle' - - with open(file_path, 'r') as file: - content = file.read() - - # Define the old and new configurations - old_config = r'repositories {\n' - - new_config = 'repositories {\n maven {\n ' + \ - 'url "https://maven-central.storage-download.googleapis.com/maven2"\n }\n' - - # Replace the old configuration with the new one - updated_content = re.sub(old_config, new_config, content, flags=re.DOTALL) - - # Write the updated content back to the file - with open(file_path, 'w') as file: - file.write(updated_content) - - -def prepare_iceberg_source(): - with WorkingDirectory(iceberg_root_dir): - print(">>> Cloning Iceberg repo") - shutil.rmtree(iceberg_src_dir_name, ignore_errors=True) - - # We just want the shallowest, smallest iceberg clone. We will check out the commit later. 
- run_cmd("git clone --depth 1 --branch %s https://github.com/apache/iceberg.git %s" % - (iceberg_src_branch, iceberg_src_dir_name)) - - with WorkingDirectory(iceberg_src_dir): - run_cmd("git config user.email \"<>\"") - run_cmd("git config user.name \"Anonymous\"") - - # Fetch just the single commit (shallow) - run_cmd("git fetch origin %s --depth 1" % iceberg_src_commit_hash) - run_cmd("git checkout %s" % iceberg_src_commit_hash) - - print(">>> Applying patch files") - patch_files = glob.glob(path.join(iceberg_patches_dir, "*.patch")) - patch_files.sort() - - for patch_file in patch_files: - print(">>> Applying '%s'" % patch_file) - run_cmd("git apply %s" % patch_file) - run_cmd("git add .") - run_cmd("git commit -a -m 'applied %s'" % path.basename(patch_file)) - - add_google_maven_repo_to_gradle_config() - - -def generate_iceberg_jars(): - print(">>> Compiling JARs") - with WorkingDirectory(iceberg_src_dir): - # disable style checks (can fail with patches) and tests - build_args = "-x spotlessCheck -x checkstyleMain -x test -x integrationTest -x javadoc" - run_cmd("./gradlew :iceberg-core:build %s" % build_args) - run_cmd("./gradlew :iceberg-parquet:build %s" % build_args) - run_cmd("./gradlew :iceberg-hive-metastore:build %s" % build_args) - run_cmd("./gradlew :iceberg-data:build %s" % build_args) - - print(">>> Copying JARs to lib directory") - shutil.rmtree(iceberg_lib_dir, ignore_errors=True) - os.mkdir(iceberg_lib_dir) - - # For each relative pattern p ... - for compiled_jar_rel_glob_pattern in iceberg_src_compiled_jar_rel_glob_patterns: - # Get the absolute pattern - compiled_jar_abs_pattern = path.join(iceberg_src_dir, compiled_jar_rel_glob_pattern) - # Search for all glob results - results = glob.glob(compiled_jar_abs_pattern) - # Compiled jars will include tests, sources, javadocs; exclude them - results = list(filter(lambda result: all(x not in result for x in ["tests.jar", "sources.jar", "javadoc.jar"]), results)) - - if len(results) == 0: - raise Exception("Could not find the jar: " + compled_jar_rel_glob_pattern) - if len(results) > 1: - raise Exception("More jars created than expected: " + str(results)) - - # Copy the one jar result into the /lib directory - compiled_jar_abs_path = results[0] - compiled_jar_name = path.basename(path.normpath(compiled_jar_abs_path)) - lib_jar_abs_path = path.join(iceberg_lib_dir, compiled_jar_name) - shutil.copyfile(compiled_jar_abs_path, lib_jar_abs_path) - - if not iceberg_jars_exists(): - raise Exception("JAR copying failed") - - -def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): - if isinstance(cmd, str): - cmd = shlex.split(cmd) - cmd_env = os.environ.copy() - if env: - cmd_env.update(env) - - if stream_output: - child = subprocess.Popen(cmd, env=cmd_env, **kwargs) - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception("Non-zero exitcode: %s" % (exit_code)) - print("----\n") - return exit_code - else: - child = subprocess.Popen( - cmd, - env=cmd_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - (stdout, stderr) = child.communicate() - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception( - "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % - (exit_code, stdout, stderr)) - return (exit_code, stdout, stderr) - - -# pylint: disable=too-few-public-methods -class WorkingDirectory(object): - def __init__(self, working_directory): - self.working_directory = working_directory - self.old_workdir = os.getcwd() - - def 
__enter__(self): - os.chdir(self.working_directory) - - def __exit__(self, tpe, value, traceback): - os.chdir(self.old_workdir) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--force", - required=False, - default=False, - action="store_true", - help="Force the generation even if already generated, useful for testing.") - args = parser.parse_args() - - if args.force or not iceberg_jars_exists(): - prepare_iceberg_source() - generate_iceberg_jars() diff --git a/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch b/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch deleted file mode 100644 index 8be6a077109..00000000000 --- a/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch +++ /dev/null @@ -1,186 +0,0 @@ -Creates a new `SetSchema` pending update that will let us set the latest iceberg schema instead of having to apply incremental/delta changes to the existing schema. - -This PR requires that column mapping ID mode be enabled, and uses the same fieldId on the iceberg schema using the delta schema columnIds. - -This PR also blocks MapType or ArrayType (on the iceberg side). Doing so requires more complicated fieldId calculation, which is out of scope of this PR and of the first milestone. TLDR Delta Map and Array types have their inner elements as DataTypes, but iceberg Map and List types have their inner elements as actual fields (which need a field ID). So even though delta column mapping ID mode will assign IDs to each delta field, this is insufficient as it won't assign IDs for these maps/array types. - ---- - .../java/org/apache/iceberg/SetSchema.java | 25 ++ - .../java/org/apache/iceberg/Transaction.java | 7 + - .../org/apache/iceberg/BaseTransaction.java | 8 + - .../iceberg/CommitCallbackTransaction.java | 5 + - .../org/apache/iceberg/SetSchemaImpl.java | 45 ++++ - .../org/apache/iceberg/TableMetadata.java | 14 +- - .../IcebergConversionTransaction.scala | 232 +++++++++--------- - .../tahoe/iceberg/IcebergSchemaUtils.scala | 55 +++-- - .../iceberg/IcebergTransactionUtils.scala | 16 +- - .../IcebergConversionTransactionSuite.scala | 224 ++++++++++++++++- - .../tahoe/iceberg/IcebergConverterSuite.scala | 3 +- - .../iceberg/IcebergSchemaUtilsSuite.scala | 200 ++++++++------- - .../IcebergTransactionUtilsSuite.scala | 25 +- - 13 files changed, 595 insertions(+), 264 deletions(-) - create mode 100644 api/src/main/java/org/apache/iceberg/SetSchema.java - create mode 100644 core/src/main/java/org/apache/iceberg/SetSchemaImpl.java - -diff --git a/api/src/main/java/org/apache/iceberg/SetSchema.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/SetSchema.java -new file mode 100644 -index 00000000000..042a594ae5b ---- /dev/null -+++ b/api/src/main/java/org/apache/iceberg/SetSchema.java -@@ -0,0 +1,25 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one -+ * or more contributor license agreements. See the NOTICE file -+ * distributed with this work for additional information -+ * regarding copyright ownership. The ASF licenses this file -+ * to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance -+ * with the License. 
You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, -+ * software distributed under the License is distributed on an -+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -+ * KIND, either express or implied. See the License for the -+ * specific language governing permissions and limitations -+ * under the License. -+ */ -+ -+package org.apache.iceberg; -+ -+/** -+ * API to set the new, latest Iceberg schema. -+ */ -+public interface SetSchema extends PendingUpdate { } -diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/Transaction.java -index 090b5dfe37c..3879c9a9146 100644 ---- a/api/src/main/java/org/apache/iceberg/Transaction.java -+++ b/api/src/main/java/org/apache/iceberg/Transaction.java -@@ -37,6 +37,13 @@ public interface Transaction { - */ - UpdateSchema updateSchema(); - -+ /** -+ * Create a new {@link SetSchema} to set the new table schema. -+ * -+ * @return a new {@link SetSchema} -+ */ -+ SetSchema setSchema(Schema newSchema); -+ - /** - * Create a new {@link UpdatePartitionSpec} to alter the partition spec of this table. - * -diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/BaseTransaction.java -index 241738fedab..e299d04ebbd 100644 ---- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java -+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java -@@ -113,6 +113,14 @@ public class BaseTransaction implements Transaction { - return schemaChange; - } - -+ @Override -+ public SetSchema setSchema(Schema newSchema) { -+ checkLastOperationCommitted("SetSchema"); -+ SetSchema setSchema = new SetSchemaImpl(transactionOps, transactionOps.current(), newSchema); -+ updates.add(setSchema); -+ return setSchema; -+ } -+ - @Override - public UpdatePartitionSpec updateSpec() { - checkLastOperationCommitted("UpdateSpec"); -diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -index 19b74a65eca..6a2d7614a82 100644 ---- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -+++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -@@ -41,6 +41,11 @@ class CommitCallbackTransaction implements Transaction { - return wrapped.updateSchema(); - } - -+ @Override -+ public SetSchema setSchema(Schema newSchema) { -+ return wrapped.setSchema(newSchema); -+ } -+ - @Override - public UpdatePartitionSpec updateSpec() { - return wrapped.updateSpec(); -diff --git a/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java -new file mode 100644 -index 00000000000..ce6731a4e13 ---- /dev/null -+++ b/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java -@@ -0,0 +1,45 @@ -+/* -+ * Copyright (2021) The Delta Lake Project Authors. -+ * -+ * Licensed under the Apache License, Version 2.0 (the "License"); -+ * you may not use this file except in compliance with the License. 
-+ * You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+ -+ -+ -+package org.apache.iceberg; -+ -+public class SetSchemaImpl implements SetSchema { -+ -+ private final TableOperations ops; -+ private final TableMetadata base; -+ private final Schema newSchema; -+ -+ public SetSchemaImpl(TableOperations ops, TableMetadata base, Schema newSchema) { -+ this.ops = ops; -+ this.base = base; -+ this.newSchema = newSchema; -+ } -+ -+ @Override -+ public Schema apply() { -+ return newSchema; -+ } -+ -+ @Override -+ public void commit() { -+ // This will override the current schema -+ TableMetadata update = base.updateSchema(apply(), newSchema.highestFieldId()); -+ ops.commit(base, update); -+ } -+} -diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/TableMetadata.java -index afa2c7ac2d5..52546f02a75 100644 ---- a/core/src/main/java/org/apache/iceberg/TableMetadata.java -+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java -@@ -1339,11 +1339,15 @@ public class TableMetadata implements Serializable { - } - - private int addSchemaInternal(Schema schema, int newLastColumnId) { -- Preconditions.checkArgument( -- newLastColumnId >= lastColumnId, -- "Invalid last column ID: %s < %s (previous last column ID)", -- newLastColumnId, -- lastColumnId); -+ // Since we use txn.setSchema instead of txn.updateSchema, we are manually setting the new -+ // schema. Thus, if we drop the last column, it is clearly possible and valid for the -+ // newLastColumnId to be < the previous lastColumnId. Thus, we ignore this check. -+ // -+ // Preconditions.checkArgument( -+ // newLastColumnId >= lastColumnId, -+ // "Invalid last column ID: %s < %s (previous last column ID)", -+ // newLastColumnId, -+ // lastColumnId); - - int newSchemaId = reuseOrCreateNewSchemaId(schema); - boolean schemaFound = schemasById.containsKey(newSchemaId); --- -2.39.2 (Apple Git-143) diff --git a/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch b/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch deleted file mode 100644 index 23386853c2d..00000000000 --- a/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch +++ /dev/null @@ -1,45 +0,0 @@ -HiveTableOperations should have its catalog operations compatible with Delta - -This patch prevent Iceberg HiveTableOperations to overwrite catalog table properties used by Delta. It also writes a dummy schema to metastore to be aligned with Delta's behavior. 
---- -Index: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java -=================================================================== -diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java ---- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision ede085d0f7529f24acd0c81dd0a43f7bb969b763) -+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision 4470b919dd6a97b0f6d6b7d57d1d57348a40c025) -@@ -43,6 +43,7 @@ - import org.apache.hadoop.hive.metastore.IMetaStoreClient; - import org.apache.hadoop.hive.metastore.TableType; - import org.apache.hadoop.hive.metastore.api.InvalidObjectException; -+import org.apache.hadoop.hive.metastore.api.FieldSchema; - import org.apache.hadoop.hive.metastore.api.LockComponent; - import org.apache.hadoop.hive.metastore.api.LockLevel; - import org.apache.hadoop.hive.metastore.api.LockRequest; -@@ -286,7 +287,9 @@ - LOG.debug("Committing new table: {}", fullName); - } - -- tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes -+ StorageDescriptor newsd = storageDescriptor(metadata, hiveEngineEnabled); -+ newsd.getSerdeInfo().setParameters(tbl.getSd().getSerdeInfo().getParameters()); -+ tbl.setSd(newsd); // set to pickup any schema changes - - String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); - String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; -@@ -393,6 +396,7 @@ - @VisibleForTesting - void persistTable(Table hmsTable, boolean updateHiveTable) - throws TException, InterruptedException { -+ hmsTable.getSd().setCols(Collections.singletonList(new FieldSchema("col", "array", ""))); - if (updateHiveTable) { - metaClients.run( - client -> { -@@ -468,7 +472,7 @@ - } - - // remove any props from HMS that are no longer present in Iceberg table props -- obsoleteProps.forEach(parameters::remove); -+ // obsoleteProps.forEach(parameters::remove); - - parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); - parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); diff --git a/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch b/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch deleted file mode 100644 index 21b438bdd90..00000000000 --- a/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch +++ /dev/null @@ -1,40 +0,0 @@ -Iceberg PartitionField takes source column field id from latest schema if changed - ---- a/core/src/main/java/org/apache/iceberg/TableMetadata.java -+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java -@@ -664,13 +664,22 @@ public class TableMetadata implements Serializable { - return new Builder(this).upgradeFormatVersion(newFormatVersion).build(); - } - -- private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) { -+ private static PartitionSpec updateSpecSchema(Schema newSchema, Schema currSchema, PartitionSpec partitionSpec) { - PartitionSpec.Builder specBuilder = -- PartitionSpec.builderFor(schema).withSpecId(partitionSpec.specId()); -+ PartitionSpec.builderFor(newSchema).withSpecId(partitionSpec.specId()); - -- // add all the fields to the builder. IDs should not change. -+ // add all the fields to the builder. 
IDs may change so it looks up the source field id by -+ // name from the new schema - for (PartitionField field : partitionSpec.fields()) { -- specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); -+ String partFieldSourceName = currSchema.findField(field.sourceId()).name(); -+ int partFieldSourceInt; -+ org.apache.iceberg.types.Types.NestedField partSourceFieldInNewSchema = newSchema.findField(partFieldSourceName); -+ if (partSourceFieldInNewSchema == null) { -+ partFieldSourceInt = field.sourceId(); -+ } else { -+ partFieldSourceInt = partSourceFieldInNewSchema.fieldId(); -+ } -+ specBuilder.add(partFieldSourceInt, field.fieldId(), field.name(), field.transform()); - } - - // build without validation because the schema may have changed in a way that makes this spec -@@ -970,7 +979,7 @@ public class TableMetadata implements Serializable { - - // rebuild all the partition specs and sort orders for the new current schema - this.specs = -- Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, spec))); -+ Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, schemasById.get(currentSchemaId), spec))); - specsById.clear(); - specsById.putAll(indexSpecs(specs)); - diff --git a/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java b/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java new file mode 100644 index 00000000000..a8db93430d5 --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; + +/** + * Represents how to produce partition data for a table. + * + *

Partition data is produced by transforming columns in a table. Each column transform is + * represented by a named {@link PartitionField}. + * + * This class is directly copied from iceberg repo; The only change is this sets checkConflicts + * to false by default for partition spec converted from Delta to honor the field id assigned by Delta + */ +public class PartitionSpec implements Serializable { + // IDs for partition fields start at 1000 + private static final int PARTITION_DATA_ID_START = 1000; + + private final Schema schema; + + // this is ordered so that DataFile has a consistent schema + private final int specId; + private final PartitionField[] fields; + private transient volatile ListMultimap fieldsBySourceId = null; + private transient volatile Class[] lazyJavaClasses = null; + private transient volatile StructType lazyPartitionType = null; + private transient volatile StructType lazyRawPartitionType = null; + private transient volatile List fieldList = null; + private final int lastAssignedFieldId; + + private PartitionSpec( + Schema schema, int specId, List fields, int lastAssignedFieldId) { + this.schema = schema; + this.specId = specId; + this.fields = fields.toArray(new PartitionField[0]); + this.lastAssignedFieldId = lastAssignedFieldId; + } + + /** Returns the {@link Schema} for this spec. */ + public Schema schema() { + return schema; + } + + /** Returns the ID of this spec. */ + public int specId() { + return specId; + } + + /** Returns the list of {@link PartitionField partition fields} for this spec. */ + public List fields() { + return lazyFieldList(); + } + + public boolean isPartitioned() { + return fields.length > 0 && fields().stream().anyMatch(f -> !f.transform().isVoid()); + } + + public boolean isUnpartitioned() { + return !isPartitioned(); + } + + int lastAssignedFieldId() { + return lastAssignedFieldId; + } + + public UnboundPartitionSpec toUnbound() { + UnboundPartitionSpec.Builder builder = UnboundPartitionSpec.builder().withSpecId(specId); + + for (PartitionField field : fields) { + builder.addField( + field.transform().toString(), field.sourceId(), field.fieldId(), field.name()); + } + + return builder.build(); + } + + /** + * Returns the {@link PartitionField field} that partitions the given source field + * + * @param fieldId a field id from the source schema + * @return the {@link PartitionField field} that partitions the given source field + */ + public List getFieldsBySourceId(int fieldId) { + return lazyFieldsBySourceId().get(fieldId); + } + + /** Returns a {@link StructType} for partition data defined by this spec. */ + public StructType partitionType() { + if (lazyPartitionType == null) { + synchronized (this) { + if (lazyPartitionType == null) { + List structFields = Lists.newArrayListWithExpectedSize(fields.length); + + for (PartitionField field : fields) { + Type sourceType = schema.findType(field.sourceId()); + Type resultType = field.transform().getResultType(sourceType); + structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType)); + } + + this.lazyPartitionType = Types.StructType.of(structFields); + } + } + } + + return lazyPartitionType; + } + + /** + * Returns a struct matching partition information as written into manifest files. See {@link + * #partitionType()} for a struct with field ID's potentially re-assigned to avoid conflict. + */ + public StructType rawPartitionType() { + if (schema.idsToOriginal().isEmpty()) { + // not re-assigned. 
+ return partitionType(); + } + if (lazyRawPartitionType == null) { + synchronized (this) { + if (lazyRawPartitionType == null) { + this.lazyRawPartitionType = + StructType.of( + partitionType().fields().stream() + .map(f -> f.withFieldId(schema.idsToOriginal().get(f.fieldId()))) + .collect(Collectors.toList())); + } + } + } + + return lazyRawPartitionType; + } + + public Class[] javaClasses() { + if (lazyJavaClasses == null) { + synchronized (this) { + if (lazyJavaClasses == null) { + Class[] classes = new Class[fields.length]; + for (int i = 0; i < fields.length; i += 1) { + PartitionField field = fields[i]; + if (field.transform() instanceof UnknownTransform) { + classes[i] = Object.class; + } else { + Type sourceType = schema.findType(field.sourceId()); + Type result = field.transform().getResultType(sourceType); + classes[i] = result.typeId().javaClass(); + } + } + + this.lazyJavaClasses = classes; + } + } + } + + return lazyJavaClasses; + } + + @SuppressWarnings("unchecked") + private T get(StructLike data, int pos, Class javaClass) { + return data.get(pos, (Class) javaClass); + } + + private String escape(String string) { + try { + return URLEncoder.encode(string, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + public String partitionToPath(StructLike data) { + StringBuilder sb = new StringBuilder(); + Class[] javaClasses = javaClasses(); + List outputFields = partitionType().fields(); + for (int i = 0; i < javaClasses.length; i += 1) { + PartitionField field = fields[i]; + Type type = outputFields.get(i).type(); + String valueString = field.transform().toHumanString(type, get(data, i, javaClasses[i])); + + if (i > 0) { + sb.append("/"); + } + sb.append(escape(field.name())).append("=").append(escape(valueString)); + } + return sb.toString(); + } + + /** + * Returns true if this spec is equivalent to the other, with partition field ids ignored. That + * is, if both specs have the same number of fields, field order, field name, source columns, and + * transforms. + * + * @param other another PartitionSpec + * @return true if the specs have the same fields, source columns, and transforms. 
+ */ + public boolean compatibleWith(PartitionSpec other) { + if (equals(other)) { + return true; + } + + if (fields.length != other.fields.length) { + return false; + } + + for (int i = 0; i < fields.length; i += 1) { + PartitionField thisField = fields[i]; + PartitionField thatField = other.fields[i]; + if (thisField.sourceId() != thatField.sourceId() + || !thisField.transform().toString().equals(thatField.transform().toString()) + || !thisField.name().equals(thatField.name())) { + return false; + } + } + + return true; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PartitionSpec)) { + return false; + } + + PartitionSpec that = (PartitionSpec) other; + if (this.specId != that.specId) { + return false; + } + return Arrays.equals(fields, that.fields); + } + + @Override + public int hashCode() { + return 31 * Integer.hashCode(specId) + Arrays.hashCode(fields); + } + + private List lazyFieldList() { + if (fieldList == null) { + synchronized (this) { + if (fieldList == null) { + this.fieldList = ImmutableList.copyOf(fields); + } + } + } + return fieldList; + } + + private ListMultimap lazyFieldsBySourceId() { + if (fieldsBySourceId == null) { + synchronized (this) { + if (fieldsBySourceId == null) { + ListMultimap multiMap = + Multimaps.newListMultimap( + Maps.newHashMap(), () -> Lists.newArrayListWithCapacity(fields.length)); + for (PartitionField field : fields) { + multiMap.put(field.sourceId(), field); + } + this.fieldsBySourceId = multiMap; + } + } + } + + return fieldsBySourceId; + } + + /** + * Returns the source field ids for identity partitions. + * + * @return a set of source ids for the identity partitions. + */ + public Set identitySourceIds() { + Set sourceIds = Sets.newHashSet(); + for (PartitionField field : fields()) { + if ("identity".equals(field.transform().toString())) { + sourceIds.add(field.sourceId()); + } + } + + return sourceIds; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (PartitionField field : fields) { + sb.append("\n"); + sb.append(" ").append(field); + } + if (fields.length > 0) { + sb.append("\n"); + } + sb.append("]"); + return sb.toString(); + } + + private static final PartitionSpec UNPARTITIONED_SPEC = + new PartitionSpec(new Schema(), 0, ImmutableList.of(), unpartitionedLastAssignedId()); + + /** + * Returns a spec for unpartitioned tables. + * + * @return a partition spec with no partitions + */ + public static PartitionSpec unpartitioned() { + return UNPARTITIONED_SPEC; + } + + private static int unpartitionedLastAssignedId() { + return PARTITION_DATA_ID_START - 1; + } + + /** + * Creates a new {@link Builder partition spec builder} for the given {@link Schema}. + * + * @param schema a schema + * @return a partition spec builder for the given schema + */ + public static Builder builderFor(Schema schema) { + return new Builder(schema); + } + + /** + * Used to create valid {@link PartitionSpec partition specs}. + * + *

Call {@link #builderFor(Schema)} to create a new builder. + */ + public static class Builder { + private final Schema schema; + private final List fields = Lists.newArrayList(); + private final Set partitionNames = Sets.newHashSet(); + private final Map, PartitionField> dedupFields = Maps.newHashMap(); + private int specId = 0; + private final AtomicInteger lastAssignedFieldId = + new AtomicInteger(unpartitionedLastAssignedId()); + // check if there are conflicts between partition and schema field name + // HACK HACK: disable checkConflicts for partition spec converted from Delta + // to honor the field id assigned by Delta + private boolean checkConflicts = false; + private boolean caseSensitive = true; + + private Builder(Schema schema) { + this.schema = schema; + } + + private int nextFieldId() { + return lastAssignedFieldId.incrementAndGet(); + } + + private void checkAndAddPartitionName(String name) { + checkAndAddPartitionName(name, null); + } + + Builder checkConflicts(boolean check) { + checkConflicts = check; + return this; + } + + private void checkAndAddPartitionName(String name, Integer sourceColumnId) { + Types.NestedField schemaField = + this.caseSensitive ? schema.findField(name) : schema.caseInsensitiveFindField(name); + if (checkConflicts) { + if (sourceColumnId != null) { + // for identity transform case we allow conflicts between partition and schema field name + // as + // long as they are sourced from the same schema field + Preconditions.checkArgument( + schemaField == null || schemaField.fieldId() == sourceColumnId, + "Cannot create identity partition sourced from different field in schema: %s", + name); + } else { + // for all other transforms we don't allow conflicts between partition name and schema + // field name + Preconditions.checkArgument( + schemaField == null, + "Cannot create partition from name that exists in schema: %s", + name); + } + } + Preconditions.checkArgument(!name.isEmpty(), "Cannot use empty partition name: %s", name); + Preconditions.checkArgument( + !partitionNames.contains(name), "Cannot use partition name more than once: %s", name); + partitionNames.add(name); + } + + private void checkForRedundantPartitions(PartitionField field) { + Map.Entry dedupKey = + new AbstractMap.SimpleEntry<>(field.sourceId(), field.transform().dedupName()); + PartitionField partitionField = dedupFields.get(dedupKey); + Preconditions.checkArgument( + partitionField == null, + "Cannot add redundant partition: %s conflicts with %s", + partitionField, + field); + dedupFields.put(dedupKey, field); + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder withSpecId(int newSpecId) { + this.specId = newSpecId; + return this; + } + + private Types.NestedField findSourceColumn(String sourceName) { + Types.NestedField sourceColumn = + this.caseSensitive + ? 
schema.findField(sourceName) + : schema.caseInsensitiveFindField(sourceName); + Preconditions.checkArgument( + sourceColumn != null, "Cannot find source column: %s", sourceName); + return sourceColumn; + } + + Builder identity(String sourceName, String targetName) { + return identity(findSourceColumn(sourceName), targetName); + } + + private Builder identity(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName, sourceColumn.fieldId()); + PartitionField field = + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.identity()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder identity(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + return identity(sourceColumn, schema.findColumnName(sourceColumn.fieldId())); + } + + public Builder year(String sourceName, String targetName) { + return year(findSourceColumn(sourceName), targetName); + } + + private Builder year(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder year(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return year(sourceColumn, columnName + "_year"); + } + + public Builder month(String sourceName, String targetName) { + return month(findSourceColumn(sourceName), targetName); + } + + private Builder month(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder month(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return month(sourceColumn, columnName + "_month"); + } + + public Builder day(String sourceName, String targetName) { + return day(findSourceColumn(sourceName), targetName); + } + + private Builder day(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder day(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return day(sourceColumn, columnName + "_day"); + } + + public Builder hour(String sourceName, String targetName) { + return hour(findSourceColumn(sourceName), targetName); + } + + private Builder hour(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder hour(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return hour(sourceColumn, columnName + 
"_hour"); + } + + public Builder bucket(String sourceName, int numBuckets, String targetName) { + return bucket(findSourceColumn(sourceName), numBuckets, targetName); + } + + private Builder bucket(Types.NestedField sourceColumn, int numBuckets, String targetName) { + checkAndAddPartitionName(targetName); + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(numBuckets))); + return this; + } + + public Builder bucket(String sourceName, int numBuckets) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return bucket(sourceColumn, numBuckets, columnName + "_bucket"); + } + + public Builder truncate(String sourceName, int width, String targetName) { + return truncate(findSourceColumn(sourceName), width, targetName); + } + + private Builder truncate(Types.NestedField sourceColumn, int width, String targetName) { + checkAndAddPartitionName(targetName); + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.truncate(width))); + return this; + } + + public Builder truncate(String sourceName, int width) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return truncate(sourceColumn, width, columnName + "_trunc"); + } + + public Builder alwaysNull(String sourceName, String targetName) { + return alwaysNull(findSourceColumn(sourceName), targetName); + } + + private Builder alwaysNull(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName( + targetName, sourceColumn.fieldId()); // can duplicate a source column name + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.alwaysNull())); + return this; + } + + public Builder alwaysNull(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return alwaysNull(sourceColumn, columnName + "_null"); + } + + // add a partition field with an auto-increment partition field id starting from + // PARTITION_DATA_ID_START + Builder add(int sourceId, String name, Transform transform) { + return add(sourceId, nextFieldId(), name, transform); + } + + Builder add(int sourceId, int fieldId, String name, Transform transform) { + checkAndAddPartitionName(name, sourceId); + fields.add(new PartitionField(sourceId, fieldId, name, transform)); + lastAssignedFieldId.getAndAccumulate(fieldId, Math::max); + return this; + } + + public PartitionSpec build() { + PartitionSpec spec = buildUnchecked(); + checkCompatibility(spec, schema); + return spec; + } + + PartitionSpec buildUnchecked() { + return new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get()); + } + } + + static void checkCompatibility(PartitionSpec spec, Schema schema) { + for (PartitionField field : spec.fields) { + Type sourceType = schema.findType(field.sourceId()); + Transform transform = field.transform(); + // In the case of a Version 1 partition-spec field gets deleted, + // it is replaced with a void transform, see: + // https://iceberg.apache.org/spec/#partition-transforms + // We don't care about the source type since a VoidTransform is always compatible and skip the + // checks + if (!transform.equals(Transforms.alwaysNull())) { + ValidationException.check( + sourceType != null, "Cannot find source column for partition field: %s", field); + 
ValidationException.check( + sourceType.isPrimitiveType(), + "Cannot partition by non-primitive source field: %s", + sourceType); + ValidationException.check( + transform.canTransform(sourceType), + "Invalid source type %s for transform: %s", + sourceType, + transform); + } + } + } + + static boolean hasSequentialIds(PartitionSpec spec) { + for (int i = 0; i < spec.fields.length; i += 1) { + if (spec.fields[i].fieldId() != PARTITION_DATA_ID_START + i) { + return false; + } + } + return true; + } +} + diff --git a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java new file mode 100644 index 00000000000..e627b281445 --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -0,0 +1,916 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hive; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.*; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; 
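As context for the copied PartitionSpec.Builder above (not part of the diff), a minimal sketch of how that builder is typically driven. It assumes the unshaded org.apache.iceberg package names (in this repo the classes end up under the shadedForDelta prefix after assembly) and illustrative column names and field ids:

import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.types.Types

object PartitionSpecBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Schema with pre-assigned field ids (e.g. ids carried over from a Delta table).
    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
      Types.NestedField.optional(3, "category", Types.StringType.get()))

    // Single-argument transform methods derive the partition name from the source column
    // (e.g. "event_time_day", "id_bucket"); build() runs the compatibility checks above.
    val spec = PartitionSpec.builderFor(schema)
      .day("event_time")
      .bucket("id", 16)
      .identity("category")
      .build()

    println(spec) // prints the resolved partition fields and their assigned field ids
  }
}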
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewBuilder; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is directly copied from iceberg 1.8.0; The only change made is + * 1. + * accept metadataUpdates in constructor and pass to HiveTableOperations + * to support using schema/partitionSpec with field ids assigned by Delta lake + * 2. + * Validate metadataLocation for validating table as Iceberg in tableExists! + */ +public class HiveCatalog extends BaseMetastoreViewCatalog + implements SupportsNamespaces, Configurable { + public static final String LIST_ALL_TABLES = "list-all-tables"; + public static final String LIST_ALL_TABLES_DEFAULT = "false"; + + public static final String HMS_TABLE_OWNER = "hive.metastore.table.owner"; + public static final String HMS_DB_OWNER = "hive.metastore.database.owner"; + public static final String HMS_DB_OWNER_TYPE = "hive.metastore.database.owner-type"; + + // MetastoreConf is not available with current Hive version + static final String HIVE_CONF_CATALOG = "metastore.catalog.default"; + + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + + private String name; + private Configuration conf; + private FileIO fileIO; + private ClientPool clients; + private boolean listAllTables = false; + private Map catalogProperties; + + private List metadataUpdates = new ArrayList(); + + public HiveCatalog() {} + + public void initialize(String inputName, Map properties, List metadataUpdates) { + initialize(inputName, properties); + this.metadataUpdates = metadataUpdates; + } + + @Override + public void initialize(String inputName, Map properties) { + this.catalogProperties = ImmutableMap.copyOf(properties); + this.name = inputName; + if (conf == null) { + LOG.warn("No Hadoop Configuration was set, using the default environment Configuration"); + this.conf = new Configuration(); + } + + if (properties.containsKey(CatalogProperties.URI)) { + this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI)); + } + + if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) { + this.conf.set( + HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION))); + } + + this.listAllTables = + Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT)); + + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + this.fileIO = + fileIOImpl == null + ? 
new HadoopFileIO(conf) + : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); + + this.clients = new CachedClientPool(conf, properties); + } + + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return new ViewAwareTableBuilder(identifier, schema); + } + + @Override + public ViewBuilder buildView(TableIdentifier identifier) { + return new TableAwareViewBuilder(identifier); + } + + @Override + public List listTables(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + String database = namespace.level(0); + + try { + List tableNames = clients.run(client -> client.getAllTables(database)); + List tableIdentifiers; + + if (listAllTables) { + tableIdentifiers = + tableNames.stream() + .map(t -> TableIdentifier.of(namespace, t)) + .collect(Collectors.toList()); + } else { + tableIdentifiers = + listIcebergTables( + tableNames, namespace, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE); + } + + LOG.debug( + "Listing of namespace: {} resulted in the following tables: {}", + namespace, + tableIdentifiers); + return tableIdentifiers; + + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all tables under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listTables", e); + } + } + + @Override + public List listViews(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + + try { + String database = namespace.level(0); + List viewNames = + clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW)); + + // Retrieving the Table objects from HMS in batches to avoid OOM + List filteredTableIdentifiers = Lists.newArrayList(); + Iterable> viewNameSets = Iterables.partition(viewNames, 100); + + for (List viewNameSet : viewNameSets) { + filteredTableIdentifiers.addAll( + listIcebergTables(viewNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)); + } + + return filteredTableIdentifiers; + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all views under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listViews", e); + } + } + + @Override + public String name() { + return name; + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!isValidIdentifier(identifier)) { + return false; + } + + String database = identifier.namespace().level(0); + + TableOperations ops = newTableOps(identifier); + TableMetadata lastMetadata = null; + if (purge) { + try { + lastMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn( + "Failed to load table metadata for table: {}, continuing drop without purge", + identifier, + e); + } + } + + try { + clients.run( + client -> { + client.dropTable( + database, + identifier.name(), + false /* do not delete data */, + false /* throw NoSuchObjectException if the table doesn't exist */); + return null; + }); + + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } 
+ + LOG.info("Dropped table: {}", identifier); + return true; + + } catch (NoSuchTableException | NoSuchObjectException e) { + LOG.info("Skipping drop, table does not exist: {}", identifier, e); + return false; + + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropTable", e); + } + } + + @Override + public boolean dropView(TableIdentifier identifier) { + if (!isValidIdentifier(identifier)) { + return false; + } + + try { + String database = identifier.namespace().level(0); + String viewName = identifier.name(); + + HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier); + ViewMetadata lastViewMetadata = null; + try { + lastViewMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn("Failed to load view metadata for view: {}", identifier, e); + } + + clients.run( + client -> { + client.dropTable(database, viewName, false, false); + return null; + }); + + if (lastViewMetadata != null) { + CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); + } + + LOG.info("Dropped view: {}", identifier); + return true; + } catch (NoSuchObjectException e) { + LOG.info("Skipping drop, view does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop view " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropView", e); + } + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier originalTo) { + renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW); + } + + private List listIcebergTables( + List tableNames, Namespace namespace, String tableTypeProp) + throws TException, InterruptedException { + List tableObjects = + clients.run(client -> client.getTableObjectsByName(namespace.level(0), tableNames)); + return tableObjects.stream() + .filter( + table -> + table.getParameters() != null + && tableTypeProp.equalsIgnoreCase( + table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP))) + .map(table -> TableIdentifier.of(namespace, table.getTableName())) + .collect(Collectors.toList()); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private void renameTableOrView( + TableIdentifier from, + TableIdentifier originalTo, + HiveOperationsBase.ContentType contentType) { + Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from); + + TableIdentifier to = removeCatalogName(originalTo); + Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to); + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException( + "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace()); + } + + if (tableExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. Table already exists", from, to); + } + + if (viewExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. 
View already exists", from, to); + } + + String toDatabase = to.namespace().level(0); + String fromDatabase = from.namespace().level(0); + String fromName = from.name(); + + try { + Table table = clients.run(client -> client.getTable(fromDatabase, fromName)); + validateTableIsIcebergTableOrView(contentType, table, CatalogUtil.fullTableName(name, from)); + + table.setDbName(toDatabase); + table.setTableName(to.name()); + + clients.run( + client -> { + MetastoreUtil.alterTable(client, fromDatabase, fromName, table); + return null; + }); + + LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to); + + } catch (NoSuchObjectException e) { + switch (contentType) { + case TABLE: + throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to); + case VIEW: + throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to); + } + + } catch (InvalidOperationException e) { + if (e.getMessage() != null + && e.getMessage().contains(String.format("new table %s already exists", to))) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table already exists: %s", to); + } else { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + } + + } catch (TException e) { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to rename", e); + } + } + + private void validateTableIsIcebergTableOrView( + HiveOperationsBase.ContentType contentType, Table table, String fullName) { + switch (contentType) { + case TABLE: + HiveOperationsBase.validateTableIsIceberg(table, fullName); + break; + case VIEW: + HiveOperationsBase.validateTableIsIcebergView(table, fullName); + } + } + + /** + * Check whether table or metadata table exists. + * + *
<p>
Note: If a hive table with the same identifier exists in catalog, this method will return + * {@code false}. + * + * @param identifier a table identifier + * @return true if the table exists, false otherwise + */ + @Override + public boolean tableExists(TableIdentifier identifier) { + TableIdentifier baseTableIdentifier = identifier; + if (!isValidIdentifier(identifier)) { + if (!isValidMetadataIdentifier(identifier)) { + return false; + } else { + baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels()); + } + } + + String database = baseTableIdentifier.namespace().level(0); + String tableName = baseTableIdentifier.name(); + try { + Table table = clients.run(client -> client.getTable(database, tableName)); + validateTableIsIceberg(table, fullTableName(name, baseTableIdentifier)); + return true; + } catch (NoSuchTableException | NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to check table existence of " + baseTableIdentifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to check table existence of " + baseTableIdentifier, e); + } + } + + private void validateTableIsIceberg(Table table, String fullName) { + HiveOperationsBase.validateTableIsIceberg(table, fullName); + String metadataLocation = table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + NoSuchIcebergTableException.check( + metadataLocation != null, + "Not an iceberg table: %s, metadataLocation is null", + fullName); + } + + @Override + public boolean viewExists(TableIdentifier viewIdentifier) { + if (!isValidIdentifier(viewIdentifier)) { + return false; + } + + String database = viewIdentifier.namespace().level(0); + String viewName = viewIdentifier.name(); + try { + Table table = clients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView(table, fullTableName(name, viewIdentifier)); + return true; + } catch (NoSuchIcebergViewException | NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to check view existence of " + viewIdentifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to check view existence of " + viewIdentifier, e); + } + } + + @Override + public void createNamespace(Namespace namespace, Map meta) { + Preconditions.checkArgument( + !namespace.isEmpty(), "Cannot create namespace with invalid name: %s", namespace); + Preconditions.checkArgument( + isValidateNamespace(namespace), + "Cannot support multi part namespace in Hive Metastore: %s", + namespace); + Preconditions.checkArgument( + meta.get(HMS_DB_OWNER_TYPE) == null || meta.get(HMS_DB_OWNER) != null, + "Create namespace setting %s without setting %s is not allowed", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + try { + clients.run( + client -> { + client.createDatabase(convertToDatabase(namespace, meta)); + return null; + }); + + LOG.info("Created namespace: {}", namespace); + + } catch (AlreadyExistsException e) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + e, "Namespace already exists: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to create namespace " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to 
createDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + public List listNamespaces(Namespace namespace) { + if (!isValidateNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + if (!namespace.isEmpty()) { + return ImmutableList.of(); + } + try { + List namespaces = + clients.run(IMetaStoreClient::getAllDatabases).stream() + .map(Namespace::of) + .collect(Collectors.toList()); + + LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces); + return namespaces; + + } catch (TException e) { + throw new RuntimeException( + "Failed to list all namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) { + if (!isValidateNamespace(namespace)) { + return false; + } + + try { + clients.run( + client -> { + client.dropDatabase( + namespace.level(0), + false /* deleteData */, + false /* ignoreUnknownDb */, + false /* cascade */); + return null; + }); + + LOG.info("Dropped namespace: {}", namespace); + return true; + + } catch (InvalidOperationException e) { + throw new NamespaceNotEmptyException( + e, "Namespace %s is not empty. One or more tables exist.", namespace); + + } catch (NoSuchObjectException e) { + return false; + + } catch (TException e) { + throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) { + Preconditions.checkArgument( + (properties.get(HMS_DB_OWNER_TYPE) == null) == (properties.get(HMS_DB_OWNER) == null), + "Setting %s and %s has to be performed together or not at all", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + Map parameter = Maps.newHashMap(); + + parameter.putAll(loadNamespaceMetadata(namespace)); + parameter.putAll(properties); + Database database = convertToDatabase(namespace, parameter); + + alterHiveDataBase(namespace, database); + LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace); + + // Always successful, otherwise exception is thrown + return true; + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) { + Preconditions.checkArgument( + properties.contains(HMS_DB_OWNER_TYPE) == properties.contains(HMS_DB_OWNER), + "Removing %s and %s has to be performed together or not at all", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + Map parameter = Maps.newHashMap(); + + parameter.putAll(loadNamespaceMetadata(namespace)); + properties.forEach(key -> parameter.put(key, null)); + Database database = convertToDatabase(namespace, parameter); + + alterHiveDataBase(namespace, database); + LOG.debug("Successfully removed properties {} from {}", properties, namespace); + + // Always successful, otherwise exception is thrown + return true; + } + + private void alterHiveDataBase(Namespace namespace, Database database) { + try { + clients.run( + client -> { + client.alterDatabase(namespace.level(0), database); + return null; + }); + + } catch (NoSuchObjectException | UnknownDBException e) { + throw new 
NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) { + if (!isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + try { + Database database = clients.run(client -> client.getDatabase(namespace.level(0))); + Map metadata = convertToMetadata(database); + LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet()); + return metadata; + + } catch (NoSuchObjectException | UnknownDBException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + protected boolean isValidIdentifier(TableIdentifier tableIdentifier) { + return tableIdentifier.namespace().levels().length == 1; + } + + private TableIdentifier removeCatalogName(TableIdentifier to) { + if (isValidIdentifier(to)) { + return to; + } + + // check if the identifier includes the catalog name and remove it + if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) { + return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name()); + } + + // return the original unmodified + return to; + } + + private boolean isValidateNamespace(Namespace namespace) { + return namespace.levels().length == 1; + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName, metadataUpdates); + } + + @Override + protected ViewOperations newViewOps(TableIdentifier identifier) { + return new HiveViewOperations(conf, clients, fileIO, name, identifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + // This is a little edgy since we basically duplicate the HMS location generation logic. + // Sadly I do not see a good way around this if we want to keep the order of events, like: + // - Create meta files + // - Create the metadata in HMS, and this way committing the changes + + // Create a new location based on the namespace / database if it is set on database level + try { + Database databaseData = + clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0])); + if (databaseData.getLocationUri() != null) { + // If the database location is set use it as a base. 
+ return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name()); + } + + } catch (NoSuchObjectException e) { + throw new NoSuchNamespaceException( + e, "Namespace does not exist: %s", tableIdentifier.namespace().levels()[0]); + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s", tableIdentifier), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + } + + // Otherwise, stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path + String databaseLocation = databaseLocation(tableIdentifier.namespace().levels()[0]); + return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + } + + private String databaseLocation(String databaseName) { + String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + Preconditions.checkNotNull( + warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null"); + warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation); + return String.format("%s/%s.db", warehouseLocation, databaseName); + } + + private Map convertToMetadata(Database database) { + + Map meta = Maps.newHashMap(); + + meta.putAll(database.getParameters()); + meta.put("location", database.getLocationUri()); + if (database.getDescription() != null) { + meta.put("comment", database.getDescription()); + } + if (database.getOwnerName() != null) { + meta.put(HMS_DB_OWNER, database.getOwnerName()); + if (database.getOwnerType() != null) { + meta.put(HMS_DB_OWNER_TYPE, database.getOwnerType().name()); + } + } + + return meta; + } + + Database convertToDatabase(Namespace namespace, Map meta) { + if (!isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + Database database = new Database(); + Map parameter = Maps.newHashMap(); + + database.setName(namespace.level(0)); + database.setLocationUri(databaseLocation(namespace.level(0))); + + meta.forEach( + (key, value) -> { + if (key.equals("comment")) { + database.setDescription(value); + } else if (key.equals("location")) { + database.setLocationUri(value); + } else if (key.equals(HMS_DB_OWNER)) { + database.setOwnerName(value); + } else if (key.equals(HMS_DB_OWNER_TYPE) && value != null) { + database.setOwnerType(PrincipalType.valueOf(value)); + } else { + if (value != null) { + parameter.put(key, value); + } + } + }); + + if (database.getOwnerName() == null) { + database.setOwnerName(HiveHadoopUtil.currentUser()); + database.setOwnerType(PrincipalType.USER); + } + + database.setParameters(parameter); + + return database; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + .toString(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = new Configuration(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + protected Map properties() { + return catalogProperties == null ? ImmutableMap.of() : catalogProperties; + } + + @VisibleForTesting + void setListAllTables(boolean listAllTables) { + this.listAllTables = listAllTables; + } + + @VisibleForTesting + ClientPool clientPool() { + return clients; + } + + /** + * The purpose of this class is to add view detection only for Hive-Specific tables. 
Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the table is an iceberg + * table or not. 2. During commit, it validates if there is any concurrent commit with table or + * table-name already exists. This class helps to do the validation on an early basis. + */ + private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder { + + private final TableIdentifier identifier; + + private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) { + super(identifier, schema); + this.identifier = identifier; + } + + @Override + public Transaction createOrReplaceTransaction() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.createOrReplaceTransaction(); + } + + @Override + public org.apache.iceberg.Table create() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.create(); + } + } + + /** + * The purpose of this class is to add table detection only for Hive-Specific view. Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the view is an iceberg + * view or not. 2. During commit, it validates if there is any concurrent commit with view or + * view-name already exists. This class helps to do the validation on an early basis. + */ + private class TableAwareViewBuilder extends BaseViewBuilder { + + private final TableIdentifier identifier; + + private TableAwareViewBuilder(TableIdentifier identifier) { + super(identifier); + this.identifier = identifier; + } + + @Override + public View createOrReplace() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.createOrReplace(); + } + + @Override + public View create() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.create(); + } + } +} \ No newline at end of file diff --git a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java new file mode 100644 index 00000000000..1a0710d5a9a --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import static org.apache.iceberg.TableProperties.GC_ENABLED; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreOperations; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.SortOrderParser; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.ConfigProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.BiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.JsonUtil; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is directly copied from iceberg 1.8.0; The only change made are + * 1) accept metadataUpdates in constructor apply those before writing metadata + * to support using schema/partitionSpec with field ids assigned by Delta lake; + * 2) handle NoSuchIcebergTableException in doRefresh to regard a table entry + * that exists in HMS but does not have "table_type" = "ICEBERG" as table does + * not exist, so Delta lake can correctly start create table transaction + */ +public class HiveTableOperations extends BaseMetastoreTableOperations + implements HiveOperationsBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); + + private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = + "iceberg.hive.metadata-refresh-max-retries"; + private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2; + private static final BiMap ICEBERG_TO_HMS_TRANSLATION = + ImmutableBiMap.of( + // gc.enabled in 
Iceberg and external.table.purge in Hive are meant to do the same things + // but with different names + GC_ENABLED, "external.table.purge"); + + /** + * Provides key translation where necessary between Iceberg and HMS props. This translation is + * needed because some properties control the same behaviour but are named differently in Iceberg + * and Hive. Therefore changes to these property pairs should be synchronized. + * + *
<p>
Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and + * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control + * flags, therefore inconsistent behaviour can occur from e.g. a Hive user's point of view if + * external.table.purge=true is set on the HMS table but gc.enabled=false is set on the Iceberg + * table, resulting in no data file deletion. + * + * @param hmsProp The HMS property that should be translated to Iceberg property + * @return Iceberg property equivalent to the hmsProp. If no such translation exists, the original + * hmsProp is returned + */ + public static String translateToIcebergProp(String hmsProp) { + return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp); + } + + private final String fullName; + private final String catalogName; + private final String database; + private final String tableName; + private final Configuration conf; + private final long maxHiveTablePropertySize; + private final int metadataRefreshMaxRetries; + private final FileIO fileIO; + private final ClientPool metaClients; + + private List metadataUpdates = new ArrayList(); + + protected HiveTableOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + String database, + String table, + List metadataUpdates) { + this(conf, metaClients, fileIO, catalogName, database, table); + this.metadataUpdates = metadataUpdates; + } + + protected HiveTableOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + String database, + String table) { + this.conf = conf; + this.metaClients = metaClients; + this.fileIO = fileIO; + this.fullName = catalogName + "." + database + "." + table; + this.catalogName = catalogName; + this.database = database; + this.tableName = table; + this.metadataRefreshMaxRetries = + conf.getInt( + HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, + HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT); + this.maxHiveTablePropertySize = + conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); + } + + @Override + protected String tableName() { + return fullName; + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected void doRefresh() { + String metadataLocation = null; + try { + Table table = metaClients.run(client -> client.getTable(database, tableName)); + HiveOperationsBase.validateTableIsIceberg(table, fullName); + + metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); + + } catch (NoSuchObjectException e) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("No such table: %s.%s", database, tableName); + } + } catch (NoSuchIcebergTableException e) { + // NoSuchIcebergTableException is throw when table exists in catalog but not with + // table_type=iceberg; in that case we want to swallow so createTable + // txn can proceed with creating the iceberg table/metadata and set table_type=iceberg + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("No such table: %s.%s", database, tableName); + } + } catch (TException e) { + String errMsg = + String.format("Failed to get table info from metastore %s.%s", database, tableName); + throw new RuntimeException(errMsg, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + + refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); + } + + 
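Before the doCommit implementation below, a rough sketch (not part of the diff) of how the metadataUpdates plumbing is meant to be fed from the Delta side. The catalog name and HMS URI are placeholders, and the AddSchema/AddPartitionSpec constructor shapes are assumed from the pinned Iceberg version:

import java.util.{ArrayList => JArrayList, HashMap => JHashMap}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.{CatalogProperties, MetadataUpdate, PartitionSpec, Schema}
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

object UniformHmsCommitSketch {
  def main(args: Array[String]): Unit = {
    // Schema and partition spec whose field ids were already assigned by Delta.
    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.required(2, "ts", Types.TimestampType.withZone()))
    val spec = PartitionSpec.builderFor(schema).day("ts").build()

    // These updates are what doCommit() replays onto the table metadata before writing it,
    // so the committed Iceberg metadata keeps Delta's field ids and single partition spec.
    val updates = new JArrayList[MetadataUpdate]()
    updates.add(new MetadataUpdate.AddSchema(schema, 2)) // 2 = last assigned column id (assumed ctor)
    updates.add(new MetadataUpdate.AddPartitionSpec(spec))

    val props = new JHashMap[String, String]()
    props.put(CatalogProperties.URI, "thrift://localhost:9083") // placeholder HMS URI

    val catalog = new HiveCatalog()
    catalog.setConf(new Configuration())
    // The three-argument initialize overload added in this patch threads the updates
    // through newTableOps() into HiveTableOperations.
    catalog.initialize("uniform_hms", props, updates)
  }
}

The point is only the ordering: assign ids on the Delta side, wrap them as MetadataUpdates, and let the copied operations class apply them at commit time.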
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + boolean newTable = base == null; + + // Apply metadata updates so adjustedMetadata has field id and partition spec created + // from Delta lake + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + Schema lastAddedSchema = metadata.schema(); + for (MetadataUpdate update : metadataUpdates) { + if (update instanceof MetadataUpdate.AddSchema) { + MetadataUpdate.AddSchema addSchema = (MetadataUpdate.AddSchema) update; + builder.setCurrentSchema(addSchema.schema(), addSchema.lastColumnId()); + lastAddedSchema = addSchema.schema(); + } else if (update instanceof MetadataUpdate.AddPartitionSpec) { + // regard AddPartitionSpec as replace all existing specs as Delta Uniform only + // support one partition spec + PartitionSpec specToAdd = ((MetadataUpdate.AddPartitionSpec) update).spec().bind(lastAddedSchema); + if (!specToAdd.compatibleWith(metadata.spec())) { + HashSet idsToRemove = new HashSet(); + for (PartitionSpec spec : metadata.specs()) { + idsToRemove.add(spec.specId()); + } + builder.setDefaultPartitionSpec(specToAdd); + MetadataUpdate.RemovePartitionSpecs removeSpecs = new MetadataUpdate.RemovePartitionSpecs(idsToRemove); + removeSpecs.applyTo(builder); + } + } else { + update.applyTo(builder); + } + } + TableMetadata adjustedMetadata = builder.build(); + + String newMetadataLocation = writeNewMetadataIfRequired(newTable, adjustedMetadata); + boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); + boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); + + BaseMetastoreOperations.CommitStatus commitStatus = + BaseMetastoreOperations.CommitStatus.FAILURE; + boolean updateHiveTable = false; + + HiveLock lock = lockObject(base); + try { + lock.lock(); + + Table tbl = loadHmsTable(); + + if (tbl != null) { + // If we try to create the table but the metadata location is already set, then we had a + // concurrent commit + if (newTable + && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + != null) { + if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) { + throw new AlreadyExistsException( + "View with same name already exists: %s.%s", database, tableName); + } + throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); + } + + updateHiveTable = true; + LOG.debug("Committing existing table: {}", fullName); + } else { + tbl = + newHmsTable( + adjustedMetadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); + LOG.debug("Committing new table: {}", fullName); + } + + StorageDescriptor newsd = HiveOperationsBase.storageDescriptor( + adjustedMetadata.schema(), + adjustedMetadata.location(), + hiveEngineEnabled); + // use storage descriptor from Delta + newsd.getSerdeInfo().setParameters(tbl.getSd().getSerdeInfo().getParameters()); + tbl.setSd(newsd); + // set schema to be empty to match Delta behavior + tbl.getSd().setCols(Collections.singletonList(new FieldSchema("col", "array", ""))); + + String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, metadataLocation)) { + throw new CommitFailedException( + "Cannot commit: Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", + baseMetadataLocation, metadataLocation, database, tableName); + } + + // get Iceberg props that have been removed + Set removedProps = Collections.emptySet(); + if (base != null) { + removedProps = + base.properties().keySet().stream() + .filter(key -> !adjustedMetadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + Map summary = + Optional.ofNullable(adjustedMetadata.currentSnapshot()) + .map(Snapshot::summary) + .orElseGet(ImmutableMap::of); + setHmsTableParameters( + newMetadataLocation, tbl, adjustedMetadata, removedProps, hiveEngineEnabled, summary); + + if (!keepHiveStats) { + tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); + tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + } + + lock.ensureActive(); + + try { + persistTable( + tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null : baseMetadataLocation); + lock.ensureActive(); + + commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS; + } catch (LockException le) { + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + throw new CommitStateUnknownException( + "Failed to heartbeat for hive lock while " + + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " + + "Please check the commit history. If you are running into this issue, try reducing " + + "iceberg.hive.lock-heartbeat-interval-ms.", + le); + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { + throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName); + + } catch (InvalidObjectException e) { + throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); + + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + + } catch (Throwable e) { + if (e.getMessage() != null + && e.getMessage() + .contains( + "The table has been modified. The parameter value for key '" + + HiveTableOperations.METADATA_LOCATION_PROP + + "' is")) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database, tableName); + } + + if (e.getMessage() != null + && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { + throw new RuntimeException( + "Failed to acquire locks from metastore because the underlying metastore " + + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " + + "support transactions. 
To fix this use an alternative metastore.", + e); + } + + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, + tableName, + e); + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + commitStatus = + BaseMetastoreOperations.CommitStatus.valueOf( + checkCommitStatus(newMetadataLocation, adjustedMetadata).name()); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } + } + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s.%s", database, tableName), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + + } catch (LockException e) { + throw new CommitFailedException(e); + + } finally { + HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock); + } + + LOG.info( + "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); + } + + private void setHmsTableParameters( + String newMetadataLocation, + Table tbl, + TableMetadata metadata, + Set obsoleteProps, + boolean hiveEngineEnabled, + Map summary) { + Map parameters = + Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap); + + // push all Iceberg table properties into HMS + metadata.properties().entrySet().stream() + .filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER)) + .forEach( + entry -> { + String key = entry.getKey(); + // translate key names between Iceberg and HMS where needed + String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key); + parameters.put(hmsKey, entry.getValue()); + }); + if (metadata.uuid() != null) { + parameters.put(TableProperties.UUID, metadata.uuid()); + } + + // remove any props from HMS that are no longer present in Iceberg table props + obsoleteProps.forEach(parameters::remove); + + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); + + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + // If needed set the 'storage_handler' property to enable query from Hive + if (hiveEngineEnabled) { + parameters.put( + hive_metastoreConstants.META_TABLE_STORAGE, + "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"); + } else { + parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); + } + + // Set the basic statistics + if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) { + parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) { + parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) { + parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } + + setSnapshotStats(metadata, parameters); + setSchema(metadata.schema(), parameters); + setPartitionSpec(metadata, parameters); + setSortOrder(metadata, parameters); + + tbl.setParameters(parameters); + } + + @VisibleForTesting + void setSnapshotStats(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID); + 
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP); + parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY); + + Snapshot currentSnapshot = metadata.currentSnapshot(); + if (exposeInHmsProperties() && currentSnapshot != null) { + parameters.put( + TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId())); + parameters.put( + TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, + String.valueOf(currentSnapshot.timestampMillis())); + setSnapshotSummary(parameters, currentSnapshot); + } + + parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size())); + } + + @VisibleForTesting + void setSnapshotSummary(Map parameters, Snapshot currentSnapshot) { + try { + String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary()); + if (summary.length() <= maxHiveTablePropertySize) { + parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary); + } else { + LOG.warn( + "Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters", + currentSnapshot.snapshotId(), + maxHiveTablePropertySize); + } + } catch (JsonProcessingException e) { + LOG.warn( + "Failed to convert current snapshot({}) summary to a json string", + currentSnapshot.snapshotId(), + e); + } + } + + @VisibleForTesting + void setPartitionSpec(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC); + if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) { + String spec = PartitionSpecParser.toJson(metadata.spec()); + setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec); + } + } + + @VisibleForTesting + void setSortOrder(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.DEFAULT_SORT_ORDER); + if (exposeInHmsProperties() + && metadata.sortOrder() != null + && metadata.sortOrder().isSorted()) { + String sortOrder = SortOrderParser.toJson(metadata.sortOrder()); + setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder); + } + } + + @Override + public long maxHiveTablePropertySize() { + return maxHiveTablePropertySize; + } + + @Override + public String database() { + return database; + } + + @Override + public String table() { + return tableName; + } + + @Override + public TableType tableType() { + return TableType.EXTERNAL_TABLE; + } + + @Override + public ClientPool metaClients() { + return metaClients; + } + + /** + * Returns if the hive engine related values should be enabled on the table, or not. + * + *
<p>The decision is made like this: + * + * <ol> + *   <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED} + *   <li>If the table property is not set then check the hive-site.xml property value {@link + *       ConfigProperties#ENGINE_HIVE_ENABLED} + *   <li>If none of the above is enabled then use the default value {@link + *       TableProperties#ENGINE_HIVE_ENABLED_DEFAULT} + * </ol>
+ * + * @param metadata Table metadata to use + * @param conf The hive configuration to use + * @return if the hive engine related values should be enabled or not + */ + private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) { + if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); + } + + /** + * Returns if the hive locking should be enabled on the table, or not. + * + *
<p>The decision is made like this: + * + * <ol> + *   <li>Table property value {@link TableProperties#HIVE_LOCK_ENABLED} + *   <li>If the table property is not set then check the hive-site.xml property value {@link + *       ConfigProperties#LOCK_HIVE_ENABLED} + *   <li>If none of the above is enabled then use the default value {@link + *       TableProperties#HIVE_LOCK_ENABLED_DEFAULT} + * </ol>
+ * + * @param metadata Table metadata to use + * @param conf The hive configuration to use + * @return if the hive engine related values should be enabled or not + */ + private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) { + if (metadata != null && metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT); + } + + @VisibleForTesting + HiveLock lockObject(TableMetadata metadata) { + if (hiveLockEnabled(metadata, conf)) { + return new MetastoreLock(conf, metaClients, catalogName, database, tableName); + } else { + return new NoLock(); + } + } +} \ No newline at end of file diff --git a/project/ShadedIcebergBuild.scala b/project/ShadedIcebergBuild.scala new file mode 100644 index 00000000000..a59718ada88 --- /dev/null +++ b/project/ShadedIcebergBuild.scala @@ -0,0 +1,116 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import sbtassembly.* + +object ShadedIcebergBuild { + val icebergExclusionRules = List.apply( + ExclusionRule("com.github.ben-manes.caffeine"), + ExclusionRule("io.netty"), + ExclusionRule("org.apache.httpcomponents.client5"), + ExclusionRule("org.apache.httpcomponents.core5"), + ExclusionRule("io.airlift"), + ExclusionRule("org.apache.commons"), + ExclusionRule("commons-io"), + ExclusionRule("commons-compress"), + ExclusionRule("commons-lang3"), + ExclusionRule("commons-codec"), + ExclusionRule("org.apache.avro"), + ExclusionRule("com.fasterxml.jackson.core"), + ExclusionRule("com.fasterxml.jackson.databind"), + ) + + val hadoopClientExclusionRules = List.apply( + ExclusionRule("org.apache.avro"), + ExclusionRule("org.slf4j"), + ExclusionRule("commons-beanutils"), + ExclusionRule("org.datanucleus"), + ExclusionRule("io.netty") + ) + + val hiveMetastoreExclusionRules = List.apply( + ExclusionRule("org.apache.avro"), + ExclusionRule("org.slf4j"), + ExclusionRule("org.pentaho"), + ExclusionRule("org.apache.hbase"), + ExclusionRule("org.apache.logging.log4j"), + ExclusionRule("co.cask.tephra"), + ExclusionRule("com.google.code.findbugs"), + ExclusionRule("org.eclipse.jetty.aggregate"), + ExclusionRule("org.eclipse.jetty.orbit"), + ExclusionRule("org.apache.parquet"), + ExclusionRule("com.tdunning"), + ExclusionRule("javax.transaction"), + ExclusionRule("com.zaxxer"), + ExclusionRule("org.apache.ant"), + ExclusionRule("javax.servlet"), + ExclusionRule("javax.jdo"), + ExclusionRule("commons-beanutils"), + ExclusionRule("org.datanucleus") + ) + + def updateMergeStrategy(prev: String => MergeStrategy): String => MergeStrategy = { + case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec$Builder.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", 
"apache", "iceberg", "PartitionSpec.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$1.class") => + MergeStrategy.first + case PathList( + "shadedForDelta", + "org", + "apache", + "iceberg", + "hive", + "HiveCatalog$ViewAwareTableBuilder.class" + ) => + MergeStrategy.first + case PathList( + "shadedForDelta", + "org", + "apache", + "iceberg", + "hive", + "HiveCatalog$TableAwareViewBuilder.class" + ) => + MergeStrategy.first + case PathList( + "shadedForDelta", + "org", + "apache", + "iceberg", + "hive", + "HiveTableOperations.class" + ) => + MergeStrategy.first + case PathList( + "shadedForDelta", + "org", + "apache", + "iceberg", + "hive", + "HiveTableOperations$1.class" + ) => + MergeStrategy.first + case PathList("org", "slf4j", xs @ _*) => + // SLF4J is provided by Spark runtime, exclude from assembly + MergeStrategy.discard + case x => prev(x) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala index 51bba838fde..69a4ebe33e1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala @@ -262,10 +262,8 @@ object UniversalFormat extends DeltaLogging { val ICEBERG_TABLE_TYPE_KEY = "table_type" /** - * Update CatalogTable to mark it readable by other table readers (iceberg for now). - * This method ensures 'table_type' = 'ICEBERG' when uniform is enabled, - * and ensure table_type is not 'ICEBERG' when uniform is not enabled - * If the key has other values than 'ICEBERG', this method will not touch it for compatibility + * HiveTableOperations ensures table_type is 'ICEBERG' when uniform is enabled + * This enforceSupportInCatalog ensure table_type is not 'ICEBERG' when uniform is not enabled * * @param table catalogTable before change * @param metadata snapshot metadata @@ -278,9 +276,6 @@ object UniversalFormat extends DeltaLogging { } (icebergEnabled(metadata), icebergInCatalog) match { - case (true, false) => - Some(table.copy(properties = table.properties - + (ICEBERG_TABLE_TYPE_KEY -> ICEBERG_FORMAT))) case (false, true) => Some(table.copy(properties = table.properties - ICEBERG_TABLE_TYPE_KEY))