diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 581a265e38..3d3e5c1917 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -38,6 +38,9 @@ public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; + public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality"; + // true, requires equality of the partitions' specId as well as the partitions' fields' fieldId + public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true"; public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { super(sourceFs, properties); @@ -46,7 +49,12 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) @Override protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException { -// TODO: Add Validator for source and destination tables later + + boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY, + DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY)); + + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( + srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_NAME_KEY); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java new file mode 100644 index 0000000000..7c47f2dfb8 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; + +import lombok.extern.slf4j.Slf4j; + +/** + * Validator for Iceberg table metadata, ensuring that the given tables metadata have same schema and partition spec. + */ +@Slf4j +public class IcebergTableMetadataValidatorUtils { + + private IcebergTableMetadataValidatorUtils() { + // Do not instantiate + } + + /** + * Compares the metadata of the given two iceberg tables. + * + * @param tableMetadataA the metadata of the first table + * @param tableMetadataB the metadata of the second table + * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison + * @throws IOException if the schemas or partition spec do not match + */ + public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA, + TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) throws IOException { + log.info("Starting comparison between iceberg tables with metadata file location : {} and {}", + tableMetadataA.metadataFileLocation(), + tableMetadataB.metadataFileLocation()); + + Schema schemaA = tableMetadataA.schema(); + Schema schemaB = tableMetadataB.schema(); + // TODO: Need to add support for schema evolution + // This check needs to be broken down into multiple checks to support schema evolution + // Possible cases - schemaA == schemaB, + // - schemaA is subset of schemaB [ schemaB Evolved ], + // - schemaA is superset of schemaB [ schemaA Evolved ], + // - Other cases? + // Also consider using Strategy or any other design pattern for this to make it a better solution + if (!schemaA.sameSchema(schemaB)) { + String errMsg = String.format( + "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}", + tableMetadataA.metadataFileLocation(), + schemaA.schemaId(), + tableMetadataB.metadataFileLocation(), + schemaB.schemaId() + ); + log.error(errMsg); + throw new IOException(errMsg); + } + + PartitionSpec partitionSpecA = tableMetadataA.spec(); + PartitionSpec partitionSpecB = tableMetadataB.spec(); + // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does + boolean partitionSpecMatch = validateStrictPartitionEquality ? partitionSpecA.equals(partitionSpecB) + : partitionSpecA.compatibleWith(partitionSpecB); + if (!partitionSpecMatch) { + String errMsg = String.format( + "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}", + tableMetadataA.metadataFileLocation(), + partitionSpecA.specId(), + tableMetadataB.metadataFileLocation(), + partitionSpecB.specId() + ); + log.error(errMsg); + throw new IOException(errMsg); + } + + log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}", + tableMetadataA.metadataFileLocation(), + tableMetadataB.metadataFileLocation()); + } +} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java new file mode 100644 index 0000000000..2cbde2df95 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class IcebergTableMetadataValidatorUtilsTest { + private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned(); + private static final Schema schema1 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1") + .fields() + .requiredString("field1") + .requiredString("field2") + .endRecord()); + private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2") + .fields() + .requiredString("field2") + .requiredString("field1") + .endRecord()); + private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3") + .fields() + .requiredString("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); + private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4") + .fields() + .requiredInt("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); + private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1) + .identity("field1") + .build(); + private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata( + schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>()); + private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata( + schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>()); + private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata( + schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>()); + private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata"; + private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata"; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true; + private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false; + @Test + public void testValidateSameSchema() throws IOException { + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( + tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec, + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE + ); + Assert.assertTrue(true); + } + + @Test + public void testValidateDifferentSchemaFails() { + // Schema 1 and Schema 2 have different field order + + TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat, + unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>()); + + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithDifferentTypesFails() { + // schema 3 and schema 4 have different field types for field1 + + TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat, + unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>()); + + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec, + tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithEvolvedSchemaIFails() { + // Schema 3 has one more extra field as compared to Schema 1 + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSchemaWithEvolvedSchemaIIFails() { + // TODO: This test should pass in the future when we support schema evolution + // Schema 3 has one more extra field as compared to Schema 1 + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec, + tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() { + // Adding this test as to verify that partition copy doesn't proceed further for this case + // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type + // specially from int to long + Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5") + .fields() + .requiredLong("field1") + .requiredString("field2") + .requiredInt("field3") + .endRecord()); + PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4) + .identity("field1") + .build(); + TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4, + partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>()); + + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION); + } + + @Test + public void testValidateSamePartitionSpec() throws IOException { + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( + tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1, + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE + ); + Assert.assertTrue(true); + } + + @Test + public void testValidatePartitionSpecWithDiffNameFails() { + PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) + .identity("field2") + .build(); + TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1, + tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testValidatePartitionSpecWithUnpartitionedFails() { + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec, + tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testPartitionSpecWithDifferentTransformFails() { + PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1) + .truncate("field1", 4) + .build(); + TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12, + "tableLocationForSchema1WithPartitionSpec12", new HashMap<>()); + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1, + tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + @Test + public void testStrictPartitionSpecEqualityOffVsOn() throws IOException { + PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1) + .identity("field1") + .identity("field2") + .build(); + + TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = TableMetadata.newTableMetadata(schema1, + partitionSpecWithTwoCols, "tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>()); + TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 = tableMetadataWithSchema1AndPartitionSpec1 + .updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec()); + + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( + tableMetadataWithSchema1AndPartitionSpecWithTwoCols, + updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, + VALIDATE_STRICT_PARTITION_EQUALITY_FALSE); + Assert.assertTrue(true); // passes w/ non-strict equality... + // but fails when strict equality + verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols, + updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION); + } + + private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata tableAMetadata, + TableMetadata tableBMetadata, String expectedMessage) { + IOException exception = Assert.expectThrows(IOException.class, () -> { + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata, + VALIDATE_STRICT_PARTITION_EQUALITY_TRUE); + }); + Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); + } +}