From effbab0a8d5cb2f58e2201574696d7dc16bbf3cb Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 3 Oct 2024 10:17:16 +0530 Subject: [PATCH 01/13] added iceberg metadata validator --- .../IcebergTableMetadataValidator.java | 94 ++++++++++ .../IcebergTableMetadataValidatorTest.java | 165 ++++++++++++++++++ 2 files changed, 259 insertions(+) create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java new file mode 100644 index 0000000000..e121a985c4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.util.Arrays; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; + +import com.google.common.annotations.VisibleForTesting; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class IcebergTableMetadataValidator { + + public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, TableMetadata destTableMetadata) { + log.info("Starting validation of Source and Destination Iceberg Tables Metadata"); + Schema srcTableSchema = srcTableMetadata.schema(); + Schema destTableSchema = destTableMetadata.schema(); + PartitionSpec srcPartitionSpec = srcTableMetadata.spec(); + PartitionSpec destPartitionSpec = destTableMetadata.spec(); + validateSchema(srcTableSchema, destTableSchema); + validatePartitionSpec(srcPartitionSpec, destPartitionSpec); + log.info("Validation of Source and Destination Iceberg Tables Metadata completed successfully"); + } + + @VisibleForTesting + protected static void validateSchema(Schema srcTableSchema, Schema destTableSchema) { + // TODO: Need to add support for schema evolution, currently only supporting copying + // between iceberg tables with same schema. + // This function needs to be broken down into multiple functions to support schema evolution + // Possible cases - Src Schema == Dest Schema, + // - Src Schema is subset of Dest Schema [ Destination Schema Evolved ], + // - Src Schema is superset of Dest Schema [ Source Schema Evolved ], + // - Other cases? + if (!srcTableSchema.sameSchema(destTableSchema)) { + String errMsg = String.format( + "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source : {%s} and Destination : {%s}", + srcTableSchema, + destTableSchema + ); + log.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + } + + @VisibleForTesting + protected static void validatePartitionSpec(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) { + // Currently, only supporting copying between iceberg tables with same partition spec + if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) { + String errMsg = String.format( + "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec - Source : {%s} and Destination : {%s}", + srcPartitionSpec, + destPartitionSpec + ); + log.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + // .compatibleWith() does not check if the partition field in partition spec have same java classes or not + // i.e. if the partition field in partition spec is of type Integer in src table and String in dest table, + // so need to put an additional check for that + // try to run test testValidatePartitionSpecWithDiffType() in IcebergTableMetadataValidatorTest.java with + // this check commented out + // TODO: This check can be removed after adding support for schema evolution + if (!Arrays.equals(srcPartitionSpec.javaClasses(), destPartitionSpec.javaClasses())) { + String errMsg = String.format( + "Partition Spec Have different types for same partition field between Source and Destination Iceberg Table - " + + "Source : {%s} and Destination : {%s}", + srcPartitionSpec, + destPartitionSpec + ); + log.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + } + +} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java new file mode 100644 index 0000000000..bcea9cb0e3 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class IcebergTableMetadataValidatorTest { + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 = + SchemaBuilder.record("schema1") + .fields() + .requiredString("field1") + .requiredString("field2") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 = + SchemaBuilder.record("schema2") + .fields() + .requiredString("field2") + .requiredString("field1") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 = + SchemaBuilder.record("schema3") + .fields() + .requiredString("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 = + SchemaBuilder.record("schema3") + .fields() + .requiredInt("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord(); + private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1); + private static final Schema schema2 = AvroSchemaUtil.toIceberg(avroDataSchema2); + private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3); + private static final Schema schema4 = AvroSchemaUtil.toIceberg(avroDataSchema4); + private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1) + .identity("field1") + .build(); + private static final String SHOULD_NOT_THROW_EXCEPTION = "Should not throw any exception"; + private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Source and Destination Iceberg Tables Schema"; + private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec"; + private static final String PARTITION_SPEC_DIFF_TYPE_EXCEPTION = "Partition Spec Have different types for same partition field between Source and Destination Iceberg Table"; + + @Test + public void testValidateSameSchema() { + try { + IcebergTableMetadataValidator.validateSchema(schema1, schema1); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(SHOULD_NOT_THROW_EXCEPTION); + } + } + + @Test + public void testValidateDifferentSchema() { + // Schema 1 and Schema 2 have different field order + verifyValidateSchemaIllegalArgumentException(schema1, schema2); + } + + @Test + public void testValidateSchemaWithDifferentTypes() { + verifyValidateSchemaIllegalArgumentException(schema3, schema4); + } + + @Test + public void testValidateSchemaWithEvolvedDestinationSchema() { + // TODO: This test should pass in the future when we support source side schema evolution and here there should + // not be any commit needed on destination side + // Schema 3 has one more extra field as compared to Schema 1 + verifyValidateSchemaIllegalArgumentException(schema1, schema3); + } + + @Test + public void testValidateSchemaWithEvolvedSourceSchema() { + // TODO: This test should pass in the future when we support source side schema evolution and commit the changes + // on destination side either through IcebergRegisterStep or any other CommitStep + // Schema 3 has one more extra field as compared to Schema 1 + verifyValidateSchemaIllegalArgumentException(schema3, schema1); + } + + @Test + public void testValidateSamePartitionSpec() { + try { + IcebergTableMetadataValidator.validatePartitionSpec(partitionSpec1, partitionSpec1); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(SHOULD_NOT_THROW_EXCEPTION); + } + } + + @Test + public void testValidatePartitionSpecWithDiffType() { + PartitionSpec partitionSpec4 = PartitionSpec.builderFor(schema4) + .identity("field1") + .build(); + verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec4, PARTITION_SPEC_DIFF_TYPE_EXCEPTION); + } + + @Test + public void testValidatePartitionSpecWithDiffName() { + PartitionSpec partitionSpec2 = PartitionSpec.builderFor(schema2) + .identity("field2") + .build(); + verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec2, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testValidatePartitionSpecWithDiffNameAndDiffType() { + PartitionSpec partitionSpec5 = PartitionSpec.builderFor(schema3) + .identity("field3") + .build(); + verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec5, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testValidatePartitionSpecWithUnpartitioned() { + PartitionSpec partitionSpec3 = PartitionSpec.unpartitioned(); + verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec3, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testPartitionSpecWithDifferentTransform() { + PartitionSpec partitionSpec = PartitionSpec.builderFor(schema2) + .truncate("field2", 4) + .build(); + verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + private void verifyValidateSchemaIllegalArgumentException(Schema srcSchema, Schema destSchema) { + IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { + IcebergTableMetadataValidator.validateSchema(srcSchema, destSchema); + }); + Assert.assertTrue(exception.getMessage().startsWith(SCHEMA_MISMATCH_EXCEPTION)); + } + + private void verifyValidatePartitionSpecIllegalArgumentException(PartitionSpec srcPartitionSpec, + PartitionSpec destPartitionSpec, String expectedMessage) { + IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { + IcebergTableMetadataValidator.validatePartitionSpec(srcPartitionSpec, destPartitionSpec); + }); + Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); + } +} From f4f8464aa32def96d48e4450f89b8aa333a22f1d Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 3 Oct 2024 10:35:20 +0530 Subject: [PATCH 02/13] added javadoc --- .../IcebergTableMetadataValidator.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java index e121a985c4..8bf9d2f5ff 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java @@ -27,9 +27,20 @@ import lombok.extern.slf4j.Slf4j; +/** + * Validator for Iceberg table metadata, ensuring that the source and destination tables have + * compatible schemas and partition specifications. + */ @Slf4j public class IcebergTableMetadataValidator { + /** + * Validates the metadata of the source and destination Iceberg tables. + * + * @param srcTableMetadata the metadata of the source table + * @param destTableMetadata the metadata of the destination table + * @throws IllegalArgumentException if the schemas or partition specifications do not match + */ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, TableMetadata destTableMetadata) { log.info("Starting validation of Source and Destination Iceberg Tables Metadata"); Schema srcTableSchema = srcTableMetadata.schema(); @@ -41,6 +52,13 @@ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcT log.info("Validation of Source and Destination Iceberg Tables Metadata completed successfully"); } + /** + * Validates that the schemas of the source and destination tables are identical. + * + * @param srcTableSchema the schema of the source table + * @param destTableSchema the schema of the destination table + * @throws IllegalArgumentException if the schemas do not match + */ @VisibleForTesting protected static void validateSchema(Schema srcTableSchema, Schema destTableSchema) { // TODO: Need to add support for schema evolution, currently only supporting copying @@ -61,6 +79,13 @@ protected static void validateSchema(Schema srcTableSchema, Schema destTableSche } } + /** + * Validates that the partition specifications of the source and destination tables are compatible. + * + * @param srcPartitionSpec the partition specification of the source table + * @param destPartitionSpec the partition specification of the destination table + * @throws IllegalArgumentException if the partition specifications do not match + */ @VisibleForTesting protected static void validatePartitionSpec(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) { // Currently, only supporting copying between iceberg tables with same partition spec From d55f4f5d26096588b526513c699f38b9c1206d53 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 7 Oct 2024 23:25:32 +0530 Subject: [PATCH 03/13] addressed comments --- .../IcebergTableMetadataValidator.java | 20 +++++++++++++------ .../IcebergTableMetadataValidatorTest.java | 8 ++++---- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java index 8bf9d2f5ff..bfc35d0072 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java @@ -34,6 +34,10 @@ @Slf4j public class IcebergTableMetadataValidator { + private IcebergTableMetadataValidator() { + // Do not instantiate + } + /** * Validates the metadata of the source and destination Iceberg tables. * @@ -42,14 +46,18 @@ public class IcebergTableMetadataValidator { * @throws IllegalArgumentException if the schemas or partition specifications do not match */ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, TableMetadata destTableMetadata) { - log.info("Starting validation of Source and Destination Iceberg Tables Metadata"); + log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata", + srcTableMetadata.location(), + destTableMetadata.location()); Schema srcTableSchema = srcTableMetadata.schema(); Schema destTableSchema = destTableMetadata.schema(); PartitionSpec srcPartitionSpec = srcTableMetadata.spec(); PartitionSpec destPartitionSpec = destTableMetadata.spec(); - validateSchema(srcTableSchema, destTableSchema); - validatePartitionSpec(srcPartitionSpec, destPartitionSpec); - log.info("Validation of Source and Destination Iceberg Tables Metadata completed successfully"); + validateSchemaForEquality(srcTableSchema, destTableSchema); + validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec); + log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully", + srcTableMetadata.location(), + destTableMetadata.location()); } /** @@ -60,7 +68,7 @@ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcT * @throws IllegalArgumentException if the schemas do not match */ @VisibleForTesting - protected static void validateSchema(Schema srcTableSchema, Schema destTableSchema) { + protected static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) { // TODO: Need to add support for schema evolution, currently only supporting copying // between iceberg tables with same schema. // This function needs to be broken down into multiple functions to support schema evolution @@ -87,7 +95,7 @@ protected static void validateSchema(Schema srcTableSchema, Schema destTableSche * @throws IllegalArgumentException if the partition specifications do not match */ @VisibleForTesting - protected static void validatePartitionSpec(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) { + protected static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) { // Currently, only supporting copying between iceberg tables with same partition spec if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) { String errMsg = String.format( diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java index bcea9cb0e3..96d8a7c55c 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java @@ -66,7 +66,7 @@ public class IcebergTableMetadataValidatorTest { @Test public void testValidateSameSchema() { try { - IcebergTableMetadataValidator.validateSchema(schema1, schema1); + IcebergTableMetadataValidator.validateSchemaForEquality(schema1, schema1); } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(SHOULD_NOT_THROW_EXCEPTION); @@ -103,7 +103,7 @@ public void testValidateSchemaWithEvolvedSourceSchema() { @Test public void testValidateSamePartitionSpec() { try { - IcebergTableMetadataValidator.validatePartitionSpec(partitionSpec1, partitionSpec1); + IcebergTableMetadataValidator.validatePartitionSpecForEquality(partitionSpec1, partitionSpec1); } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(SHOULD_NOT_THROW_EXCEPTION); @@ -150,7 +150,7 @@ public void testPartitionSpecWithDifferentTransform() { private void verifyValidateSchemaIllegalArgumentException(Schema srcSchema, Schema destSchema) { IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { - IcebergTableMetadataValidator.validateSchema(srcSchema, destSchema); + IcebergTableMetadataValidator.validateSchemaForEquality(srcSchema, destSchema); }); Assert.assertTrue(exception.getMessage().startsWith(SCHEMA_MISMATCH_EXCEPTION)); } @@ -158,7 +158,7 @@ private void verifyValidateSchemaIllegalArgumentException(Schema srcSchema, Sche private void verifyValidatePartitionSpecIllegalArgumentException(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec, String expectedMessage) { IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { - IcebergTableMetadataValidator.validatePartitionSpec(srcPartitionSpec, destPartitionSpec); + IcebergTableMetadataValidator.validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec); }); Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); } From f3e8c9b5e9d818ccbbe4ade1af6ca91775a33ce9 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 8 Oct 2024 12:20:30 +0530 Subject: [PATCH 04/13] refactored and addressed pr comments --- ...> IcebergTableMetadataValidatorUtils.java} | 63 ++---- .../IcebergTableMetadataValidatorTest.java | 165 ---------------- ...cebergTableMetadataValidatorUtilsTest.java | 186 ++++++++++++++++++ 3 files changed, 202 insertions(+), 212 deletions(-) rename gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/{IcebergTableMetadataValidator.java => IcebergTableMetadataValidatorUtils.java} (58%) delete mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java similarity index 58% rename from gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java rename to gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index bfc35d0072..321b877547 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -17,14 +17,12 @@ package org.apache.gobblin.data.management.copy.iceberg; -import java.util.Arrays; +import java.io.IOException; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; -import com.google.common.annotations.VisibleForTesting; - import lombok.extern.slf4j.Slf4j; /** @@ -32,9 +30,9 @@ * compatible schemas and partition specifications. */ @Slf4j -public class IcebergTableMetadataValidator { +public class IcebergTableMetadataValidatorUtils { - private IcebergTableMetadataValidator() { + private IcebergTableMetadataValidatorUtils() { // Do not instantiate } @@ -43,9 +41,10 @@ private IcebergTableMetadataValidator() { * * @param srcTableMetadata the metadata of the source table * @param destTableMetadata the metadata of the destination table - * @throws IllegalArgumentException if the schemas or partition specifications do not match + * @throws IOException if the schemas or partition specifications do not match */ - public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, TableMetadata destTableMetadata) { + public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, + TableMetadata destTableMetadata) throws IOException { log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata", srcTableMetadata.location(), destTableMetadata.location()); @@ -60,15 +59,7 @@ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcT destTableMetadata.location()); } - /** - * Validates that the schemas of the source and destination tables are identical. - * - * @param srcTableSchema the schema of the source table - * @param destTableSchema the schema of the destination table - * @throws IllegalArgumentException if the schemas do not match - */ - @VisibleForTesting - protected static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) { + private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException { // TODO: Need to add support for schema evolution, currently only supporting copying // between iceberg tables with same schema. // This function needs to be broken down into multiple functions to support schema evolution @@ -76,26 +67,21 @@ protected static void validateSchemaForEquality(Schema srcTableSchema, Schema de // - Src Schema is subset of Dest Schema [ Destination Schema Evolved ], // - Src Schema is superset of Dest Schema [ Source Schema Evolved ], // - Other cases? + // Also consider using Strategy or any other design pattern for this to make it a better solution if (!srcTableSchema.sameSchema(destTableSchema)) { String errMsg = String.format( - "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source : {%s} and Destination : {%s}", - srcTableSchema, - destTableSchema + "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and " + + "Destination-Schema-Id : {%s}", + srcTableSchema.schemaId(), + destTableSchema.schemaId() ); log.error(errMsg); - throw new IllegalArgumentException(errMsg); + throw new IOException(errMsg); } } - /** - * Validates that the partition specifications of the source and destination tables are compatible. - * - * @param srcPartitionSpec the partition specification of the source table - * @param destPartitionSpec the partition specification of the destination table - * @throws IllegalArgumentException if the partition specifications do not match - */ - @VisibleForTesting - protected static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) { + private static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) + throws IOException { // Currently, only supporting copying between iceberg tables with same partition spec if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) { String errMsg = String.format( @@ -104,24 +90,7 @@ protected static void validatePartitionSpecForEquality(PartitionSpec srcPartitio destPartitionSpec ); log.error(errMsg); - throw new IllegalArgumentException(errMsg); - } - // .compatibleWith() does not check if the partition field in partition spec have same java classes or not - // i.e. if the partition field in partition spec is of type Integer in src table and String in dest table, - // so need to put an additional check for that - // try to run test testValidatePartitionSpecWithDiffType() in IcebergTableMetadataValidatorTest.java with - // this check commented out - // TODO: This check can be removed after adding support for schema evolution - if (!Arrays.equals(srcPartitionSpec.javaClasses(), destPartitionSpec.javaClasses())) { - String errMsg = String.format( - "Partition Spec Have different types for same partition field between Source and Destination Iceberg Table - " - + "Source : {%s} and Destination : {%s}", - srcPartitionSpec, - destPartitionSpec - ); - log.error(errMsg); - throw new IllegalArgumentException(errMsg); + throw new IOException(errMsg); } } - } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java deleted file mode 100644 index 96d8a7c55c..0000000000 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.copy.iceberg; - -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.Schema; -import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class IcebergTableMetadataValidatorTest { - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 = - SchemaBuilder.record("schema1") - .fields() - .requiredString("field1") - .requiredString("field2") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 = - SchemaBuilder.record("schema2") - .fields() - .requiredString("field2") - .requiredString("field1") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 = - SchemaBuilder.record("schema3") - .fields() - .requiredString("field1") - .requiredString("field2") - .requiredInt("field3") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 = - SchemaBuilder.record("schema3") - .fields() - .requiredInt("field1") - .requiredString("field2") - .requiredInt("field3") - .endRecord(); - private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1); - private static final Schema schema2 = AvroSchemaUtil.toIceberg(avroDataSchema2); - private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3); - private static final Schema schema4 = AvroSchemaUtil.toIceberg(avroDataSchema4); - private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1) - .identity("field1") - .build(); - private static final String SHOULD_NOT_THROW_EXCEPTION = "Should not throw any exception"; - private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Source and Destination Iceberg Tables Schema"; - private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec"; - private static final String PARTITION_SPEC_DIFF_TYPE_EXCEPTION = "Partition Spec Have different types for same partition field between Source and Destination Iceberg Table"; - - @Test - public void testValidateSameSchema() { - try { - IcebergTableMetadataValidator.validateSchemaForEquality(schema1, schema1); - } catch (Exception e) { - System.out.println(e.getMessage()); - Assert.fail(SHOULD_NOT_THROW_EXCEPTION); - } - } - - @Test - public void testValidateDifferentSchema() { - // Schema 1 and Schema 2 have different field order - verifyValidateSchemaIllegalArgumentException(schema1, schema2); - } - - @Test - public void testValidateSchemaWithDifferentTypes() { - verifyValidateSchemaIllegalArgumentException(schema3, schema4); - } - - @Test - public void testValidateSchemaWithEvolvedDestinationSchema() { - // TODO: This test should pass in the future when we support source side schema evolution and here there should - // not be any commit needed on destination side - // Schema 3 has one more extra field as compared to Schema 1 - verifyValidateSchemaIllegalArgumentException(schema1, schema3); - } - - @Test - public void testValidateSchemaWithEvolvedSourceSchema() { - // TODO: This test should pass in the future when we support source side schema evolution and commit the changes - // on destination side either through IcebergRegisterStep or any other CommitStep - // Schema 3 has one more extra field as compared to Schema 1 - verifyValidateSchemaIllegalArgumentException(schema3, schema1); - } - - @Test - public void testValidateSamePartitionSpec() { - try { - IcebergTableMetadataValidator.validatePartitionSpecForEquality(partitionSpec1, partitionSpec1); - } catch (Exception e) { - System.out.println(e.getMessage()); - Assert.fail(SHOULD_NOT_THROW_EXCEPTION); - } - } - - @Test - public void testValidatePartitionSpecWithDiffType() { - PartitionSpec partitionSpec4 = PartitionSpec.builderFor(schema4) - .identity("field1") - .build(); - verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec4, PARTITION_SPEC_DIFF_TYPE_EXCEPTION); - } - - @Test - public void testValidatePartitionSpecWithDiffName() { - PartitionSpec partitionSpec2 = PartitionSpec.builderFor(schema2) - .identity("field2") - .build(); - verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec2, PARTITION_SPEC_MISMATCH_EXCEPTION); - } - - @Test - public void testValidatePartitionSpecWithDiffNameAndDiffType() { - PartitionSpec partitionSpec5 = PartitionSpec.builderFor(schema3) - .identity("field3") - .build(); - verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec5, PARTITION_SPEC_MISMATCH_EXCEPTION); - } - - @Test - public void testValidatePartitionSpecWithUnpartitioned() { - PartitionSpec partitionSpec3 = PartitionSpec.unpartitioned(); - verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec3, PARTITION_SPEC_MISMATCH_EXCEPTION); - } - - @Test - public void testPartitionSpecWithDifferentTransform() { - PartitionSpec partitionSpec = PartitionSpec.builderFor(schema2) - .truncate("field2", 4) - .build(); - verifyValidatePartitionSpecIllegalArgumentException(partitionSpec1, partitionSpec, PARTITION_SPEC_MISMATCH_EXCEPTION); - } - - private void verifyValidateSchemaIllegalArgumentException(Schema srcSchema, Schema destSchema) { - IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { - IcebergTableMetadataValidator.validateSchemaForEquality(srcSchema, destSchema); - }); - Assert.assertTrue(exception.getMessage().startsWith(SCHEMA_MISMATCH_EXCEPTION)); - } - - private void verifyValidatePartitionSpecIllegalArgumentException(PartitionSpec srcPartitionSpec, - PartitionSpec destPartitionSpec, String expectedMessage) { - IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { - IcebergTableMetadataValidator.validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec); - }); - Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); - } -} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java new file mode 100644 index 0000000000..2760c58e95 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class IcebergTableMetadataValidatorUtilsTest { + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 = + SchemaBuilder.record("schema1") + .fields() + .requiredString("field1") + .requiredString("field2") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 = + SchemaBuilder.record("schema2") + .fields() + .requiredString("field2") + .requiredString("field1") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 = + SchemaBuilder.record("schema3") + .fields() + .requiredString("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord(); + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 = + SchemaBuilder.record("schema4") + .fields() + .requiredInt("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord(); + private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned(); + private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1); + private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(avroDataSchema2); + private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3); + private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(avroDataSchema4); + private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1) + .identity("field1") + .build(); + private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata( + schema1, unpartitionedPartitionSpec, "metadataLocationWithSchema1AndUnpartitionedSpec", new HashMap<>()); + private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata( + schema1, partitionSpec1, "metadataLocationWithSchema2AndUnpartitionedSpec", new HashMap<>()); + private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata( + schema3, unpartitionedPartitionSpec, "metadataLocationWithSchema1AndUnpartitionedSpec", new HashMap<>()); + private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Source and Destination Iceberg Tables Schema"; + private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec"; + @Test + public void testValidateSameSchema() throws IOException { + IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec + ); + } + + @Test + public void testValidateDifferentSchema() { + // Schema 1 and Schema 2 have different field order + + TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat, + unpartitionedPartitionSpec, "destMetadataLocationForSchema2", new HashMap<>()); + + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithDifferentTypes() { + // schema 3 and schema 4 have different field types for field1 + + TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat, + unpartitionedPartitionSpec, "destMetadataLocationForSchema4", new HashMap<>()); + + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithEvolvedDestinationSchema() { + // TODO: This test should pass in the future when we support source side schema evolution and here there should + // not be any commit needed on destination side + // Schema 3 has one more extra field as compared to Schema 1 + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithEvolvedSourceSchema() { + // TODO: This test should pass in the future when we support source side schema evolution and commit the changes + // on destination side either through IcebergRegisterStep or any other CommitStep + // Schema 3 has one more extra field as compared to Schema 1 + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateEvolvedSourceSchemaFromIntToLongType() { + // Adding this test as to verify that partition copy doesn't proceed further for this case + // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type + // specially from int to long + org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema = + SchemaBuilder.record("schema5") + .fields() + .requiredLong("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord(); + Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(avroDataSchema); + PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4) + .identity("field1") + .build(); + TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, partitionSpec, + "destMetadataLocationForSchema5", new HashMap<>()); + + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSamePartitionSpec() throws IOException { + IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1 + ); + } + + @Test + public void testValidatePartitionSpecWithDiffName() { + PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) + .identity("field2") + .build(); + TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "destMetadataLocationForSchema1", new HashMap<>()); + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndPartitionSpec1, + destTableMetadata, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testValidatePartitionSpecWithUnpartitioned() { + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testPartitionSpecWithDifferentTransform() { + PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) + .truncate("field1", 4) + .build(); + TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "destMetadataLocationForSchema1", new HashMap<>()); + verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndPartitionSpec1, + destTableMetadata, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + private void verifyValidateSourceAndDestinationTablesMetadataIOException(TableMetadata srcTableMetadata, + TableMetadata destTableMetadata, String expectedMessage) { + IOException exception = Assert.expectThrows(IOException.class, () -> { + IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata); + }); + Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); + } +} From 753dc9403768202a9e1e950a2572d9762f3b399c Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 24 Oct 2024 08:02:40 +0530 Subject: [PATCH 05/13] adding metadata validator in partition dataset finder --- .../management/copy/iceberg/IcebergPartitionDatasetFinder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 581a265e38..2d675dcc0f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -46,7 +46,8 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) @Override protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { -// TODO: Add Validator for source and destination tables later + IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata()); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_NAME_KEY); From 205ec9e3ff5db605278dbe59baba14fc0f236565 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Fri, 25 Oct 2024 00:16:32 +0530 Subject: [PATCH 06/13] refactored based on review comments --- .../IcebergPartitionDatasetFinder.java | 2 +- .../IcebergTableMetadataValidatorUtils.java | 76 +++++---- ...cebergTableMetadataValidatorUtilsTest.java | 144 ++++++++---------- 3 files changed, 104 insertions(+), 118 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 2d675dcc0f..d4cd85d053 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -46,7 +46,7 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) @Override protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { - IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata()); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index 321b877547..1b1e38b9d8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -37,60 +37,58 @@ private IcebergTableMetadataValidatorUtils() { } /** - * Validates the metadata of the source and destination Iceberg tables. - * - * @param srcTableMetadata the metadata of the source table - * @param destTableMetadata the metadata of the destination table - * @throws IOException if the schemas or partition specifications do not match + * Compares the metadata of the given two iceberg tables. + *
    + *
  • First compares the schema of the metadata.
  • + *
  • Then compares the partition spec of the metadata.
  • + *
+ * @param tableAMetadata the metadata of the first table + * @param tableBMetadata the metadata of the second table + * @throws IOException if the schemas or partition spec do not match */ - public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, - TableMetadata destTableMetadata) throws IOException { - log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata", - srcTableMetadata.location(), - destTableMetadata.location()); - Schema srcTableSchema = srcTableMetadata.schema(); - Schema destTableSchema = destTableMetadata.schema(); - PartitionSpec srcPartitionSpec = srcTableMetadata.spec(); - PartitionSpec destPartitionSpec = destTableMetadata.spec(); - validateSchemaForEquality(srcTableSchema, destTableSchema); - validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec); - log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully", - srcTableMetadata.location(), - destTableMetadata.location()); - } + public static void failUnlessCompatibleStructure(TableMetadata tableAMetadata, + TableMetadata tableBMetadata) throws IOException { + log.info("Starting comparison between iceberg tables with metadata file location : {} and {}", + tableAMetadata.metadataFileLocation(), + tableBMetadata.metadataFileLocation()); - private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException { - // TODO: Need to add support for schema evolution, currently only supporting copying - // between iceberg tables with same schema. + Schema tableASchema = tableAMetadata.schema(); + Schema tableBSchema = tableBMetadata.schema(); + // TODO: Need to add support for schema evolution // This function needs to be broken down into multiple functions to support schema evolution - // Possible cases - Src Schema == Dest Schema, - // - Src Schema is subset of Dest Schema [ Destination Schema Evolved ], - // - Src Schema is superset of Dest Schema [ Source Schema Evolved ], + // Possible cases - tableASchema == tableBSchema, + // - tableASchema is subset of tableBSchema [ tableBSchema Evolved ], + // - tableASchema is superset of tableBSchema [ tableASchema Evolved ], // - Other cases? // Also consider using Strategy or any other design pattern for this to make it a better solution - if (!srcTableSchema.sameSchema(destTableSchema)) { + if (!tableASchema.sameSchema(tableBSchema)) { String errMsg = String.format( - "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and " - + "Destination-Schema-Id : {%s}", - srcTableSchema.schemaId(), - destTableSchema.schemaId() + "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}", + tableAMetadata.metadataFileLocation(), + tableASchema.schemaId(), + tableBMetadata.metadataFileLocation(), + tableBSchema.schemaId() ); log.error(errMsg); throw new IOException(errMsg); } - } - private static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) - throws IOException { - // Currently, only supporting copying between iceberg tables with same partition spec - if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) { + PartitionSpec tableAPartitionSpec = tableAMetadata.spec(); + PartitionSpec tableBPartitionSpec = tableBMetadata.spec(); + if (!tableAPartitionSpec.compatibleWith(tableBPartitionSpec)) { String errMsg = String.format( - "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec - Source : {%s} and Destination : {%s}", - srcPartitionSpec, - destPartitionSpec + "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}", + tableAMetadata.metadataFileLocation(), + tableAPartitionSpec.specId(), + tableBMetadata.metadataFileLocation(), + tableBPartitionSpec.specId() ); log.error(errMsg); throw new IOException(errMsg); } + + log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}", + tableAMetadata.metadataFileLocation(), + tableBMetadata.metadataFileLocation()); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java index 2760c58e95..d8651f304e 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -29,51 +29,43 @@ import org.testng.annotations.Test; public class IcebergTableMetadataValidatorUtilsTest { - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 = - SchemaBuilder.record("schema1") - .fields() - .requiredString("field1") - .requiredString("field2") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 = - SchemaBuilder.record("schema2") - .fields() - .requiredString("field2") - .requiredString("field1") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 = - SchemaBuilder.record("schema3") - .fields() - .requiredString("field1") - .requiredString("field2") - .requiredInt("field3") - .endRecord(); - private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 = - SchemaBuilder.record("schema4") - .fields() - .requiredInt("field1") - .requiredString("field2") - .requiredInt("field3") - .endRecord(); private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned(); - private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1); - private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(avroDataSchema2); - private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3); - private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(avroDataSchema4); + private static final Schema schema1 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1") + .fields() + .requiredString("field1") + .requiredString("field2") + .endRecord()); + private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2") + .fields() + .requiredString("field2") + .requiredString("field1") + .endRecord()); + private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3") + .fields() + .requiredString("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); + private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4") + .fields() + .requiredInt("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1) .identity("field1") .build(); private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata( - schema1, unpartitionedPartitionSpec, "metadataLocationWithSchema1AndUnpartitionedSpec", new HashMap<>()); + schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>()); private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata( - schema1, partitionSpec1, "metadataLocationWithSchema2AndUnpartitionedSpec", new HashMap<>()); + schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>()); private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata( - schema3, unpartitionedPartitionSpec, "metadataLocationWithSchema1AndUnpartitionedSpec", new HashMap<>()); - private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Source and Destination Iceberg Tables Schema"; - private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Source and Destination Iceberg Tables Partition Spec"; + schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>()); + private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata"; + private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata"; @Test public void testValidateSameSchema() throws IOException { - IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec ); } @@ -82,68 +74,64 @@ public void testValidateSameSchema() throws IOException { public void testValidateDifferentSchema() { // Schema 1 and Schema 2 have different field order - TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat, - unpartitionedPartitionSpec, "destMetadataLocationForSchema2", new HashMap<>()); + TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat, + unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>()); - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, - destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test public void testValidateSchemaWithDifferentTypes() { // schema 3 and schema 4 have different field types for field1 - TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat, - unpartitionedPartitionSpec, "destMetadataLocationForSchema4", new HashMap<>()); + TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat, + unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>()); - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema3AndUnpartitionedSpec, - destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateSchemaWithEvolvedDestinationSchema() { - // TODO: This test should pass in the future when we support source side schema evolution and here there should - // not be any commit needed on destination side + public void testValidateSchemaWithEvolvedSchemaI() { + // TODO: This test should pass in the future when we support schema evolution // Schema 3 has one more extra field as compared to Schema 1 - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateSchemaWithEvolvedSourceSchema() { - // TODO: This test should pass in the future when we support source side schema evolution and commit the changes - // on destination side either through IcebergRegisterStep or any other CommitStep + public void testValidateSchemaWithEvolvedSchemaII() { + // TODO: This test should pass in the future when we support schema evolution // Schema 3 has one more extra field as compared to Schema 1 - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema3AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateEvolvedSourceSchemaFromIntToLongType() { + public void testValidateOneSchemaEvolvedFromIntToLongType() { // Adding this test as to verify that partition copy doesn't proceed further for this case // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type // specially from int to long - org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema = - SchemaBuilder.record("schema5") - .fields() - .requiredLong("field1") - .requiredString("field2") - .requiredInt("field3") - .endRecord(); - Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(avroDataSchema); + Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5") + .fields() + .requiredLong("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4) .identity("field1") .build(); - TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, partitionSpec, - "destMetadataLocationForSchema5", new HashMap<>()); + TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, partitionSpec, + "tableLocationForSchema5WithPartitionSpec", new HashMap<>()); - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, - destTableMetadata, SCHEMA_MISMATCH_EXCEPTION); + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test public void testValidateSamePartitionSpec() throws IOException { - IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata( + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1 ); } @@ -153,15 +141,15 @@ public void testValidatePartitionSpecWithDiffName() { PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) .identity("field2") .build(); - TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema1, partitionSpec12, - "destMetadataLocationForSchema1", new HashMap<>()); - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndPartitionSpec1, - destTableMetadata, PARTITION_SPEC_MISMATCH_EXCEPTION); + TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpec1, + tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); } @Test public void testValidatePartitionSpecWithUnpartitioned() { - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); } @@ -170,16 +158,16 @@ public void testPartitionSpecWithDifferentTransform() { PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) .truncate("field1", 4) .build(); - TableMetadata destTableMetadata = TableMetadata.newTableMetadata(schema1, partitionSpec12, - "destMetadataLocationForSchema1", new HashMap<>()); - verifyValidateSourceAndDestinationTablesMetadataIOException(tableMetadataWithSchema1AndPartitionSpec1, - destTableMetadata, PARTITION_SPEC_MISMATCH_EXCEPTION); + TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpec1, + tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); } - private void verifyValidateSourceAndDestinationTablesMetadataIOException(TableMetadata srcTableMetadata, - TableMetadata destTableMetadata, String expectedMessage) { + private void verifyFailUnlessCompatibleStructureIOException(TableMetadata tableAMetadata, + TableMetadata tableBMetadata, String expectedMessage) { IOException exception = Assert.expectThrows(IOException.class, () -> { - IcebergTableMetadataValidatorUtils.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata); + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata); }); Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); } From 5dff8568957567e286f5e5fd0d2a7afc2f993115 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Fri, 25 Oct 2024 10:09:08 +0530 Subject: [PATCH 07/13] fixed typos --- .../copy/iceberg/IcebergTableMetadataValidatorUtils.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index 1b1e38b9d8..f6de5b8021 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -26,8 +26,7 @@ import lombok.extern.slf4j.Slf4j; /** - * Validator for Iceberg table metadata, ensuring that the source and destination tables have - * compatible schemas and partition specifications. + * Validator for Iceberg table metadata, ensuring that the given tables metadata have same schema and partition spec. */ @Slf4j public class IcebergTableMetadataValidatorUtils { @@ -55,7 +54,7 @@ public static void failUnlessCompatibleStructure(TableMetadata tableAMetadata, Schema tableASchema = tableAMetadata.schema(); Schema tableBSchema = tableBMetadata.schema(); // TODO: Need to add support for schema evolution - // This function needs to be broken down into multiple functions to support schema evolution + // This check needs to be broken down into multiple checks to support schema evolution // Possible cases - tableASchema == tableBSchema, // - tableASchema is subset of tableBSchema [ tableBSchema Evolved ], // - tableASchema is superset of tableBSchema [ tableASchema Evolved ], From e38deffa77e83248d1b5e0c8141e7cbbe34ff9d3 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Sat, 26 Oct 2024 00:34:35 +0530 Subject: [PATCH 08/13] updated code to provide strict checkness for partition spec --- .../iceberg/IcebergPartitionDatasetFinder.java | 9 ++++++++- .../IcebergTableMetadataValidatorUtils.java | 8 ++++++-- .../IcebergTableMetadataValidatorUtilsTest.java | 14 +++++++++----- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index d4cd85d053..2b413712f3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -38,6 +38,9 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; + public static final String ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "strictEqualityForPartitionSpec"; + // Taking default value as true so that no partition spec evaluation is allowed on neither source nor destination + public static final String DEFAULT_ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY = "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { super(sourceFs, properties); @@ -46,8 +49,12 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) @Override protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { + + boolean strictEqualityForPartitionSpec = Boolean.parseBoolean(properties.getProperty(ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY, + DEFAULT_ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY)); + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( - srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata()); + srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), strictEqualityForPartitionSpec); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_NAME_KEY); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index f6de5b8021..d25fc2ae64 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -43,10 +43,11 @@ private IcebergTableMetadataValidatorUtils() { * * @param tableAMetadata the metadata of the first table * @param tableBMetadata the metadata of the second table + * @param strictEqualityForPartitionSpec boolean value to control strictness of partition spec comparison * @throws IOException if the schemas or partition spec do not match */ public static void failUnlessCompatibleStructure(TableMetadata tableAMetadata, - TableMetadata tableBMetadata) throws IOException { + TableMetadata tableBMetadata, boolean strictEqualityForPartitionSpec) throws IOException { log.info("Starting comparison between iceberg tables with metadata file location : {} and {}", tableAMetadata.metadataFileLocation(), tableBMetadata.metadataFileLocation()); @@ -74,7 +75,10 @@ public static void failUnlessCompatibleStructure(TableMetadata tableAMetadata, PartitionSpec tableAPartitionSpec = tableAMetadata.spec(); PartitionSpec tableBPartitionSpec = tableBMetadata.spec(); - if (!tableAPartitionSpec.compatibleWith(tableBPartitionSpec)) { + // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does + boolean partitionSpecMatch = strictEqualityForPartitionSpec ? tableAPartitionSpec.equals(tableBPartitionSpec) + : tableAPartitionSpec.compatibleWith(tableBPartitionSpec); + if (!partitionSpecMatch) { String errMsg = String.format( "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}", tableAMetadata.metadataFileLocation(), diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java index d8651f304e..7968025330 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -63,10 +63,12 @@ public class IcebergTableMetadataValidatorUtilsTest { schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>()); private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata"; private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata"; + private static final boolean PARTITION_SPEC_STRICT_EQUALITY_CHECK = Boolean.TRUE; @Test public void testValidateSameSchema() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( - tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec + tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, + PARTITION_SPEC_STRICT_EQUALITY_CHECK ); } @@ -122,8 +124,8 @@ public void testValidateOneSchemaEvolvedFromIntToLongType() { PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4) .identity("field1") .build(); - TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, partitionSpec, - "tableLocationForSchema5WithPartitionSpec", new HashMap<>()); + TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, + partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>()); verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION); @@ -132,7 +134,8 @@ public void testValidateOneSchemaEvolvedFromIntToLongType() { @Test public void testValidateSamePartitionSpec() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( - tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1 + tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1, + PARTITION_SPEC_STRICT_EQUALITY_CHECK ); } @@ -167,7 +170,8 @@ public void testPartitionSpecWithDifferentTransform() { private void verifyFailUnlessCompatibleStructureIOException(TableMetadata tableAMetadata, TableMetadata tableBMetadata, String expectedMessage) { IOException exception = Assert.expectThrows(IOException.class, () -> { - IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata); + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata, + PARTITION_SPEC_STRICT_EQUALITY_CHECK); }); Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); } From 7a2c1d21863e405cb61f0a14d1143cb2de5f0b18 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Sat, 26 Oct 2024 18:37:36 +0530 Subject: [PATCH 09/13] added test for strict equality --- .../IcebergPartitionDatasetFinder.java | 10 ++-- .../IcebergTableMetadataValidatorUtils.java | 54 +++++++++---------- ...cebergTableMetadataValidatorUtilsTest.java | 21 ++++++++ 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 2b413712f3..0d623982d8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -38,9 +38,9 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; - public static final String ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "strictEqualityForPartitionSpec"; + public static final String ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY = ICEBERG_DATASET_PREFIX + "validate.strict.partition.equality"; // Taking default value as true so that no partition spec evaluation is allowed on neither source nor destination - public static final String DEFAULT_ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY = "true"; + public static final String DEFAULT_ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY = "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { super(sourceFs, properties); @@ -50,11 +50,11 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { - boolean strictEqualityForPartitionSpec = Boolean.parseBoolean(properties.getProperty(ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY, - DEFAULT_ICEBERG_PARTITION_DATASET_PARTITION_SPEC_STRICT_EQUALITY)); + boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY, + DEFAULT_ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY)); IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( - srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), strictEqualityForPartitionSpec); + srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_NAME_KEY); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index d25fc2ae64..7c47f2dfb8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -41,57 +41,57 @@ private IcebergTableMetadataValidatorUtils() { *
  • First compares the schema of the metadata.
  • *
  • Then compares the partition spec of the metadata.
  • * - * @param tableAMetadata the metadata of the first table - * @param tableBMetadata the metadata of the second table - * @param strictEqualityForPartitionSpec boolean value to control strictness of partition spec comparison + * @param tableMetadataA the metadata of the first table + * @param tableMetadataB the metadata of the second table + * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison * @throws IOException if the schemas or partition spec do not match */ - public static void failUnlessCompatibleStructure(TableMetadata tableAMetadata, - TableMetadata tableBMetadata, boolean strictEqualityForPartitionSpec) throws IOException { + public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA, + TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) throws IOException { log.info("Starting comparison between iceberg tables with metadata file location : {} and {}", - tableAMetadata.metadataFileLocation(), - tableBMetadata.metadataFileLocation()); + tableMetadataA.metadataFileLocation(), + tableMetadataB.metadataFileLocation()); - Schema tableASchema = tableAMetadata.schema(); - Schema tableBSchema = tableBMetadata.schema(); + Schema schemaA = tableMetadataA.schema(); + Schema schemaB = tableMetadataB.schema(); // TODO: Need to add support for schema evolution // This check needs to be broken down into multiple checks to support schema evolution - // Possible cases - tableASchema == tableBSchema, - // - tableASchema is subset of tableBSchema [ tableBSchema Evolved ], - // - tableASchema is superset of tableBSchema [ tableASchema Evolved ], + // Possible cases - schemaA == schemaB, + // - schemaA is subset of schemaB [ schemaB Evolved ], + // - schemaA is superset of schemaB [ schemaA Evolved ], // - Other cases? // Also consider using Strategy or any other design pattern for this to make it a better solution - if (!tableASchema.sameSchema(tableBSchema)) { + if (!schemaA.sameSchema(schemaB)) { String errMsg = String.format( "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}", - tableAMetadata.metadataFileLocation(), - tableASchema.schemaId(), - tableBMetadata.metadataFileLocation(), - tableBSchema.schemaId() + tableMetadataA.metadataFileLocation(), + schemaA.schemaId(), + tableMetadataB.metadataFileLocation(), + schemaB.schemaId() ); log.error(errMsg); throw new IOException(errMsg); } - PartitionSpec tableAPartitionSpec = tableAMetadata.spec(); - PartitionSpec tableBPartitionSpec = tableBMetadata.spec(); + PartitionSpec partitionSpecA = tableMetadataA.spec(); + PartitionSpec partitionSpecB = tableMetadataB.spec(); // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does - boolean partitionSpecMatch = strictEqualityForPartitionSpec ? tableAPartitionSpec.equals(tableBPartitionSpec) - : tableAPartitionSpec.compatibleWith(tableBPartitionSpec); + boolean partitionSpecMatch = validateStrictPartitionEquality ? partitionSpecA.equals(partitionSpecB) + : partitionSpecA.compatibleWith(partitionSpecB); if (!partitionSpecMatch) { String errMsg = String.format( "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}", - tableAMetadata.metadataFileLocation(), - tableAPartitionSpec.specId(), - tableBMetadata.metadataFileLocation(), - tableBPartitionSpec.specId() + tableMetadataA.metadataFileLocation(), + partitionSpecA.specId(), + tableMetadataB.metadataFileLocation(), + partitionSpecB.specId() ); log.error(errMsg); throw new IOException(errMsg); } log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}", - tableAMetadata.metadataFileLocation(), - tableBMetadata.metadataFileLocation()); + tableMetadataA.metadataFileLocation(), + tableMetadataB.metadataFileLocation()); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java index 7968025330..80ec8f9e91 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -167,6 +167,27 @@ public void testPartitionSpecWithDifferentTransform() { tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); } + @Test + public void testStrictPartitionSpecEquality() throws IOException { + PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1) + .identity("field1") + .identity("field2") + .build(); + + TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = TableMetadata.newTableMetadata(schema1, + partitionSpecWithTwoCols, "tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>()); + TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 = tableMetadataWithSchema1AndPartitionSpec1 + .updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec()); + + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( + tableMetadataWithSchema1AndPartitionSpecWithTwoCols, + updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, + false); + + verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpecWithTwoCols, + updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + private void verifyFailUnlessCompatibleStructureIOException(TableMetadata tableAMetadata, TableMetadata tableBMetadata, String expectedMessage) { IOException exception = Assert.expectThrows(IOException.class, () -> { From c09eeedc23e530859887e2f1d5db47ea4b4113d3 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 28 Oct 2024 21:33:10 +0530 Subject: [PATCH 10/13] corrected variable name --- .../IcebergTableMetadataValidatorUtilsTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java index 80ec8f9e91..90c5fe34ea 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -63,12 +63,13 @@ public class IcebergTableMetadataValidatorUtilsTest { schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>()); private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata"; private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata"; - private static final boolean PARTITION_SPEC_STRICT_EQUALITY_CHECK = Boolean.TRUE; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = Boolean.TRUE; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = Boolean.FALSE; @Test public void testValidateSameSchema() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, - PARTITION_SPEC_STRICT_EQUALITY_CHECK + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE ); } @@ -135,7 +136,7 @@ public void testValidateOneSchemaEvolvedFromIntToLongType() { public void testValidateSamePartitionSpec() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1, - PARTITION_SPEC_STRICT_EQUALITY_CHECK + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE ); } @@ -182,7 +183,7 @@ public void testStrictPartitionSpecEquality() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndPartitionSpecWithTwoCols, updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, - false); + VALIDATE_STRICT_PARTITION_EQUALITY_FALSE); verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpecWithTwoCols, updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); @@ -192,7 +193,7 @@ private void verifyFailUnlessCompatibleStructureIOException(TableMetadata tableA TableMetadata tableBMetadata, String expectedMessage) { IOException exception = Assert.expectThrows(IOException.class, () -> { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata, - PARTITION_SPEC_STRICT_EQUALITY_CHECK); + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE); }); Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); } From 0ffd8aef74aca1b017e447bf4c2b75c1d3d82c8c Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 29 Oct 2024 05:41:02 +0530 Subject: [PATCH 11/13] addressed review comments --- .../IcebergPartitionDatasetFinder.java | 10 ++-- ...cebergTableMetadataValidatorUtilsTest.java | 48 ++++++++++--------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 0d623982d8..21aad0e1e5 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -38,9 +38,9 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; - public static final String ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY = ICEBERG_DATASET_PREFIX + "validate.strict.partition.equality"; - // Taking default value as true so that no partition spec evaluation is allowed on neither source nor destination - public static final String DEFAULT_ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY = "true"; + public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality"; + // Taking the default value as true ensures that partition spec evaluation is not allowed on either the source or the destination + public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { super(sourceFs, properties); @@ -50,8 +50,8 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { - boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY, - DEFAULT_ICEBERG_DATASET_VALIDATE_STRICT_PARTITION_EQUALITY)); + boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY, + DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY)); IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java index 90c5fe34ea..2cbde2df95 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -63,56 +63,56 @@ public class IcebergTableMetadataValidatorUtilsTest { schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>()); private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata"; private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata"; - private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = Boolean.TRUE; - private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = Boolean.FALSE; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false; @Test public void testValidateSameSchema() throws IOException { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, VALIDATE_STRICT_PARTITION_EQUALITY_TRUE ); + Assert.assertTrue(true); } @Test - public void testValidateDifferentSchema() { + public void testValidateDifferentSchemaFails() { // Schema 1 and Schema 2 have different field order TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat, unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>()); - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateSchemaWithDifferentTypes() { + public void testValidateSchemaWithDifferentTypesFails() { // schema 3 and schema 4 have different field types for field1 TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat, unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>()); - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec, tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateSchemaWithEvolvedSchemaI() { - // TODO: This test should pass in the future when we support schema evolution + public void testValidateSchemaWithEvolvedSchemaIFails() { // Schema 3 has one more extra field as compared to Schema 1 - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateSchemaWithEvolvedSchemaII() { + public void testValidateSchemaWithEvolvedSchemaIIFails() { // TODO: This test should pass in the future when we support schema evolution // Schema 3 has one more extra field as compared to Schema 1 - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema3AndUnpartitionedSpec, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); } @Test - public void testValidateOneSchemaEvolvedFromIntToLongType() { + public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() { // Adding this test as to verify that partition copy doesn't proceed further for this case // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type // specially from int to long @@ -128,7 +128,7 @@ public void testValidateOneSchemaEvolvedFromIntToLongType() { TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>()); - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION); } @@ -138,38 +138,39 @@ public void testValidateSamePartitionSpec() throws IOException { tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1, VALIDATE_STRICT_PARTITION_EQUALITY_TRUE ); + Assert.assertTrue(true); } @Test - public void testValidatePartitionSpecWithDiffName() { + public void testValidatePartitionSpecWithDiffNameFails() { PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) .identity("field2") .build(); TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpec1, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); } @Test - public void testValidatePartitionSpecWithUnpartitioned() { - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndUnpartitionedSpec, + public void testValidatePartitionSpecWithUnpartitionedFails() { + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); } @Test - public void testPartitionSpecWithDifferentTransform() { + public void testPartitionSpecWithDifferentTransformFails() { PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) .truncate("field1", 4) .build(); TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpec1, + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); } @Test - public void testStrictPartitionSpecEquality() throws IOException { + public void testStrictPartitionSpecEqualityOffVsOn() throws IOException { PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1) .identity("field1") .identity("field2") @@ -184,12 +185,13 @@ public void testStrictPartitionSpecEquality() throws IOException { tableMetadataWithSchema1AndPartitionSpecWithTwoCols, updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, VALIDATE_STRICT_PARTITION_EQUALITY_FALSE); - - verifyFailUnlessCompatibleStructureIOException(tableMetadataWithSchema1AndPartitionSpecWithTwoCols, + Assert.assertTrue(true); // passes w/ non-strict equality... + // but fails when strict equality + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols, updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); } - private void verifyFailUnlessCompatibleStructureIOException(TableMetadata tableAMetadata, + private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata tableAMetadata, TableMetadata tableBMetadata, String expectedMessage) { IOException exception = Assert.expectThrows(IOException.class, () -> { IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata, From 3f7e05c278f1a3433a0f6019a439dd98d1d2b6e7 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 29 Oct 2024 08:48:57 +0530 Subject: [PATCH 12/13] changed comment statement --- .../management/copy/iceberg/IcebergPartitionDatasetFinder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 21aad0e1e5..d0e88aa644 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -39,7 +39,7 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality"; - // Taking the default value as true ensures that partition spec evaluation is not allowed on either the source or the destination + // true, to ensure equality of the partitions' specId as well as the partitions' fields' fieldId public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { From 3332048d9c0baf577a4636d6960905d72a6eb3d2 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 29 Oct 2024 08:51:04 +0530 Subject: [PATCH 13/13] better comment --- .../management/copy/iceberg/IcebergPartitionDatasetFinder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index d0e88aa644..3d3e5c1917 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -39,7 +39,7 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality"; - // true, to ensure equality of the partitions' specId as well as the partitions' fields' fieldId + // true, requires equality of the partitions' specId as well as the partitions' fields' fieldId public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {