From 0ad93ba28aa36b2bff767e5f0fe2380cfb208798 Mon Sep 17 00:00:00 2001
From: Sushant Raikar
Date: Tue, 17 Dec 2024 11:51:32 -0800
Subject: [PATCH] Revert "Skip field-id re-assignment during table creation (#247)" (#272)

This reverts commit 6d69b6cd824ace8a27c18d181c3f189c972aec4c.

## Summary

[Issue](https://github.com/linkedin/openhouse/issues/#nnn)

Reverts the "Skip field-id re-assignment during table creation" change (#247). This removes the `client.table.schema` table property (`CatalogConstants.CLIENT_TABLE_SCHEMA`), the block in `OpenHouseInternalTableOperations#doCommit` that rebuilt new-table metadata from the client-supplied schema, the `rebuildPartitionSpec`/`buildPspecFromTransform`/`rebuildSortOrder` helpers, and the associated unit and integration tests. Table creation goes back to letting Iceberg re-assign field ids as usual.
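For context, the reverted change let a client (for example a replication flow) pin its own Iceberg field ids at create time by passing the serialized schema in the `client.table.schema` table property. Below is a minimal sketch of that pre-revert usage, adapted from the integration test removed in this patch; the catalog handle, database, and table names are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class ClientSchemaReplicaSketch {
  /** {@code catalog} is assumed to be an OpenHouse-backed Iceberg Catalog instance. */
  static Table createReplica(Catalog catalog) {
    // Client-supplied schema with explicit field ids (a=1, a.b=2, c=3).
    Schema clientSchema =
        new Schema(
            Types.NestedField.required(
                1,
                "a",
                Types.StructType.of(Types.NestedField.required(2, "b", Types.StringType.get()))),
            Types.NestedField.required(3, "c", Types.StringType.get()));

    Map<String, String> props = new HashMap<>();
    // Property name comes from the CLIENT_TABLE_SCHEMA constant deleted by this revert.
    props.put("client.table.schema", SchemaParser.toJson(clientSchema));

    // Before the revert, doCommit() rebuilt the new-table metadata from clientSchema, so the
    // created table kept ids 1/2/3. After the revert, Iceberg assigns fresh field ids during
    // createTable and the property has no effect.
    return catalog.createTable(
        TableIdentifier.of("replication_test", "t1"), clientSchema, /* spec */ null, props);
  }
}
```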
## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [X] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [X] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
---
 .../internal/catalog/CatalogConstants.java    |   2 -
 .../OpenHouseInternalTableOperations.java     | 126 ------------
 .../OpenHouseInternalTableOperationsTest.java | 189 ------------------
 .../catalogtest/CatalogOperationTest.java     |  69 -------
 4 files changed, 386 deletions(-)

diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/CatalogConstants.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/CatalogConstants.java
index 8d4a7449e..ebdbaf304 100644
--- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/CatalogConstants.java
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/CatalogConstants.java
@@ -17,8 +17,6 @@ public final class CatalogConstants {
 
   static final String FEATURE_TOGGLE_STOP_CREATE = "stop_create";
 
-  static final String CLIENT_TABLE_SCHEMA = "client.table.schema";
-
   private CatalogConstants() {
     // Noop
   }
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
index 8fb6cdce1..5435b91ce 100644
--- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
@@ -30,16 +30,9 @@
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.iceberg.BaseMetastoreTableOperations;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.SortDirection;
-import org.apache.iceberg.SortField;
-import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
@@ -47,8 +40,6 @@
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.springframework.data.util.Pair;
@@ -177,26 +168,6 @@ public void commit(TableMetadata base, TableMetadata metadata) {
   @SuppressWarnings("checkstyle:MissingSwitchDefault")
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
-
-    /**
-     * During table creation, the table metadata object that arrives here has the field-ids
-     * reassigned from the client supplied schema.This code block creates a new table metadata
-     * object using the client supplied schema by preserving its field-ids.
-     */
-    if (base == null && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null) {
-      Schema clientSchema =
-          SchemaParser.fromJson(metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA));
-      metadata =
-          TableMetadata.buildFromEmpty()
-              .setLocation(metadata.location())
-              .setCurrentSchema(clientSchema, metadata.lastColumnId())
-              .addPartitionSpec(
-                  rebuildPartitionSpec(metadata.spec(), metadata.schema(), clientSchema))
-              .addSortOrder(rebuildSortOrder(metadata.sortOrder(), clientSchema))
-              .setProperties(metadata.properties())
-              .build();
-    }
-
     int version = currentVersion() + 1;
     CommitStatus commitStatus = CommitStatus.FAILURE;
 
@@ -311,103 +282,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     }
   }
 
-  /**
-   * Build a new partition spec with new schema from original pspec. The new pspec has the same
-   * partition fields as the original pspec with source ids from the new schema
-   *
-   * @param originalPspec
-   * @param originalSchema
-   * @param newSchema
-   * @return new partition spec
-   */
-  static PartitionSpec rebuildPartitionSpec(
-      PartitionSpec originalPspec, Schema originalSchema, Schema newSchema) {
-    PartitionSpec.Builder builder = PartitionSpec.builderFor(newSchema);
-
-    for (PartitionField field : originalPspec.fields()) {
-      // get field name from original schema using source id of partition field
-      // because Pspec appends _bucket and _trunc to field name for bucket and truncate fields
-      String fieldName = originalSchema.findField(field.sourceId()).name();
-      // Check if the partition field is present in new schema
-      if (newSchema.findField(fieldName) == null) {
-        throw new IllegalArgumentException(
-            "Field " + fieldName + " does not exist in the new schema");
-      }
-      // build the pspec from transform string representation
-      buildPspecFromTransform(builder, field, fieldName);
-    }
-
-    return builder.build();
-  }
-
-  static void buildPspecFromTransform(
-      PartitionSpec.Builder builder, PartitionField field, String fieldName) {
-    // Recreate the transform using the string representation
-    String transformString = field.transform().toString();
-
-    // Add the field to the new PartitionSpec based on the transform type
-    if ("identity".equalsIgnoreCase(transformString)) {
-      builder.identity(fieldName);
-    } else if (transformString.startsWith("bucket[")) {
-      // Extract bucket number from the string (e.g., bucket[16])
-      int numBuckets =
-          Integer.parseInt(
-              transformString.substring(
-                  transformString.indexOf('[') + 1, transformString.indexOf(']')));
-      builder.bucket(fieldName, numBuckets);
-    } else if (transformString.startsWith("truncate[")) {
-      // Extract width from the string (e.g., truncate[10])
-      int width =
-          Integer.parseInt(
-              transformString.substring(
-                  transformString.indexOf('[') + 1, transformString.indexOf(']')));
-      builder.truncate(fieldName, width);
-    } else if ("year".equalsIgnoreCase(transformString)) {
-      builder.year(fieldName);
-    } else if ("month".equalsIgnoreCase(transformString)) {
-      builder.month(fieldName);
-    } else if ("day".equalsIgnoreCase(transformString)) {
-      builder.day(fieldName);
-    } else if ("hour".equalsIgnoreCase(transformString)) {
-      builder.hour(fieldName);
-    } else {
-      throw new UnsupportedOperationException("Unsupported transform: " + transformString);
-    }
-  }
-
-  /**
-   * Build a new sort order with new schema from original sort order. The new sort order has the
-   * same fields as the original sort order with source ids from the new schema
-   *
-   * @param originalSortOrder
-   * @param newSchema
-   * @return new SortOrder
-   */
-  static SortOrder rebuildSortOrder(SortOrder originalSortOrder, Schema newSchema) {
-    SortOrder.Builder builder = SortOrder.builderFor(newSchema);
-
-    for (SortField field : originalSortOrder.fields()) {
-      // Find the field name in the original schema based on the sourceId
-      String fieldName = originalSortOrder.schema().findField(field.sourceId()).name();
-      // Check if the sortorder field is present in new schema
-      if (newSchema.findField(fieldName) == null) {
-        throw new IllegalArgumentException(
-            "Field " + fieldName + " does not exist in the new schema");
-      }
-      // Create a new SortField with the updated sourceId and original direction and null order
-      Term term = Expressions.ref(fieldName);
-
-      // Apply sort direction and null ordering with the updated sourceId
-      if (field.direction() == SortDirection.ASC) {
-        builder.asc(term, field.nullOrder());
-      } else {
-        builder.desc(term, field.nullOrder());
-      }
-    }
-
-    return builder.build();
-  }
-
   /**
    * If this commit comes from Iceberg built-in retry in
    * org.apache.iceberg.PropertiesUpdate#commit() Then throw fatal {@link CommitFailedException} to
diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
index ede88fde5..f3f41d105 100644
--- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
+++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
@@ -31,8 +31,6 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
-import org.apache.iceberg.SortDirection;
-import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -634,191 +632,4 @@ void testDoCommitDeleteLastStagedSnapshotWhenNoRefs() throws IOException {
       Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable));
     }
   }
-
-  @Test
-  void testRebuildPartitionSpecUnpartitioned() {
-    Schema originalSchema =
-        new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get()));
-
-    PartitionSpec originalSpec = PartitionSpec.unpartitioned();
-    PartitionSpec rebuiltSpec =
-        OpenHouseInternalTableOperations.rebuildPartitionSpec(
-            originalSpec, originalSchema, originalSchema);
-
-    Assertions.assertNotNull(rebuiltSpec);
-    Assertions.assertTrue(rebuiltSpec.isUnpartitioned());
-  }
-
-  @Test
-  void testRebuildPartitionSpec_NewSchemaSameFieldIds() {
-    Schema originalSchema =
-        new Schema(
-            Types.NestedField.optional(1, "field1", Types.StringType.get()),
-            Types.NestedField.optional(2, "field2", Types.IntegerType.get()),
-            Types.NestedField.optional(3, "field3", Types.LongType.get()),
-            Types.NestedField.optional(4, "field4", Types.LongType.get()));
-
-    PartitionSpec originalSpec =
-        PartitionSpec.builderFor(originalSchema)
-            .identity("field1")
-            .bucket("field2", 10)
-            .truncate("field3", 20)
-            .build();
-
-    PartitionSpec rebuiltSpec =
-        OpenHouseInternalTableOperations.rebuildPartitionSpec(
-            originalSpec, originalSchema,
originalSchema); - - Assertions.assertNotNull(rebuiltSpec); - Assertions.assertEquals(0, rebuiltSpec.specId()); - Assertions.assertEquals(3, rebuiltSpec.fields().size()); - Assertions.assertEquals("field1", rebuiltSpec.fields().get(0).name()); - Assertions.assertEquals("identity", rebuiltSpec.fields().get(0).transform().toString()); - // field id in table schema should match sourceid in partition spec - Assertions.assertEquals(1, rebuiltSpec.fields().get(0).sourceId()); - // Iceberg internally appends _bucket to partition field name - Assertions.assertEquals("field2_bucket", rebuiltSpec.fields().get(1).name()); - Assertions.assertEquals("bucket[10]", rebuiltSpec.fields().get(1).transform().toString()); - Assertions.assertEquals(2, rebuiltSpec.fields().get(1).sourceId()); - // Iceberg internally appends _trunc to partition field name - Assertions.assertEquals("field3_trunc", rebuiltSpec.fields().get(2).name()); - Assertions.assertEquals("truncate[20]", rebuiltSpec.fields().get(2).transform().toString()); - Assertions.assertEquals(3, rebuiltSpec.fields().get(2).sourceId()); - } - - @Test - void testRebuildPartitionSpec_NewSchemaDifferentFieldIds() { - Schema originalSchema = - new Schema( - Types.NestedField.optional(1, "field1", Types.StringType.get()), - Types.NestedField.optional(2, "field2", Types.IntegerType.get()), - Types.NestedField.optional(3, "field3", Types.LongType.get()), - Types.NestedField.optional(4, "field4", Types.LongType.get())); - - PartitionSpec originalSpec = - PartitionSpec.builderFor(originalSchema) - .identity("field1") - .bucket("field2", 10) - .truncate("field3", 20) - .build(); - - // field2 and field3 have different fieldids compared to original schema - Schema newSchema = - new Schema( - Types.NestedField.optional(1, "field1", Types.StringType.get()), - Types.NestedField.optional(3, "field2", Types.IntegerType.get()), - Types.NestedField.optional(2, "field3", Types.LongType.get()), - Types.NestedField.optional(4, "field4", Types.LongType.get())); - - PartitionSpec rebuiltSpec = - OpenHouseInternalTableOperations.rebuildPartitionSpec( - originalSpec, originalSchema, newSchema); - - Assertions.assertNotNull(rebuiltSpec); - Assertions.assertEquals(0, rebuiltSpec.specId()); - Assertions.assertEquals(3, rebuiltSpec.fields().size()); - Assertions.assertEquals("field1", rebuiltSpec.fields().get(0).name()); - Assertions.assertEquals("identity", rebuiltSpec.fields().get(0).transform().toString()); - // field id in table schema should match sourceid in partition spec - Assertions.assertEquals(1, rebuiltSpec.fields().get(0).sourceId()); - // Iceberg internally appends _bucket to partition field name - Assertions.assertEquals("field2_bucket", rebuiltSpec.fields().get(1).name()); - Assertions.assertEquals("bucket[10]", rebuiltSpec.fields().get(1).transform().toString()); - Assertions.assertEquals(3, rebuiltSpec.fields().get(1).sourceId()); - // Iceberg internally appends _trunc to partition field name - Assertions.assertEquals("field3_trunc", rebuiltSpec.fields().get(2).name()); - Assertions.assertEquals("truncate[20]", rebuiltSpec.fields().get(2).transform().toString()); - Assertions.assertEquals(2, rebuiltSpec.fields().get(2).sourceId()); - } - - @Test - void testRebuildPartitionSpec_fieldMissingInNewSchema() { - Schema originalSchema = - new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get())); - - PartitionSpec originalSpec = - PartitionSpec.builderFor(originalSchema).identity("field1").build(); - - Schema newSchema = new 
Schema(Types.NestedField.optional(2, "field2", Types.IntegerType.get())); - - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - OpenHouseInternalTableOperations.rebuildPartitionSpec( - originalSpec, originalSchema, newSchema)); - - Assertions.assertEquals( - "Field field1 does not exist in the new schema", exception.getMessage()); - } - - @Test - void testRebuildSortOrder_NewSchemaSameFieldIds() { - Schema originalSchema = - new Schema( - Types.NestedField.optional(1, "field1", Types.StringType.get()), - Types.NestedField.optional(2, "field2", Types.IntegerType.get())); - - SortOrder originalSortOrder = - SortOrder.builderFor(originalSchema).asc("field1").desc("field2").build(); - - Schema newSchema = - new Schema( - Types.NestedField.optional(1, "field1", Types.StringType.get()), - Types.NestedField.optional(2, "field2", Types.IntegerType.get())); - - SortOrder rebuiltSortOrder = - OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema); - - Assertions.assertNotNull(rebuiltSortOrder); - Assertions.assertEquals(2, rebuiltSortOrder.fields().size()); - Assertions.assertEquals(SortDirection.ASC, rebuiltSortOrder.fields().get(0).direction()); - Assertions.assertEquals(1, rebuiltSortOrder.fields().get(0).sourceId()); - Assertions.assertEquals(SortDirection.DESC, rebuiltSortOrder.fields().get(1).direction()); - Assertions.assertEquals(2, rebuiltSortOrder.fields().get(1).sourceId()); - } - - @Test - void testRebuildSortOrder_NewSchemaDifferentFieldIds() { - Schema originalSchema = - new Schema( - Types.NestedField.optional(1, "field1", Types.StringType.get()), - Types.NestedField.optional(2, "field2", Types.IntegerType.get())); - - SortOrder originalSortOrder = - SortOrder.builderFor(originalSchema).asc("field1").desc("field2").build(); - - Schema newSchema = - new Schema( - Types.NestedField.optional(2, "field1", Types.StringType.get()), - Types.NestedField.optional(1, "field2", Types.IntegerType.get())); - - SortOrder rebuiltSortOrder = - OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema); - - Assertions.assertNotNull(rebuiltSortOrder); - Assertions.assertEquals(2, rebuiltSortOrder.fields().size()); - Assertions.assertEquals(SortDirection.ASC, rebuiltSortOrder.fields().get(0).direction()); - Assertions.assertEquals(2, rebuiltSortOrder.fields().get(0).sourceId()); - Assertions.assertEquals(SortDirection.DESC, rebuiltSortOrder.fields().get(1).direction()); - Assertions.assertEquals(1, rebuiltSortOrder.fields().get(1).sourceId()); - } - - @Test - void testRebuildSortOrder_fieldMissingInNewSchema() { - Schema originalSchema = - new Schema(Types.NestedField.optional(1, "field1", Types.StringType.get())); - - SortOrder originalSortOrder = SortOrder.builderFor(originalSchema).asc("field1").build(); - - Schema newSchema = new Schema(Types.NestedField.optional(2, "field2", Types.IntegerType.get())); - - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, - () -> OpenHouseInternalTableOperations.rebuildSortOrder(originalSortOrder, newSchema)); - - Assertions.assertEquals( - "Field field1 does not exist in the new schema", exception.getMessage()); - } } diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java index 
737ecda78..19e3e5a81 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java +++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java @@ -1,6 +1,5 @@ package com.linkedin.openhouse.spark.catalogtest; -import com.google.common.collect.Sets; import com.linkedin.openhouse.tablestest.OpenHouseSparkITest; import java.util.HashMap; import java.util.Map; @@ -10,7 +9,6 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -70,73 +68,6 @@ public void testCatalogWriteAPI() throws Exception { } } - @Test - public void testCreateReplicaSkipFieldIdReassignmentUnPartitionedTable() throws Exception { - try (SparkSession spark = getSparkSession()) { - Catalog icebergCatalog = getOpenHouseCatalog(spark); - Schema schema = - new Schema( - Types.NestedField.required( - 1, - "a", - Types.StructType.of(Types.NestedField.required(2, "b", Types.StringType.get()))), - Types.NestedField.required(3, "c", Types.StringType.get())); - - // Field ids not reassigned - TableIdentifier tableIdentifier = TableIdentifier.of("replication_test", "t1"); - Map props = new HashMap<>(); - props.put("client.table.schema", SchemaParser.toJson(schema)); - Table table = icebergCatalog.createTable(tableIdentifier, schema, null, props); - Schema schemaAfterCreation = table.schema(); - Assertions.assertTrue(schemaAfterCreation.sameSchema(schema)); - Assertions.assertEquals(1, schemaAfterCreation.findField("a").fieldId()); - Assertions.assertNotEquals(3, schemaAfterCreation.findField("a.b").fieldId()); - Assertions.assertNotEquals(2, schemaAfterCreation.findField("c").fieldId()); - // Evolve schema, add top level column d (should work as before) - table.updateSchema().addColumn("d", Types.StringType.get()).commit(); - Assertions.assertEquals(4, table.schema().findField("d").fieldId()); - // Evolve schema, add child column e to a (should work as before) - table.updateSchema().addColumn("a", "e", Types.StringType.get()).commit(); - Assertions.assertEquals(5, table.schema().findField("a.e").fieldId()); - } - } - - @Test - public void testCreateReplicaSkipFieldIdReassignmentPartitionedTable() throws Exception { - try (SparkSession spark = getSparkSession()) { - Catalog icebergCatalog = getOpenHouseCatalog(spark); - Schema schema = - new Schema( - Types.NestedField.required( - 1, - "a", - Types.StructType.of(Types.NestedField.required(2, "b", Types.StringType.get()))), - Types.NestedField.required(3, "c", Types.StringType.get())); - // Partition spec with identity partitioning on c - PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).identity("c").build(); - - // Field ids not reassigned - TableIdentifier tableIdentifier = TableIdentifier.of("replication_test", "t2"); - Map props = new HashMap<>(); - props.put("client.table.schema", SchemaParser.toJson(schema)); - Table table = icebergCatalog.createTable(tableIdentifier, schema, null, props); - Schema schemaAfterCreation = table.schema(); - Assertions.assertTrue(schemaAfterCreation.sameSchema(schema)); - Assertions.assertEquals(1, schemaAfterCreation.findField("a").fieldId()); - Assertions.assertNotEquals(3, schemaAfterCreation.findField("a.b").fieldId()); - 
Assertions.assertNotEquals(2, schemaAfterCreation.findField("c").fieldId()); - PartitionSpec pspecAfterCreation = table.spec(); - // pspec on c changes to 2 - Assertions.assertNotEquals(Sets.newHashSet(3), pspecAfterCreation.identitySourceIds()); - // Evolve schema, add top level column d (should work as before) - table.updateSchema().addColumn("d", Types.StringType.get()).commit(); - Assertions.assertEquals(4, table.schema().findField("d").fieldId()); - // Evolve schema, add child column e to a (should work as before) - table.updateSchema().addColumn("a", "e", Types.StringType.get()).commit(); - Assertions.assertEquals(5, table.schema().findField("a.e").fieldId()); - } - } - /** * This is a copy of com.linkedin.openhouse.jobs.spark.Operations#getCatalog() temporarily. * Refactoring these pieces require deployment coordination, thus we shall create an artifact