From e1e6f571cb4bbf5e730028afa52607a640aa14b2 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 8 Oct 2024 14:56:45 +0530 Subject: [PATCH] addressed pr comments and added few extra logs --- .../IcebergOverwritePartitionsStep.java | 4 +- .../copy/iceberg/IcebergPartitionDataset.java | 7 ++++ .../IcebergPartitionDatasetFinder.java | 3 -- .../management/copy/iceberg/IcebergTable.java | 14 +++---- .../IcebergOverwritePartitionsStepTest.java | 37 +++++-------------- .../copy/iceberg/IcebergTableTest.java | 31 ++++++++++------ 6 files changed, 43 insertions(+), 53 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java index 968b6fcce9..375c050b3b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java @@ -72,7 +72,7 @@ public class IcebergOverwritePartitionsStep implements CommitStep { * Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters. * * @param destTableIdStr the identifier of the destination table as a string - * @param serializedDataFiles the serialized data files to be used for replacing partitions + * @param serializedDataFiles [from List] the serialized data files to be used for replacing partitions * @param properties the properties containing configuration */ public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) { @@ -106,7 +106,7 @@ public void execute() throws IOException { ); Retryer overwritePartitionsRetryer = createOverwritePartitionsRetryer(); overwritePartitionsRetryer.call(() -> { - destTable.overwritePartitions(dataFiles, this.partitionColName, this.partitionValue); + destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue); return null; }); log.info("Overwriting Data files completed for partition {} with value {} for destination table : {} ", diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java index 1f1e5feba4..f2b496d752 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java @@ -17,6 +17,7 @@ package org.apache.gobblin.data.management.copy.iceberg; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -91,6 +92,9 @@ private Predicate createPartitionFilterPredicate() throws IcebergTab srcTableMetadata, supportedTransforms ); + Preconditions.checkArgument(partitionColumnIndex >= 0, String.format( + "Partition column %s not found in table %s", + this.partitionColumnName, this.getFileSetId())); return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue); } @@ -152,6 +156,8 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati private List getDestDataFiles(List srcDataFiles) throws IcebergTable.TableNotFoundException { List destDataFiles = new ArrayList<>(); if (srcDataFiles.isEmpty()) { + log.warn("No data files found for partition col : {} with partition value : {} in source table : {}", + this.partitionColumnName, this.partitionColValue, this.getFileSetId()); return destDataFiles; } TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); @@ -179,6 +185,7 @@ private List getDestDataFiles(List srcDataFiles) throws Iceb .withPath(updatedDestFilePath.toString()) .build()); // Store the mapping of srcPath to destPath to be used in creating list of src file status to dest path + log.debug("Path changed from Src : {} to Dest : {}", srcFilePath, updatedDestFilePath); srcPathToDestPath.put(new Path(srcFilePath), updatedDestFilePath); if (growthMilestoneTracker.isAnotherMilestone(destDataFiles.size())) { log.info("Generated {} destination data files", destDataFiles.size()); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index b2ac1fc1bc..34859e0012 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -46,9 +46,6 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { // TODO: Add Validator for source and destination tables later -// TableMetadata srcTableMetadata = srcIcebergTable.accessTableMetadata(); -// TableMetadata destTableMetadata = destIcebergTable.accessTableMetadata(); -// IcebergTableMetadataValidator.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata); String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_NAME_KEY); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index 1c4bfdfbb2..71df4a23a3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -88,11 +88,6 @@ public TableNotFoundException(TableIdentifier tableId) { private final String catalogUri; private final Table table; - @VisibleForTesting - IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) { - this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, null); - } - @VisibleForTesting IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri, Table table) { this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, table); @@ -265,23 +260,26 @@ public List getPartitionSpecificDataFiles(Predicate iceber } /** - * Overwrite partitions in the table with the specified list of data files. + * Overwrite partition data files in the table for the specified partition col name & partition value. *

- * Overwrite partition replaces the partitions using the expression filter provided. + * Overwrite partition replaces the partition using the expression filter provided. *

* @param dataFiles the list of data files to replace partitions with * @param partitionColName the partition column name whose data files are to be replaced * @param partitionValue the partition column value on which data files will be replaced */ - protected void overwritePartitions(List dataFiles, String partitionColName, String partitionValue) { + protected void overwritePartition(List dataFiles, String partitionColName, String partitionValue) + throws TableNotFoundException { if (dataFiles.isEmpty()) { return; } + log.info("SnapshotId before overwrite: {}", accessTableMetadata().currentSnapshot().snapshotId()); OverwriteFiles overwriteFiles = this.table.newOverwrite(); overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); dataFiles.forEach(overwriteFiles::addFile); overwriteFiles.commit(); this.tableOps.refresh(); + log.info("SnapshotId after overwrite: {}", accessTableMetadata().currentSnapshot().snapshotId()); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java index 46b0f4e82c..98430d9c1b 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java @@ -86,10 +86,10 @@ public void testIsCompleted() { @Test public void testExecute() { try { - Mockito.doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), Mockito.anyString(), + Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); mockIcebergOverwritePartitionsStep.execute(); - Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartitions(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); } catch (IOException e) { Assert.fail(String.format("Unexpected IOException : %s", e)); @@ -100,10 +100,10 @@ public void testExecute() { public void testExecuteWithRetry() { try { // first call throw exception which will be retried and on second call nothing happens - Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); mockIcebergOverwritePartitionsStep.execute(); - Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartitions(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); } catch (IOException e) { Assert.fail(String.format("Unexpected IOException : %s", e)); @@ -111,14 +111,14 @@ public void testExecuteWithRetry() { } @Test - public void testExecuteWithDefaultRetry() { + public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException { try { // Always throw exception - Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); mockIcebergOverwritePartitionsStep.execute(); } catch (RuntimeException e) { - Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartitions(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); assertRetryTimes(e, 3); } catch (IOException e) { @@ -126,25 +126,6 @@ public void testExecuteWithDefaultRetry() { } } - /** Disabling this test to avoid interrupting thread */ - @Test(enabled = false) - public void testExecuteWithRetryAndInterrupt() { - // first call throw exception which will be retried and on second call nothing happens - Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), - Mockito.anyString(), Mockito.anyString()); - Thread.currentThread().interrupt(); - try { - mockIcebergOverwritePartitionsStep.execute(); - Assert.fail("Expected Runtime Exception to be thrown"); - } catch (RuntimeException e) { - Assert.assertTrue(e.getMessage().startsWith( - String.format("Failed to overwrite partition for destination table : {%s} : (retried 1 times) ... then interrupted ", destTableIdStr)), - e.getMessage()); - } catch (IOException e) { - Assert.fail("Expected Runtime Exception to be thrown"); - } - } - @Test public void testExecuteWithCustomRetryConfig() throws IOException { int retryCount = 7; @@ -156,11 +137,11 @@ public void testExecuteWithCustomRetryConfig() throws IOException { Mockito.doReturn(mockIcebergCatalog).when(mockIcebergOverwritePartitionsStep).createDestinationCatalog(); try { // Always throw exception - Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); mockIcebergOverwritePartitionsStep.execute(); } catch (RuntimeException e) { - Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartitions(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()); assertRetryTimes(e, retryCount); } catch (IOException e) { diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 4617011d74..b72eda5282 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -45,6 +45,7 @@ import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.HiveMetastoreTest; import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; import org.testng.Assert; @@ -111,15 +112,17 @@ public void testGetCurrentSnapshotInfo() throws IOException { ); initializeSnapshots(table, perSnapshotFilesets); - IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo(); + IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)).getCurrentSnapshotInfo(); verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size()); } /** Verify failure when attempting to get current snapshot info for non-existent table */ - @Test(expectedExceptions = IcebergTable.TableNotFoundException.class) + @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class, NoSuchTableException.class}) public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException { TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS"); - IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo(); + IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri, + catalog.loadTable(bogusTableId)).getCurrentSnapshotInfo(); Assert.fail("expected an exception when using table ID '" + bogusTableId + "'"); } @@ -134,7 +137,8 @@ public void testGetAllSnapshotInfosIterator() throws IOException { ); initializeSnapshots(table, perSnapshotFilesets); - List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getAllSnapshotInfosIterator()); + List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), + catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator()); Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots"); for (int i = 0; i < snapshotInfos.size(); ++i) { @@ -154,7 +158,8 @@ public void testGetIncrementalSnapshotInfosIterator() throws IOException { ); initializeSnapshots(table, perSnapshotFilesets); - List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator()); + List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), + catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator()); Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots"); for (int i = 0; i < snapshotInfos.size(); ++i) { @@ -174,7 +179,8 @@ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOExce ); initializeSnapshots(table, perSnapshotFilesets); - List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator()); + List snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), + catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator()); Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots"); for (int i = 0; i < snapshotInfos.size(); ++i) { @@ -205,7 +211,8 @@ public void testNewTablePropertiesAreRegistered() throws Exception { TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable"); catalog.createTable(destTableId, icebergSchema, null, destTableProperties); - IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri); + IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri, + catalog.loadTable(destTableId)); // Mock a source table with the same table UUID copying new properties TableMetadata newSourceTableProperties = destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties); @@ -339,7 +346,7 @@ protected static > List flatten(Collection cc) return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } - @Test(enabled = false) + @Test(groups = {"disabledOnCI"}) public void testGetPartitionSpecificDataFiles() throws IOException { TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); @@ -371,8 +378,8 @@ public void testGetPartitionSpecificDataFiles() throws IOException { catalog.dropTable(testTableId); } - @Test(enabled = false) - public void testOverwritePartitions() throws IOException { + @Test(groups = {"disabledOnCI"}) + public void testOverwritePartition() throws IOException { TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); @@ -409,7 +416,7 @@ public void testOverwritePartitions() throws IOException { List dataFiles2 = getDataFiles(paths2, partitionDataList2); // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table - icebergTable.overwritePartitions(dataFiles2, "id", "2"); + icebergTable.overwritePartition(dataFiles2, "id", "2"); List expectedPaths2 = new ArrayList<>(paths); expectedPaths2.addAll(paths2); verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); @@ -421,7 +428,7 @@ public void testOverwritePartitions() throws IOException { // Reusing same partition dats to create data file with different paths List dataFiles3 = getDataFiles(paths3, partitionDataList); // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path - icebergTable.overwritePartitions(dataFiles3, "id", "1"); + icebergTable.overwritePartition(dataFiles3, "id", "1"); List expectedPaths3 = new ArrayList<>(paths2); expectedPaths3.addAll(paths3); verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");