diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 581a265e38..3d3e5c1917 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -38,6 +38,9 @@
public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
+ public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality";
+ // true, requires equality of the partitions' specId as well as the partitions' fields' fieldId
+ public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true";
public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
super(sourceFs, properties);
@@ -46,7 +49,12 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties)
@Override
protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
-// TODO: Add Validator for source and destination tables later
+
+ boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
+ DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
+
+ IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+ srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
ICEBERG_PARTITION_NAME_KEY);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
new file mode 100644
index 0000000000..7c47f2dfb8
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the given tables metadata have same schema and partition spec.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+ private IcebergTableMetadataValidatorUtils() {
+ // Do not instantiate
+ }
+
+ /**
+ * Compares the metadata of the given two iceberg tables.
+ *
+ * - First compares the schema of the metadata.
+ * - Then compares the partition spec of the metadata.
+ *
+ * @param tableMetadataA the metadata of the first table
+ * @param tableMetadataB the metadata of the second table
+ * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
+ * @throws IOException if the schemas or partition spec do not match
+ */
+ public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA,
+ TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) throws IOException {
+ log.info("Starting comparison between iceberg tables with metadata file location : {} and {}",
+ tableMetadataA.metadataFileLocation(),
+ tableMetadataB.metadataFileLocation());
+
+ Schema schemaA = tableMetadataA.schema();
+ Schema schemaB = tableMetadataB.schema();
+ // TODO: Need to add support for schema evolution
+ // This check needs to be broken down into multiple checks to support schema evolution
+ // Possible cases - schemaA == schemaB,
+ // - schemaA is subset of schemaB [ schemaB Evolved ],
+ // - schemaA is superset of schemaB [ schemaA Evolved ],
+ // - Other cases?
+ // Also consider using Strategy or any other design pattern for this to make it a better solution
+ if (!schemaA.sameSchema(schemaB)) {
+ String errMsg = String.format(
+ "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
+ tableMetadataA.metadataFileLocation(),
+ schemaA.schemaId(),
+ tableMetadataB.metadataFileLocation(),
+ schemaB.schemaId()
+ );
+ log.error(errMsg);
+ throw new IOException(errMsg);
+ }
+
+ PartitionSpec partitionSpecA = tableMetadataA.spec();
+ PartitionSpec partitionSpecB = tableMetadataB.spec();
+ // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does
+ boolean partitionSpecMatch = validateStrictPartitionEquality ? partitionSpecA.equals(partitionSpecB)
+ : partitionSpecA.compatibleWith(partitionSpecB);
+ if (!partitionSpecMatch) {
+ String errMsg = String.format(
+ "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}",
+ tableMetadataA.metadataFileLocation(),
+ partitionSpecA.specId(),
+ tableMetadataB.metadataFileLocation(),
+ partitionSpecB.specId()
+ );
+ log.error(errMsg);
+ throw new IOException(errMsg);
+ }
+
+ log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}",
+ tableMetadataA.metadataFileLocation(),
+ tableMetadataB.metadataFileLocation());
+ }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
new file mode 100644
index 0000000000..2cbde2df95
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest {
+ private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned();
+ private static final Schema schema1 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .endRecord());
+ private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
+ .fields()
+ .requiredString("field2")
+ .requiredString("field1")
+ .endRecord());
+ private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord());
+ private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
+ .fields()
+ .requiredInt("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord());
+ private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
+ .identity("field1")
+ .build();
+ private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata(
+ schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
+ private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
+ schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
+ private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
+ schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
+ private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
+ private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
+ private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
+ private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;
+ @Test
+ public void testValidateSameSchema() throws IOException {
+ IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+ tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec,
+ VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
+ );
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testValidateDifferentSchemaFails() {
+ // Schema 1 and Schema 2 have different field order
+
+ TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
+ unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
+
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+ tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidateSchemaWithDifferentTypesFails() {
+ // schema 3 and schema 4 have different field types for field1
+
+ TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
+ unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
+
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
+ tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidateSchemaWithEvolvedSchemaIFails() {
+ // Schema 3 has one more extra field as compared to Schema 1
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+ tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidateSchemaWithEvolvedSchemaIIFails() {
+ // TODO: This test should pass in the future when we support schema evolution
+ // Schema 3 has one more extra field as compared to Schema 1
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
+ tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
+ // Adding this test as to verify that partition copy doesn't proceed further for this case
+ // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type
+ // specially from int to long
+ Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
+ .fields()
+ .requiredLong("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord());
+ PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
+ .identity("field1")
+ .build();
+ TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
+ partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());
+
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+ tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidateSamePartitionSpec() throws IOException {
+ IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+ tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1,
+ VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
+ );
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testValidatePartitionSpecWithDiffNameFails() {
+ PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
+ .identity("field2")
+ .build();
+ TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
+ "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
+ tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testValidatePartitionSpecWithUnpartitionedFails() {
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+ tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testPartitionSpecWithDifferentTransformFails() {
+ PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
+ .truncate("field1", 4)
+ .build();
+ TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
+ "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
+ tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
+ }
+
+ @Test
+ public void testStrictPartitionSpecEqualityOffVsOn() throws IOException {
+ PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1)
+ .identity("field1")
+ .identity("field2")
+ .build();
+
+ TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = TableMetadata.newTableMetadata(schema1,
+ partitionSpecWithTwoCols, "tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>());
+ TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 = tableMetadataWithSchema1AndPartitionSpec1
+ .updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec());
+
+ IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+ tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
+ updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1,
+ VALIDATE_STRICT_PARTITION_EQUALITY_FALSE);
+ Assert.assertTrue(true); // passes w/ non-strict equality...
+ // but fails when strict equality
+ verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
+ updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
+ }
+
+ private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata tableAMetadata,
+ TableMetadata tableBMetadata, String expectedMessage) {
+ IOException exception = Assert.expectThrows(IOException.class, () -> {
+ IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata,
+ VALIDATE_STRICT_PARTITION_EQUALITY_TRUE);
+ });
+ Assert.assertTrue(exception.getMessage().startsWith(expectedMessage));
+ }
+}